Redis 4.0 from the Top Down (5): Persistence
Persistence in Redis 4.0
Introduction
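Although Redis is an in-memory database, it also provides persistence. RDB persistence takes periodic snapshots that can be used for backup and rollback, while AOF persistence tracks the most recent state of the database, so a restarted server can recover to its latest state. The two differ in granularity: RDB dumps the whole database, whereas AOF logs individual write commands, which is finer-grained but produces larger files. If several writers flushed to disk at the same time, disk pressure would rise and they could end up blocking, so Redis allows only one background persistence job (an RDB save or an AOF rewrite) to write to disk at any moment. By default Redis disables AOF persistence and enables background RDB persistence. Because AOF data is more up to date, Redis loads the AOF file at startup whenever AOF persistence is enabled.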
# AOF is disabled by default
appendonly no
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
save 900 1
save 300 10
save 60 10000
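Main text
RDB persistence
Redis provides the SAVE and BGSAVE commands and also supports saving RDB data periodically through configuration.
The SAVE command
The SAVE command is handled by saveCommand, which calls rdbSave directly and saves the data on the main thread, blocking every client until it finishes; it is not recommended in production. Before going further, let's look at the relevant members.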
struct redisServer {
/* RDB persistence */
pid_t rdb_child_pid; /* PID of RDB saving child */
char *rdb_filename; /* Name of RDB file */
long long dirty; /* Changes to DB from the last rdb save */
time_t lastsave; /* Unix time of last successful save */
int lastbgsave_status; /* C_OK or C_ERR */
};
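If an RDB child process is already running, the command returns an error immediately. Otherwise the data is saved to the file named by server.rdb_filename, dump.rdb by default. rdbSave opens a temporary file, writes the data to it, flushes it to disk, and then renames the temporary file to dump.rdb. Finally it resets server.dirty to 0 and updates lastsave.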
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSave(server.rdb_filename,NULL) == C_OK) {
addReply(c,shared.ok);
}
}
The actual writing happens in rdbSaveRio: it first writes the RDB version, then some auxiliary fields, then the data of each db, and finally a checksum.
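The write-to-a-temporary-file-then-rename step that rdbSave performs can be sketched in isolation. The following is a minimal illustration, not the Redis implementation: save_atomically and the ".tmp-<pid>" suffix are hypothetical names chosen for this example, and error handling is reduced to the essentials.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper, not Redis code: write `len` bytes to a temporary
 * file next to `path`, force them to disk, then rename it over `path`.
 * Returns 0 on success and -1 on failure. */
int save_atomically(const char *path, const void *data, size_t len) {
    assert(path != NULL && data != NULL);
    char tmp[256];
    int n = snprintf(tmp, sizeof(tmp), "%s.tmp-%d", path, (int)getpid());
    if (n < 0 || (size_t)n >= sizeof(tmp)) return -1;

    FILE *fp = fopen(tmp, "w");
    if (fp == NULL) return -1;
    if (fwrite(data, 1, len, fp) != len) goto werr;
    if (fflush(fp) == EOF) goto werr;        /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;  /* kernel buffers -> disk */
    if (fclose(fp) == EOF) { unlink(tmp); return -1; }
    /* rename() replaces the target in one step, so readers see either
     * the old file or the complete new one. */
    if (rename(tmp, path) == -1) { unlink(tmp); return -1; }
    return 0;

werr:
    fclose(fp);
    unlink(tmp);
    return -1;
}
```

Because the final name only ever points at a fully written file, a crash in the middle of a save leaves the previous dump intact, which is presumably why the save path is structured this way.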
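The BGSAVE command
The BGSAVE command calls fork to create a child process, and the child calls rdbSave.
As with SAVE, if a child process is already saving data, an error is returned. However, if BGSAVE SCHEDULE is used and the running child is an AOF rewrite, the BGSAVE is deferred and runs later instead of failing.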
struct redisServer {
...
/* RDB persistence */
pid_t rdb_child_pid; /* PID of RDB saving child */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
int process_type; /* AOF or RDB child? */
size_t cow_size; /* Copy on write size. */
unsigned long long magic; /* Magic value to make sure data is valid. */
} child_info_data;
...
};
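Starting an RDB save in the background means calling fork to create a child process and calling rdbSave in the child. Before forking, Redis creates a pipe for one-way communication from the child to the parent; after fork the child shares the parent's open file descriptors, so the pipe works across the two processes. While the child is saving, any page that is modified while still shared between the two processes is duplicated by copy-on-write and takes up extra memory; once the save completes, the child reports the size of that extra memory to the parent.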
fork(2): The child inherits copies of the parent's set of open file descriptors. Each file descriptor in the child refers to the same open file description (see open(2)) as the corresponding file descriptor in the parent. This means that the two file descriptors share open file status flags, file offset, and signal-driven I/O attributes (see the description of F_SETOWN and F_SETSIG in fcntl(2)).
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
openChildInfoPipe(); // create the child-info pipe
start = ustime();
if ((childpid = fork()) == 0) {
// child process
int retval;
closeListeningSockets(0); // file descriptors are inherited, so close the listening sockets in the child
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
// parent process
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}
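The parent records the child's pid in rdb_child_pid along with its type. It then checks whether the child has finished in serverCron, the time event registered earlier. wait3 waits for a child to change state; because only WNOHANG is passed here, it reports children that have terminated and returns 0 immediately if none has. Once the child has exited successfully, the parent reads the information the child sent through the pipe, namely the copy-on-write size, and then closes the pipe.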
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
// a child is doing a full save (RDB save or AOF rewrite)
if (server.rdb_child_pid != -1|| server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
}
updateDictResizePolicy();
closeChildInfoPipe();
}
}
}
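The fork, pipe and wait interplay described above can be tried out in a few lines. This is only a sketch under simplifying assumptions: struct child_info, INFO_MAGIC and fork_and_collect are hypothetical names loosely modelled on child_info_data, the child reports a fixed number instead of a measured copy-on-write size, and the parent blocks in waitpid rather than polling with wait3 and WNOHANG from a timer.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical message, loosely modelled on child_info_data. */
struct child_info {
    unsigned long long magic;  /* sanity marker checked by the parent */
    size_t cow_size;           /* value reported by the child */
};

#define INFO_MAGIC 0xC0FFEEULL

/* Fork a child that reports `value` through a pipe. The parent waits for
 * the child to exit and then reads the report. Returns 0 on success. */
int fork_and_collect(size_t value, size_t *out) {
    assert(out != NULL);
    int fds[2];
    if (pipe(fds) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) { close(fds[0]); close(fds[1]); return -1; }

    if (pid == 0) {                      /* child: inherited both pipe ends */
        close(fds[0]);
        struct child_info info = { INFO_MAGIC, value };
        ssize_t w = write(fds[1], &info, sizeof(info));
        _exit(w == (ssize_t)sizeof(info) ? 0 : 1);
    }

    close(fds[1]);                       /* parent only reads */
    int status;
    int ok = waitpid(pid, &status, 0) == pid &&
             WIFEXITED(status) && WEXITSTATUS(status) == 0;
    struct child_info info;
    if (ok) ok = read(fds[0], &info, sizeof(info)) == (ssize_t)sizeof(info) &&
                 info.magic == INFO_MAGIC;
    close(fds[0]);
    if (!ok) return -1;
    *out = info.cow_size;
    return 0;
}
```

The message is small enough to fit in the pipe buffer, so the child can write it and exit before the parent reads, which matches the order of events in the cron-driven version: the exit is noticed first and the report is read afterwards.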
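Because the child here is an RDB save to disk (rdb_child_type was set to RDB_CHILD_TYPE_DISK above; the other kind of background child is the AOF rewrite, which writes almost the same data when the RDB preamble is enabled), backgroundSaveDoneHandler calls backgroundSaveDoneHandlerDisk. That function resets rdb_child_pid and the related fields and, if the save succeeded, updates server.dirty and lastsave.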
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE,
"Background saving terminated with success");
server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background saving error");
server.lastbgsave_status = C_ERR;
} else {
mstime_t latency;
serverLog(LL_WARNING,
"Background saving terminated by signal %d", bysignal);
latencyStartMonitor(latency);
rdbRemoveTempFile(server.rdb_child_pid);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* tirggering an error conditon. */
if (bysignal != SIGUSR1)
server.lastbgsave_status = C_ERR;
}
server.rdb_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
}
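Periodic RDB saves
Redis registers three save points by default. If Redis is started with a redis.conf, the defaults are cleared and the save points from redis.conf are used instead; if redis.conf defines none, periodic RDB saving is disabled.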
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
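Also in serverCron, if no AOF or RDB child is currently saving, Redis checks each save point. If (both the time since the last save and the number of changes satisfy the save point) and (the last background save succeeded, or more than CONFIG_BGSAVE_RETRY_DELAY seconds, 5 by default, have passed since the last attempt), it starts an RDB save in the background.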
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
...
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY || // defined as 5
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
/* Trigger an AOF rewrite if needed. */
...
}
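To make the save-point condition easier to experiment with, here is the same check pulled out into a pure function. It is a sketch, not Redis code: struct save_point, due_save_point and BGSAVE_RETRY_DELAY are names invented for this example, and the server state is passed in as plain arguments.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

#define BGSAVE_RETRY_DELAY 5  /* mirrors CONFIG_BGSAVE_RETRY_DELAY */

/* One "save <seconds> <changes>" line. */
struct save_point { time_t seconds; int changes; };

/* Return the index of the first save point that is due, or -1 if none.
 * last_ok is non-zero when the previous background save succeeded. */
int due_save_point(const struct save_point *sp, size_t n,
                   long long dirty, time_t now, time_t lastsave,
                   time_t last_try, int last_ok) {
    assert(sp != NULL || n == 0);
    for (size_t j = 0; j < n; j++) {
        if (dirty >= sp[j].changes &&
            now - lastsave > sp[j].seconds &&
            (now - last_try > BGSAVE_RETRY_DELAY || last_ok))
            return (int)j;
    }
    return -1;
}
```

With the save points 900/1, 300/10 and 60/10000, five changes made 1000 seconds after the last save match the first entry, while the same situation three seconds after a failed attempt matches nothing until the retry delay has passed.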
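Saving data on shutdown
When Redis shuts down normally (on a SHUTDOWN command from a client or on receiving a termination signal such as SIGTERM), it calls prepareForShutdown. The function kills any child that is currently saving. If save points are configured, or the SAVE option was passed to SHUTDOWN, it then calls rdbSave on the main thread to save the data before the process exits.
Note that before the RDB save, if AOF is enabled, Redis calls flushAppendOnlyFile to force the buffered data to be written and then aof_fsync to make sure it reaches the disk.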
int prepareForShutdown(int flags) {
int save = flags & SHUTDOWN_SAVE;
int nosave = flags & SHUTDOWN_NOSAVE;
serverLog(LL_WARNING,"User requested shutdown...");
/* Kill all the Lua debugger forked sessions. */
ldbKillForkedSessions();
/* Kill the saving child if there is a background saving in progress.
We want to avoid race conditions, for instance our saving child may
overwrite the synchronous saving did by SHUTDOWN. */
if (server.rdb_child_pid != -1) {
serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (server.aof_state != AOF_OFF) {
/* Kill the AOF saving child as the AOF we already have may be longer
* but contains the full dataset anyway. */
if (server.aof_child_pid != -1) {
/* If we have AOF enabled but haven't written the AOF yet, don't
* shutdown or else the dataset will be lost. */
if (server.aof_state == AOF_WAIT_REWRITE) {
serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
return C_ERR;
}
serverLog(LL_WARNING,
"There is a child rewriting the AOF. Killing it!");
kill(server.aof_child_pid,SIGUSR1);
}
/* Append only file: flush buffers and fsync() the AOF at exit */
serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
flushAppendOnlyFile(1);
aof_fsync(server.aof_fd);
}
/* Create a new RDB file before exiting. */
if ((server.saveparamslen > 0 && !nosave) || save) {
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
/* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
* saving aborted, handling special stuff like slaves pending for
* synchronization... */
serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
return C_ERR;
}
}
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file.");
unlink(server.pidfile);
}
/* Best effort flush of slave output buffers, so that we hopefully
* send them pending writes. */
flushSlavesOutputBuffers();
/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
server.sentinel_mode ? "Sentinel" : "Redis");
return C_OK;
}
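AOF persistence
The data buffer
As mentioned in an earlier article, after Redis parses a client request into client->argc and client->argv, it calls processCommand to check whether the command may run; if so, it calls call(client, CMD_CALL_FULL).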
/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
#define CMD_CALL_SLOWLOG (1<<0)
#define CMD_CALL_STATS (1<<1)
#define CMD_CALL_PROPAGATE_AOF (1<<2)
#define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
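Look at CMD_CALL_FULL here; for now it is enough to know that it includes CMD_CALL_PROPAGATE. After the command has run, Redis decides whether to append it to server.aof_buf: the command is appended only if it changed data (or propagation was forced), propagation is not prevented for it, and AOF is enabled.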
void call(client *c, int flags) {
long long dirty = server.dirty;
c->cmd->proc(c); // execute the command
dirty = server.dirty-dirty; // number of changes the command made
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE && // flags is CMD_CALL_FULL here
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
// the command changed some data
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
// some commands force propagation, e.g. when publishing a message
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
// some commands must not be propagated here, e.g. SPOP, which propagates from its own code
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
}
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
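Before appending the command, Redis does some preprocessing. If the command targets a different db than the previously appended command, a SELECT command is inserted first. EXPIRE-family commands are all rewritten as PEXPIREAT. SETEX and PSETEX are split into SET plus PEXPIREAT, and SET with an EX or PX option is handled the same way. The encoded command is then appended to server.aof_buf when AOF is on and, if a rewrite child is running, it is also appended to the rewrite buffer so that it can be sent to the child.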
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
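The strings that end up in the buffer use the Redis protocol's multi-bulk format, the same layout as the hard-coded SELECT string above. The helper below is a small sketch of that encoding; resp_encode is a hypothetical name, and unlike the real catAppendOnlyGenericCommand it works on NUL-terminated C strings, so it is not binary safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a multi-bulk request into out (NUL-terminated).
 * Returns the encoded length, or 0 if the buffer is too small.
 * Sketch only: arguments must be ordinary C strings. */
size_t resp_encode(char *out, size_t cap, int argc, const char **argv) {
    assert(out != NULL && argv != NULL && argc > 0);
    size_t used = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);         /* argument count */
    if (n < 0 || (size_t)n >= cap) return 0;
    used += (size_t)n;
    for (int i = 0; i < argc; i++) {
        /* each argument: $<length>\r\n<bytes>\r\n */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return 0;
        used += (size_t)n;
    }
    return used;
}
```

Encoding the two arguments SELECT and 0 yields "*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n", which is the string feedAppendOnlyFile builds with sdscatprintf when the target db changes.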
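Flushing data to disk
# AOF is disabled
appendonly no
# the fsync policy below only takes effect once AOF is enabled
# appendfsync always
appendfsync everysec
# appendfsync no
Redis disables AOF by default. While AOF is off, server.aof_buf never holds any data; commands are written to the AOF only after it is enabled with appendonly yes.
The appendfsync setting takes effect only with appendonly yes. The default is everysec: about once per second Redis asks a background thread to fsync the file, but the write itself is still done by the main thread, which writes whenever there is buffered data and no background fsync is still in progress.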
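Flushing to disk is also handled in beforeSleep. Readers of the earlier articles in this series may remember that Redis does not send a reply to the client right away; it first stores the reply in client->buf and then, in beforeSleep before the next event-loop iteration, calls handleClientsWithPendingWrites to send it. This ordering is what makes appendfsync always meaningful, which is why flushAppendOnlyFile is called before handleClientsWithPendingWrites in beforeSleep.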
void beforeSleep(struct aeEventLoop *eventLoop) {
...
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
}
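There are three flush policies. The sections below cover them in the order no, always, everysec, following the source code.
appendfsync no
Under this policy, which gives no guarantee about syncing, flushAppendOnlyFile amounts to calling aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); directly: the data goes into the operating system's buffers, and whether and when it reaches the disk is left to the OS. Because aofWrite can stop partway, for example when the disk is full, Redis compares the length it asked to write with the length actually written. On a short write, to make sure the AOF file can still be loaded next time, Redis truncates the partially written data off the file and retries the whole buffer later. If truncation fails, it instead removes the already written part from aof_buf, so the next write continues from where this one stopped. In either case the function returns after handling the error, without calling aof_fsync or any other sync function.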
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) return;
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
// some data was written (a short write)
if (nwritten != -1) {
// truncate away the part that was just written
// TODO: what happens if ftruncate() runs while part of the file is still in memory and not yet flushed to disk?
if (ftruncate(server.aof_fd, server.aof_current_size) != -1) {
// truncation succeeded
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
server.aof_last_write_status = C_ERR;
// truncation failed: keep the part that was written
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
server.aof_current_size += nwritten;
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
// the fsync logic follows
}
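The short-write handling can be isolated into a small function that is easy to reason about. The sketch below is an illustration under stated assumptions rather than the Redis code: open_append and append_or_rollback are hypothetical names, the descriptor is assumed to be opened with O_APPEND so that writes always land at the end of the file, and the bookkeeping of error status is left out.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

/* Open (or create) a log file for appending. Returns -1 on failure. */
int open_append(const char *path) {
    assert(path != NULL);
    return open(path, O_WRONLY | O_CREAT | O_APPEND, 0644);
}

/* Append buf to fd, whose known-good length is *size. On a short write,
 * try to truncate the file back to *size so that it never ends in half a
 * command. Returns how many bytes the caller may drop from its buffer:
 * len on success, 0 if nothing was kept, or the partial count when the
 * truncation failed and the partial data had to stay in the file. */
size_t append_or_rollback(int fd, off_t *size, const char *buf, size_t len) {
    assert(size != NULL && buf != NULL);
    ssize_t nwritten = write(fd, buf, len);
    if (nwritten == (ssize_t)len) {
        *size += nwritten;
        return len;
    }
    if (nwritten > 0) {
        if (ftruncate(fd, *size) != -1) return 0;  /* rolled back */
        *size += nwritten;                         /* keep the partial data */
        return (size_t)nwritten;
    }
    return 0;                                      /* error, nothing written */
}
```

The return value tells the caller what to do with its buffer: drop everything, keep everything for a retry, or drop only the prefix that is already in the file, which corresponds to the sdsrange call above.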
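appendfsync always
The always policy guarantees that by the time the client receives a reply, Redis has already synced the change to disk. In this mode the main thread calls aof_fsync, that is, an fsync-family function, right after aofWrite. Forcing the main thread to wait on the disk degrades performance sharply. The policy is also less tolerant of errors: if aofWrite fails to write everything in aof_buf, Redis exits immediately.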
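appendfsync everysec
Syncing once per second is the default: Redis tries to fsync the file every second. flushAppendOnlyFile runs in beforeSleep on every event-loop iteration, and serverCron, which runs 10 times per second by default, calls it again whenever an earlier flush was postponed, whereas an fsync is scheduled at most once per second. Under steady write traffic, writes therefore outnumber syncs by roughly 10:1 or more. If aof_no_fsync_on_rewrite (the no-appendfsync-on-rewrite setting) is enabled, the incremental AOF data is not synced while a child is doing a full save (an RDB save or an AOF rewrite).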
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) return;
// check whether a background thread is still running an fsync
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
if (sync_in_progress) {
// a background fsync is in progress: postpone the write for one loop, and for up to 2 seconds
if (server.aof_flush_postponed_start == 0) {
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
return;
}
// still not finished after 2 seconds: write anyway, which may block
}
}
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
// short write: handled as described above, then return
...
return; /* We'll try again on the next call... */
}
// the data is now in the OS buffers; reset aof_buf
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}
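When persisting data, Redis calls write on the main thread and fsync on a separate thread. This lets the other thread block on I/O without affecting the main thread. Note, however, that if the main thread calls write while the other thread's fsync has not yet returned, the main thread blocks in write as well. [4]
Redis Development and Operations (《Redis開發與運維》) [2] says:
The AOF blocking flow reveals two problems:
1) With everysec, up to 2 seconds of data can be lost, not 1 second.
2) If fsync is slow on the system, the Redis main thread blocks, which hurts throughput.
In fact, in Redis 4.0 the everysec policy can lose up to 2 seconds of data plus one event-loop iteration. And although the book lists two problems, they are really the same one: the disk cannot keep up with the volume of writes. When that happens under everysec, Redis gives priority to the main thread: if less than 2 seconds have passed since the postponement timestamp aof_flush_postponed_start, it skips this write to avoid blocking, so the main thread can keep serving requests. If after 2 seconds the earlier data still has not been synced to disk, it calls aofWrite anyway, which blocks the main thread.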
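The postponement rule is compact enough to express as a pure decision function, which makes the 2-second window easy to test. This is a sketch with invented names (everysec_decide, enum flush_action); it reproduces only the decision at the top of flushAppendOnlyFile for the everysec policy, not the write or the fsync scheduling.

```c
#include <assert.h>
#include <time.h>

enum flush_action { FLUSH_WRITE, FLUSH_POSTPONE };

/* Decide whether an everysec flush should write now or be postponed.
 * *postponed_start is 0 when no postponement is pending, otherwise the
 * time (in seconds) at which the first postponement happened. */
enum flush_action everysec_decide(int sync_in_progress, int force,
                                  time_t now, time_t *postponed_start) {
    assert(postponed_start != NULL);
    if (sync_in_progress && !force) {
        if (*postponed_start == 0) {
            *postponed_start = now;      /* first postponement */
            return FLUSH_POSTPONE;
        }
        if (now - *postponed_start < 2)
            return FLUSH_POSTPONE;       /* still inside the 2 s window */
        /* waited long enough: fall through and write, possibly blocking */
    }
    *postponed_start = 0;
    return FLUSH_WRITE;
}
```

Calling it at seconds 100, 101 and 102 with a background fsync still running yields postpone, postpone, write, which is where the worst case of about two seconds of unwritten data comes from.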
AOF rewrite
Configuring AOF rewrite
An AOF rewrite can be triggered manually with the BGREWRITEAOF command or automatically by configured conditions.
auto-aof-rewrite-min-size 64mb
auto-aof-rewrite-percentage 100
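These two settings alone do not tell us when Redis rewrites; we also need aof_current_size and aof_base_size (the field server.aof_rewrite_base_size in the source). aof_current_size is the current size of the AOF file. Redis updates it when it loads the AOF file at startup and every time data is appended, and the value is not stored on disk. The same goes for aof_base_size: if an AOF file was loaded at startup, aof_base_size is the size of that file.
When aof_current_size > auto-aof-rewrite-min-size and auto-aof-rewrite-percentage is non-zero, a rewrite starts automatically once (aof_current_size - aof_base_size) * 100 / aof_base_size >= auto-aof-rewrite-percentage. With the configuration above, if the AOF file loaded at startup is 100mb, aof_base_size is 100mb and the rewrite is triggered when the AOF file grows to 200mb.
There is a catch, though. If Redis restarts just as the AOF file reaches 199mb, then on the next start aof_base_size equals aof_current_size, and an automatic rewrite is not triggered until the file grows to about 400mb. If the data grows slowly, or the percentage is large, Redis may be stopped or restarted before each rewrite is triggered; every restart resets aof_base_size to the current file size, so the automatic rewrite may never happen.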
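The arithmetic in the two examples can be checked with a few lines of code. The functions below are a sketch with hypothetical names (aof_growth_percent, aof_rewrite_due); they use the same integer formula as the serverCron check, with the server fields passed in as arguments.

```c
#include <assert.h>

/* Growth of the AOF in percent relative to the base size. A base of 0 is
 * treated as 1 to avoid dividing by zero. */
long long aof_growth_percent(long long current_size, long long base_size) {
    assert(current_size >= 0 && base_size >= 0);
    long long base = base_size ? base_size : 1;
    return (current_size * 100 / base) - 100;
}

/* Returns 1 when an automatic rewrite would be triggered, else 0.
 * A percentage of 0 disables automatic rewrites. */
int aof_rewrite_due(long long current_size, long long base_size,
                    long long min_size, int percentage) {
    return percentage != 0 &&
           current_size > min_size &&
           aof_growth_percent(current_size, base_size) >= percentage;
}
```

With a 64mb minimum and a percentage of 100, a base of 100mb triggers at 200mb, while a base of 199mb (the restart case) does not trigger until the file reaches 398mb.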
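Priority of AOF rewrite
AOF rewrite has lower priority than RDB: if both trigger conditions are met at the same time, Redis handles the RDB save first. The source shows that the RDB check comes before the AOF check; if an RDB save is started there, server.rdb_child_pid is no longer -1, so the AOF rewrite branch is not entered even though its condition is met.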
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren()) {
// ... check whether the child has finished and handle it
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
...
// ... handle automatic RDB saves
}
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
}
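How an AOF rewrite proceeds
rewriteAppendOnlyFileBackground creates several pipes for communication between parent and child.
The child-info pipe (child_info_pipe) lets the child tell the parent how much memory was used by copy-on-write. aof_pipe_write_data_to_child lets the parent send recent data changes to the AOF rewrite child. aof_pipe_write_ack_to_parent and aof_pipe_write_ack_to_child carry the acknowledgements the two sides wait for.
It also registers a file event on aof_pipe_read_ack_from_child: when the child asks the parent to stop sending data, aofChildPipeReadable is called.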
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
// Note:
// a file event is registered here
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;
error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return C_ERR;
}
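Parent process: after creating the child, the parent stores the child's pid in aof_child_pid. Only one field is updated, but it means a task that may well affect Redis performance is now running.
Child process: it first writes the current contents of the database to a temporary file. If aof_use_rdb_preamble is enabled (off by default, but recommended), it writes RDB data, that is, a full dump of the dbs; otherwise it writes the full contents of the dbs as AOF commands. It then flushes the data to disk, blocking while it does so.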
//in function rewriteAppendOnlyFile(char* filename)
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
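Parent process: while the AOF child waits for its data to be flushed, the parent keeps serving requests and appends the new data to server.aof_rewrite_buf_blocks. If no file event is registered yet on aof_pipe_write_data_to_child (a pipe, that is, a file descriptor), the parent binds that pipe to aofChildWriteDiffData; whenever the pipe is writable, data from server.aof_rewrite_buf_blocks is written into it and so sent to the child. This ensures the parent never blocks on writing to the pipe.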
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
...
// keep appending the data to aof_rewrite_buf_blocks
}
// register the file event
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
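Child process: after flushing the earlier data, it keeps reading data from the parent for up to 1 second (stopping early after 20 consecutive 1 ms waits with no data) and accumulates it in aof_child_diff. It then asks the parent to stop sending data.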
//in function rewriteAppendOnlyFile(char* filename)
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
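The bounded read loop above can be reproduced outside Redis with poll, which is also what aeWait is built on. The following sketch makes a few assumptions: drain_for and now_ms are hypothetical names, the data is collected into a fixed caller-supplied buffer instead of an sds string, and end of file simply ends the loop.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* Monotonic clock in milliseconds. */
static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Read from fd into out for at most max_ms milliseconds, giving up early
 * after max_idle consecutive 1 ms polls without data, on end of file, or
 * when the buffer is full. Returns the number of bytes collected. */
size_t drain_for(int fd, int max_ms, int max_idle, char *out, size_t cap) {
    assert(out != NULL);
    size_t used = 0;
    int idle = 0;
    long long start = now_ms();
    while (now_ms() - start < max_ms && idle < max_idle && used < cap) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
        if (poll(&pfd, 1, 1) <= 0) { idle++; continue; }
        idle = 0;                        /* only contiguous timeouts count */
        ssize_t n = read(fd, out + used, cap - used);
        if (n <= 0) break;               /* end of file or error */
        used += (size_t)n;
    }
    return used;
}
```

Called as drain_for(fd, 1000, 20, buf, sizeof(buf)), it mirrors the two exit conditions of the loop: a hard one-second limit and an early exit once the sender has been quiet for about 20 ms.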
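Parent process: in the event loop it detects a readable event on the aof_pipe_read_ack_from_child pipe (registered when the pipes were created, see above) and calls aofChildPipeReadable, which sets aof_stop_sending_diff to 1; from then on the parent no longer writes the contents of aof_rewrite_buf_blocks to the child. It also sends the child a message acknowledging the stop request.
Child process: after receiving the parent's acknowledgement, it reads data one last time, because the parent may have sent more before it received the stop request. At this point all the incremental AOF data sent before the stop request is in aof_child_diff. The child writes it to the file, flushes it, and exits.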
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
aofReadDiffFromParent();
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
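Parent process: serverCron calls wait3, detects that the AOF rewrite child has exited, and calls backgroundRewriteDoneHandler.
The handler first opens the temporary file saved earlier and writes to it the data that was appended after the stop request and is still held in aof_rewrite_buf_blocks. (Note: although the child asked the parent to stop sending data, aof_child_pid has held the child's pid all this time, so new commands have kept being appended to aof_rewrite_buf_blocks.) At this point all the data is in the temporary AOF file. What remains is to rename the temporary file to the configured AOF file name.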
RDB versus AOF
The article on persistence on the official Redis site already compares the two, so the comparison is not repeated here.
References
[1] Redis source code
[2] Redis Development and Operations (《Redis開發與運維》)
[3] Redis Design and Implementation (《Redis設計與實現》)
[4] "fsync() on a different thread: apparently a useless trick"
[5] wait3(2), Linux man page
[6] ftruncate(3), Linux man page