Redis 4.0 from the Top Down (5): Persistence

Persistence in Redis 4.0

Introduction

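Although Redis is an in-memory database, it also provides persistence. RDB persistence takes periodic snapshots that can be used for rollback, while AOF persistence stays closer to the latest state of the database, so after a restart the server can be restored to its most recent state. The two differ in granularity: RDB backs up the whole database at once, whereas AOF records changes at a finer granularity but produces a larger file. Several writers hitting the disk at the same time would increase disk pressure and could eventually block, so Redis allows only one background persistence job (an RDB save or an AOF rewrite) to write to disk at any given time. By default Redis disables AOF persistence and enables background RDB persistence. Because AOF data is more recent, Redis loads the AOF file at startup when AOF persistence is enabled.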

# AOF is disabled by default
appendonly no
#   after 900 sec (15 min) if at least 1 key changed
#   after 300 sec (5 min) if at least 10 keys changed
#   after 60 sec if at least 10000 keys changed
save 900 1
save 300 10
save 60 10000

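Main text

RDB persistence

Redis provides the save and bgsave commands and can also be configured to save RDB data periodically.

The save command

The save command is handled by saveCommand, which calls rdbSave directly in the main thread, so it is not recommended in production. Before going further, let's look at the relevant members.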

struct redisServer {
    /* RDB persistence */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    char *rdb_filename;             /* Name of RDB file */
    long long dirty;                /* Changes to DB from the last rdb save */
    time_t lastsave;                /* Unix time of last successful save */
    int lastbgsave_status;          /* C_OK or C_ERR */
};

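If an RDB child process is already running, the command returns immediately with an error. Otherwise the data is saved to the file named by server.rdb_filename, which defaults to dump.rdb. rdbSave opens a temporary file, writes the data to it, flushes it to disk, and then renames the temporary file to dump.rdb. Finally it resets server.dirty to 0 and updates lastsave.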

void saveCommand(client *c) {
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }

    if (rdbSave(server.rdb_filename,NULL) == C_OK) {
        addReply(c,shared.ok);
    } else {
        addReply(c,shared.err);
    }
}
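The write-to-a-temporary-file-then-rename step described above can be sketched in isolation. This is a minimal illustration, not the actual rdbSave code: atomic_save is a hypothetical helper, and the temporary file name is only an example. The point is that readers of the final path see either the old file or the complete new one, never a half-written file.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: write `len` bytes to a temp file, force them to
 * disk, then atomically rename the temp file over `final_path`.
 * Returns 0 on success, -1 on failure. */
int atomic_save(const char *final_path, const void *data, size_t len) {
    char tmp[256];
    int n = snprintf(tmp, sizeof(tmp), "temp-%d.rdb", (int)getpid());
    assert(n > 0 && (size_t)n < sizeof(tmp));

    FILE *fp = fopen(tmp, "w");
    if (fp == NULL) return -1;

    if (fwrite(data, 1, len, fp) != len) goto werr;
    if (fflush(fp) == EOF) goto werr;        /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;  /* kernel -> disk */
    if (fclose(fp) == EOF) { unlink(tmp); return -1; }

    /* rename() replaces the destination atomically on POSIX systems. */
    if (rename(tmp, final_path) == -1) { unlink(tmp); return -1; }
    return 0;

werr:
    fclose(fp);
    unlink(tmp);
    return -1;
}
```

A caller would use it as atomic_save("dump.rdb", payload, strlen(payload)); if the process dies before rename, the previous dump.rdb is left untouched.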

The actual writing happens in rdbSaveRio: it first writes the RDB version, then some auxiliary fields, then the data of each db, and finally the checksum.

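The bgsave command

The bgsave command calls fork to start a child process and calls rdbSave in the child.

As with the save command, if a child process is already saving data, an error is returned. However, with bgsave schedule, if the running child is an AOF rewrite, the background save is scheduled to run later instead of failing.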

struct redisServer {
    ...
    /* RDB persistence */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    int child_info_pipe[2];         /* Pipe used to write the child_info_data. */
    struct {
        int process_type;           /* AOF or RDB child? */
        size_t cow_size;            /* Copy on write size. */
        unsigned long long magic;   /* Magic value to make sure data is valid. */
    } child_info_data;
    ...
};

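Starting an RDB save in the background means calling fork to create a child process and calling rdbSave in that child. Before calling fork, Redis creates a pipe for one-way communication from the child to the parent. The forked child shares the parent's file descriptors, so the pipe descriptors can be used for this. While the child is saving the db, any write to a shared memory page triggers copy-on-write and consumes additional memory. When the save completes, the child sends the parent the amount of memory that was additionally allocated this way.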

fork(2)
*  The child inherits copies of the parent's set of open file
 descriptors.  Each file descriptor in the child refers to the same
 open file description (see open(2)) as the corresponding file
 descriptor in the parent.  This means that the two file
 descriptors share open file status flags, file offset, and signal-
 driven I/O attributes (see the description of F_SETOWN and
 F_SETSIG in fcntl(2)).
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    openChildInfoPipe(); // create the pipe

    start = ustime();
    if ((childpid = fork()) == 0) {
        // child process
        int retval;

        // file descriptors are inherited, so close the listening sockets here
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);
            
            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        // parent process
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}
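The pipe-before-fork pattern used by rdbSaveBackground can be tried on its own. The sketch below is an assumption-laden simplification: report_from_child, struct child_info and INFO_MAGIC are hypothetical names, and the value sent is passed in by the caller rather than measured. It shows the child writing a small struct with a magic value and the parent validating it after reading.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define INFO_MAGIC 0xC17DDA7A11265031ULL /* arbitrary value for this sketch */

struct child_info {
    size_t cow_size;           /* value the child wants to report */
    unsigned long long magic;  /* guards against partial or garbage reads */
};

/* Fork a child that reports `value` to the parent through a pipe.
 * Returns 0 and stores the value in *out on success, -1 on error. */
int report_from_child(size_t value, size_t *out) {
    int fds[2];
    assert(out != NULL);
    if (pipe(fds) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) { close(fds[0]); close(fds[1]); return -1; }

    if (pid == 0) {
        /* Child: both ends were inherited; keep only the write end. */
        close(fds[0]);
        struct child_info info = { value, INFO_MAGIC };
        ssize_t w = write(fds[1], &info, sizeof(info));
        _exit(w == (ssize_t)sizeof(info) ? 0 : 1);
    }

    /* Parent: keep only the read end. */
    close(fds[1]);
    struct child_info info;
    ssize_t r = read(fds[0], &info, sizeof(info));
    close(fds[0]);

    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    if (r != (ssize_t)sizeof(info) || info.magic != INFO_MAGIC) return -1;
    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) return -1;

    *out = info.cow_size;
    return 0;
}
```

Unlike Redis, this sketch blocks in read and waitpid; Redis only reads the pipe after serverCron has seen the child exit.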

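The parent records the child's pid in rdb_child_pid along with its type. Then, in the previously registered time event serverCron, it checks whether the child has finished. wait3 is called with WNOHANG, so it returns immediately: 0 if no child has changed state, otherwise the pid of a child that has terminated, either by exiting or by being killed by a signal. If the child exited successfully, the parent receives the information the child sent through the pipe, namely the copy-on-write size, and then closes the pipe.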

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    // a child process is doing a full dump of the data
    if (server.rdb_child_pid != -1|| server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
 			
            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } 
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    }
}
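The non-blocking check in serverCron can be reproduced with a few lines. This sketch uses waitpid rather than wait3 because waitpid is the POSIX call; poll_child and spawn_and_reap are hypothetical names, and the 1 ms sleep stands in for the timer that drives serverCron.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* Poll once for a finished child without blocking.
 * Returns 1 if `pid` has terminated (filling exitcode/bysignal),
 * 0 if it is still running, -1 on error. */
int poll_child(pid_t pid, int *exitcode, int *bysignal) {
    int statloc;
    assert(exitcode != NULL && bysignal != NULL);

    pid_t r = waitpid(pid, &statloc, WNOHANG);
    if (r == 0) return 0;   /* no state change yet */
    if (r == -1) return -1;

    *exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : 0;
    *bysignal = WIFSIGNALED(statloc) ? WTERMSIG(statloc) : 0;
    return 1;
}

/* Fork a child that exits with `code`, then poll until it is reaped,
 * the way a periodic timer callback would. */
int spawn_and_reap(int code, int *exitcode, int *bysignal) {
    pid_t pid = fork();
    if (pid == -1) return -1;
    if (pid == 0) _exit(code);

    struct timespec tick = { 0, 1000000 };  /* 1 ms between polls */
    int r;
    while ((r = poll_child(pid, exitcode, bysignal)) == 0)
        nanosleep(&tick, NULL);
    return r == 1 ? 0 : -1;
}
```

Because the poll never blocks, the caller can keep doing other work between checks, which is what allows Redis to serve clients while the child saves.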

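Since this is an RDB save to disk (rdb_child_type is RDB_CHILD_TYPE_DISK; the alternative, RDB_CHILD_TYPE_SOCKET, is used when the RDB is streamed to replicas), backgroundSaveDoneHandler calls backgroundSaveDoneHandlerDisk. It resets rdb_child_pid and related fields and, if the save succeeded, updates server.dirty and lastsave.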

void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        serverLog(LL_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = C_OK;
    } else if (!bysignal && exitcode != 0) {
        serverLog(LL_WARNING, "Background saving error");
        server.lastbgsave_status = C_ERR;
    } else {
        mstime_t latency;

        serverLog(LL_WARNING,
            "Background saving terminated by signal %d", bysignal);
        latencyStartMonitor(latency);
        rdbRemoveTempFile(server.rdb_child_pid);
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error conditon. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = C_ERR;
    }
    server.rdb_child_pid = -1;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;
}

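Periodic RDB saves

By default Redis registers three save points. If a redis.conf file is used, the defaults are cleared and the settings from redis.conf apply instead; if redis.conf contains no save directives, periodic RDB saves are disabled.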

appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

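Also in serverCron: if no AOF or RDB child is currently saving data, Redis checks whether a save point is satisfied. If (the time since the last save and the number of changes both meet a save point) and (the last background save succeeded, or more than CONFIG_BGSAVE_RETRY_DELAY seconds, 5 by default, have passed since the last attempt), an RDB background save is started.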

    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        ...
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY || // value is 5
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
         }

         /* Trigger an AOF rewrite if needed. */
			...
    }
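The save-point condition above is pure arithmetic, so it can be exercised without a server. The following sketch mirrors the condition in the loop; first_matching_savepoint and BGSAVE_RETRY_DELAY are names made up for this example, and the inputs that Redis reads from the server struct are passed as parameters.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

#define BGSAVE_RETRY_DELAY 5 /* seconds, mirrors CONFIG_BGSAVE_RETRY_DELAY */

struct saveparam {
    time_t seconds;
    int changes;
};

/* Returns the index of the first save point that fires, or -1 if none. */
int first_matching_savepoint(const struct saveparam *sp, size_t n,
                             long long dirty, time_t now, time_t lastsave,
                             time_t last_try, int last_ok) {
    assert(sp != NULL || n == 0);
    for (size_t j = 0; j < n; j++) {
        if (dirty >= sp[j].changes &&
            now - lastsave > sp[j].seconds &&
            (now - last_try > BGSAVE_RETRY_DELAY || last_ok))
            return (int)j;
    }
    return -1;
}
```

With the redis.conf defaults (900 1, 300 10, 60 10000), 10 changes and 301 seconds since the last save match the second rule, while the same state right after a failed attempt matches nothing until the retry delay has passed.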

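Saving data on shutdown

When Redis shuts down normally (on a client shutdown command or on receiving a SIGTERM signal), it calls prepareForShutdown. The function kills any child process that is currently saving. If save points are configured, or the save argument was passed to shutdown, it calls rdbSave in the main thread to save the data, and then the process exits.

As the code shows, before saving with RDB, if AOF is enabled Redis calls flushAppendOnlyFile to force the buffered data to be written out and then calls aof_fsync to make sure it reaches the disk.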

int prepareForShutdown(int flags) {
    int save = flags & SHUTDOWN_SAVE;
    int nosave = flags & SHUTDOWN_NOSAVE;

    serverLog(LL_WARNING,"User requested shutdown...");

    /* Kill all the Lua debugger forked sessions. */
    ldbKillForkedSessions();

    /* Kill the saving child if there is a background saving in progress.
       We want to avoid race conditions, for instance our saving child may
       overwrite the synchronous saving did by SHUTDOWN. */
    if (server.rdb_child_pid != -1) {
        serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }

    if (server.aof_state != AOF_OFF) {
        /* Kill the AOF saving child as the AOF we already have may be longer
         * but contains the full dataset anyway. */
        if (server.aof_child_pid != -1) {
            /* If we have AOF enabled but haven't written the AOF yet, don't
             * shutdown or else the dataset will be lost. */
            if (server.aof_state == AOF_WAIT_REWRITE) {
                serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
                return C_ERR;
            }
            serverLog(LL_WARNING,
                "There is a child rewriting the AOF. Killing it!");
            kill(server.aof_child_pid,SIGUSR1);
        }
        /* Append only file: flush buffers and fsync() the AOF at exit */
        serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
        flushAppendOnlyFile(1);
        aof_fsync(server.aof_fd);
    }

    /* Create a new RDB file before exiting. */
    if ((server.saveparamslen > 0 && !nosave) || save) {
        serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
        /* Snapshotting. Perform a SYNC SAVE and exit */
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
            /* Ooops.. error saving! The best we can do is to continue
             * operating. Note that if there was a background saving process,
             * in the next cron() Redis will be notified that the background
             * saving aborted, handling special stuff like slaves pending for
             * synchronization... */
            serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
            return C_ERR;
        }
    }

    /* Remove the pid file if possible and needed. */
    if (server.daemonize || server.pidfile) {
        serverLog(LL_NOTICE,"Removing the pid file.");
        unlink(server.pidfile);
    }

    /* Best effort flush of slave output buffers, so that we hopefully
     * send them pending writes. */
    flushSlavesOutputBuffers();

    /* Close the listening sockets. Apparently this allows faster restarts. */
    closeListeningSockets(1);
    serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
        server.sentinel_mode ? "Sentinel" : "Redis");
    return C_OK;
}

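AOF persistence

The data buffer

As mentioned in an earlier article, after parsing a client request into client->argc and client->argv, Redis calls processCommand to check whether the command may be executed. If so, it calls call(client, CMD_CALL_FULL).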

/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
#define CMD_CALL_SLOWLOG (1<<0)
#define CMD_CALL_STATS (1<<1)
#define CMD_CALL_PROPAGATE_AOF (1<<2)
#define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)

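Here we only need to know that CMD_CALL_FULL includes CMD_CALL_PROPAGATE. After executing the command, Redis decides whether to append it to server.aof_buf: the command is appended to the buffer if it changed data, propagation is not prevented for it, and AOF is enabled.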

void call(client *c, int flags) {
    long long dirty = server.dirty;
    c->cmd->proc(c); // execute the command
    dirty = server.dirty-dirty; // number of changes made by the command
    if (dirty < 0) dirty = 0;

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE && // flags is CMD_CALL_FULL here
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        // the command changed some data
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        // some commands force propagation, e.g. publish
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        // some commands must not be propagated here, e.g. spop, which propagates from other functions
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
            propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
            propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
  
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

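Before appending a command, Redis does some preprocessing. If the command targets a different db than the previously appended command, a select command is inserted first. Commands of the expire family are all converted to pexpireat. setex and psetex are split into set plus pexpireat. If AOF is on, the result is appended to server.aof_buf; if a child process is rewriting the AOF, the result is also appended to the rewrite buffer, from which Redis tries to send it to the child.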

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {

        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
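What ends up in the buffer is the command re-encoded in the Redis protocol. The sketch below shows that encoding and the relative-to-absolute expiry conversion under simplifying assumptions: resp_encode and to_pexpireat_ms are hypothetical helpers, and arguments are plain C strings, whereas Redis works on length-prefixed sds strings that may contain any byte.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv[0..argc-1] as a multi-bulk command into `out`.
 * Returns the encoded length, or -1 if `out` is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;

    for (int i = 0; i < argc; i++) {
        /* Each argument is "$<length>\r\n<bytes>\r\n". */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    assert(used < cap);
    return (int)used;
}

/* A relative TTL in seconds becomes an absolute millisecond timestamp,
 * so replaying the file later does not extend the key's lifetime. */
long long to_pexpireat_ms(long long now_ms, long long ttl_seconds) {
    return now_ms + ttl_seconds * 1000;
}
```

Storing the absolute timestamp is the reason for the translation step: replaying a relative expire after a restart would restart the countdown.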

Flushing data to disk

# AOF disabled
appendonly no


# takes effect once AOF is enabled
# appendfsync always  # AOF disk flush policy
appendfsync everysec
# appendfsync no

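Redis disables AOF by default. When AOF is off, server.aof_buf never contains any data; commands are written to the AOF only when it is enabled with appendonly yes.

appendfsync takes effect only with appendonly yes. The default is everysec: once per second Redis asks a background thread to fsync the data to disk. The write itself is still done by the main thread, which writes whenever there is buffered data and no background fsync is in progress.

Flushing to disk is also handled in beforeSleep. Readers of earlier articles in this series may remember that Redis does not send replies to a client directly; it first stores them in client->buf and then, in beforeSleep before the next iteration of the event loop, calls handleClientsWithPendingWrites to send them. One purpose of this design is to support the semantics of appendfsync always, which is why flushAppendOnlyFile comes before handleClientsWithPendingWrites in beforeSleep.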

void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
}

There are three flush policies. The following sections go through no, always and everysec in that order, with reference to the source code.

appendfsync no

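Under this policy, which makes no flushing guarantee, Redis still calls flushAppendOnlyFile, which here amounts to calling aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); directly. The data goes to the operating system's buffer; whether and when the file is flushed to disk is up to the system. Because aofWrite can stop partway, for example when the disk is full, Redis compares the length it passed in with the length actually written. If not everything was written, Redis truncates the partially written data from the file so that the AOF can still be loaded cleanly next time, and retries the whole buffer on the next call. If truncation fails, it instead removes the part that was already written from aof_buf, so the next write resumes from where it stopped. In addition, if the write to the system buffer fails, the function returns after handling the error, without calling aof_fsync or any other flushing function.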

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;

        // some data was written (short write)
        if (nwritten != -1) {
            // truncate the data that was just written

            // todo: what happens if ftruncate runs while part of the data is still in memory and not yet flushed to disk?
            if (ftruncate(server.aof_fd, server.aof_current_size) != -1) {
                // truncation succeeded
                nwritten = -1;
            } 
            server.aof_last_write_errno = ENOSPC;
        }

        server.aof_last_write_status = C_ERR;
        // truncation failed
        if (nwritten > 0) {
            server.aof_current_size += nwritten;
            sdsrange(server.aof_buf,nwritten,-1);
        }
        return; /* We'll try again on the next call... */
        
    }
    
    server.aof_current_size += nwritten;

    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
    // the fsync handling follows
}
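The rollback of a short write can be demonstrated on an ordinary file. In this sketch the `limit` parameter is a hypothetical knob that simulates a short write, since a real one (a full disk, for instance) is hard to provoke on demand; append_or_rollback, open_append and file_size are illustrative names. The file is opened with O_APPEND so that writes after a truncation still land at the end of the file.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Open (or create) a file for appending. */
int open_append(const char *path) {
    return open(path, O_WRONLY | O_APPEND | O_CREAT, 0644);
}

/* Current size of the file behind fd, or -1 on error. */
off_t file_size(int fd) {
    struct stat st;
    return fstat(fd, &st) == -1 ? (off_t)-1 : st.st_size;
}

/* Append `len` bytes to fd, whose last known-good size is *good_size.
 * At most `limit` bytes are attempted, to simulate a short write.
 * On a short write the file is truncated back to *good_size so that it
 * never ends in half a command. Returns 0 on a full write, -1 otherwise. */
int append_or_rollback(int fd, const char *buf, size_t len, size_t limit,
                       off_t *good_size) {
    assert(good_size != NULL);
    size_t attempt = len < limit ? len : limit;
    ssize_t nwritten = write(fd, buf, attempt);

    if (nwritten != (ssize_t)len) {
        if (nwritten > 0 && ftruncate(fd, *good_size) == -1) {
            /* Could not undo: accept the written prefix instead. */
            *good_size += nwritten;
        }
        return -1;
    }
    *good_size += nwritten;
    return 0;
}
```

When the rollback succeeds the caller keeps its whole buffer and retries later; when it fails, the caller would drop the already written prefix from its buffer, as flushAppendOnlyFile does with sdsrange.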
appendfsync always

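The always policy guarantees that by the time a client receives a reply, Redis has already flushed the corresponding change to disk. In this mode Redis effectively calls aof_fsync, that is, a function of the fsync family, in the main thread right after aofWrite. This forces the main thread to wait on the disk and degrades performance sharply. The always policy is also less tolerant of errors: if aofWrite fails to write all of aof_buf, Redis exits immediately.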

[Figure: access time, disk vs. memory]

appendfsync everysec

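Flushing to disk once per second is the default. flushAppendOnlyFile is called from beforeSleep on every iteration of the event loop, and serverCron, which runs 10 times per second by default, calls it again while a flush is postponed. The write is attempted on each call, but an fsync is scheduled at most once per second, so writes are normally much more frequent than fsyncs. If no-appendfsync-on-rewrite (server.aof_no_fsync_on_rewrite) is enabled, the incremental AOF data is not fsynced while a child process is doing a full dump, whether an RDB save or an AOF rewrite.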

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    // check whether a background thread is still running fsync
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        if (sync_in_progress) {
            
            // a background fsync is still in progress: postpone the write, for up to 2 seconds
            if (server.aof_flush_postponed_start == 0) {
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                return;
            }
            // still not finished after 2 seconds: write anyway, which may block
        }
    }

    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        // as described above, return if the write was incomplete
        ...
        return; /* We'll try again on the next call... */
    } 

    // the data is now in the OS buffer; reset aof_buf
    sdsfree(server.aof_buf);
    server.aof_buf = sdsempty();

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
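The postponement rule at the top of the function is a small state machine and can be checked separately. The sketch below reproduces only that decision; decide_flush and the enum are names invented for this example, and the time is passed in instead of being read from server.unixtime.

```c
#include <assert.h>
#include <time.h>

enum flush_action { FLUSH_WRITE_NOW, FLUSH_POSTPONE };

/* Decide whether to write the buffer now or postpone, given whether a
 * background fsync is still running. *postponed_start is 0 when no
 * postponement is active. */
enum flush_action decide_flush(int sync_in_progress, int force, time_t now,
                               time_t *postponed_start) {
    assert(postponed_start != NULL);
    if (sync_in_progress && !force) {
        if (*postponed_start == 0) {
            *postponed_start = now;   /* first postponement */
            return FLUSH_POSTPONE;
        }
        if (now - *postponed_start < 2)
            return FLUSH_POSTPONE;    /* still within the 2 s grace period */
        /* Waited long enough: fall through and write, possibly blocking. */
    }
    *postponed_start = 0;
    return FLUSH_WRITE_NOW;
}
```

Starting at time 100 with an fsync in progress, the calls at 100 and 101 are postponed and the call at 102 writes anyway, which is where the main thread may block.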

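When writing data to disk, Redis calls write in the main thread and fsync in a separate thread. This lets the other thread block on I/O without affecting the main thread. Note, however, that if the main thread calls write while the other thread's fsync has not yet returned, the main thread will block in write as well. [4]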

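《Redis開發與運維》 (Redis Development and Operations) [2] notes:

An analysis of the AOF blocking flow reveals two problems:

1) With everysec, up to 2 seconds of data may be lost, not 1 second.

2) If fsync is slow on the system, the Redis main thread is blocked, which hurts throughput.

In fact, in Redis 4.0 the everysec policy can lose up to 2 seconds of data plus the duration of one event loop iteration. And although the book lists two problems, they are really the same one: the disk cannot keep up with the volume of data being written. When this happens under everysec, Redis puts the main thread first: if less than 2 seconds have passed since aof_flush_postponed_start, the timestamp of the first postponed write, it skips this write so that the main thread is not blocked and can keep serving requests. If after 2 seconds the data still has not been flushed from the buffer to disk, aofWrite is called anyway and the main thread blocks.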

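AOF rewrite

AOF rewrite configuration

An AOF rewrite can be triggered manually with the bgrewriteaof command, or automatically when configured conditions are met.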

auto-aof-rewrite-min-size 64mb
auto-aof-rewrite-percentage 100

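These two settings alone do not tell us when Redis rewrites; we also need aof_current_size and aof_base_size (server.aof_rewrite_base_size in the source). aof_current_size is the current size of the AOF file. Redis updates it when it loads the AOF at startup and every time it appends to the AOF; the value is not stored on disk. The same goes for aof_base_size: if an AOF file is loaded at startup, aof_base_size is set to the size of that file, and it is set again after each successful rewrite.

When aof_current_size > auto-aof-rewrite-min-size and auto-aof-rewrite-percentage is set, a rewrite starts automatically if (aof_current_size * 100 / aof_base_size) - 100 >= percentage. For example, with the configuration above, if the AOF file loaded at startup is 100mb, aof_base_size is 100mb, and a rewrite is triggered once the AOF file grows to 200mb.

There is a catch, however. If Redis happens to restart when the AOF file has grown to 199mb, then at the next startup aof_base_size equals aof_current_size, and the file has to grow to about 400mb before an automatic rewrite is triggered. If the data grows slowly or the percentage is large, and Redis is shut down or restarted before a rewrite is triggered, aof_base_size is reset to aof_current_size at every startup, so an automatic rewrite may never happen.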
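The arithmetic in the two examples above can be verified with a small function. This is a sketch of the growth check only; aof_growth_percent and should_auto_rewrite are names chosen for this example, and sizes are passed in bytes.

```c
#include <assert.h>

/* Growth of the AOF relative to its base size, in percent. */
long long aof_growth_percent(long long current_size, long long base_size) {
    assert(current_size >= 0 && base_size >= 0);
    long long base = base_size ? base_size : 1;  /* avoid division by zero */
    return (current_size * 100 / base) - 100;
}

/* Returns 1 if an automatic rewrite would be triggered, 0 otherwise. */
int should_auto_rewrite(long long current_size, long long base_size,
                        long long min_size, int percentage) {
    if (percentage == 0) return 0;             /* feature disabled */
    if (current_size <= min_size) return 0;    /* file still too small */
    return aof_growth_percent(current_size, base_size) >= percentage;
}
```

With a 64mb minimum and 100 percent, a base of 100mb triggers at 200mb but not at 199mb; after a restart at 199mb the base is 199mb and the trigger moves to 398mb.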

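AOF rewrite priority

An AOF rewrite has lower priority than an RDB save: if both trigger conditions are met at the same time, Redis handles the RDB save first. In the source, the RDB check comes before the AOF check; if an RDB save is started there, server.rdb_child_pid is no longer -1, so the AOF rewrite branch cannot be entered even though its condition is satisfied.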

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren()) {
            //... check whether the child has finished and handle it
        } else {
            /* If there is not a background saving/rewrite in progress check if
             * we have to save/rewrite now. */
             for (j = 0; j < server.saveparamslen; j++) {
                ...
                //.. handle automatic RDB saves
             }

             /* Trigger an AOF rewrite if needed. */
             if (server.aof_state == AOF_ON &&
                 server.rdb_child_pid == -1 &&
                 server.aof_child_pid == -1 &&
                 server.aof_rewrite_perc &&
                 server.aof_current_size > server.aof_rewrite_min_size)
             {
                long long base = server.aof_rewrite_base_size ?
                                server.aof_rewrite_base_size : 1;
                long long growth = (server.aof_current_size*100/base) - 100;
                if (growth >= server.aof_rewrite_perc) {
                    serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                    rewriteAppendOnlyFileBackground();
                }
             }
        }
}

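AOF rewrite from start to finish

rewriteAppendOnlyFileBackground creates several pipes for communication between the parent and the child.

  • child_info_pipe lets the child tell the parent how much copy-on-write memory was used.
  • aof_pipe_write_data_to_child lets the parent send recent data changes to the AOF rewrite child.
  • aof_pipe_write_ack_to_parent and aof_pipe_write_ack_to_child are used by each side to wait for the other's acknowledgment.

It also registers a file event on aof_pipe_read_ack_from_child: when the child asks the parent to stop sending data, aofChildPipeReadable is called.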

int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    
    // Note:
    // a file event is registered here
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}

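Parent: after creating the child, the parent records the child's pid in aof_child_pid. Only one field is updated, but it means a task that may well affect Redis performance is now running.


Child: it first writes the current contents of the database to a temporary file. If aof_use_rdb_preamble is enabled (off by default, but recommended), it writes the data in RDB format, i.e. a full dump of the db; otherwise it writes the full contents of the db as AOF commands. It then flushes the data to disk, which blocks.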

//in function rewriteAppendOnlyFile(char* filename)
if (server.aof_use_rdb_preamble) {
    int error;
    if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
        errno = error;
        goto werr;
    }
} else {
    if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}

/* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;

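Parent: while the AOF child is waiting for its data to be flushed, the parent keeps serving requests and appends the changes to server.aof_rewrite_buf_blocks. If no file event is registered yet for aof_pipe_write_data_to_child (a pipe, i.e. a file descriptor), it binds the pipe to aofChildWriteDiffData; whenever the pipe is writable, data from server.aof_rewrite_buf_blocks is written to the pipe and sent to the child. This way the parent never blocks on writing to the pipe.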

/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
       ...
       // keep appending the data to aof_rewrite_buf_blocks
    }

    // register the file event
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
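Why a non-blocking pipe keeps the parent responsive can be seen with a short experiment. The sketch below is not aofChildWriteDiffData; set_nonblocking and write_what_fits are hypothetical helpers that write until the pipe reports it is full and hand the rest back to the caller.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Write as much of buf as the pipe accepts right now.
 * Returns the number of bytes written (possibly 0), or -1 on a real error.
 * The caller keeps the unwritten tail and retries when fd is writable. */
ssize_t write_what_fits(int fd, const char *buf, size_t len) {
    size_t done = 0;
    while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);
        if (n > 0) { done += (size_t)n; continue; }
        if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;
        if (n == -1 && errno == EINTR) continue;
        return -1;
    }
    assert(done <= len);
    return (ssize_t)done;
}
```

If nobody reads the other end, a large buffer is only partly accepted and the call returns instead of hanging; in Redis the remaining blocks stay in aof_rewrite_buf_blocks until the next writable event.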

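Child: after flushing the earlier data, it keeps reading data from the parent for up to 1 second, stopping early after 20 consecutive 1 ms waits with no data, and stores it in aof_child_diff. It then asks the parent to stop sending data.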

//in function rewriteAppendOnlyFile(char* filename)
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
    if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
    {
        nodata++;
        continue;
    }
    nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
    aofReadDiffFromParent();
}
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;


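Parent: in the event loop it detects that the aof_pipe_read_ack_from_child pipe is readable (the event was registered when the pipes were created, see above) and calls aofChildPipeReadable, which sets aof_stop_sending_diff to 1. From then on the parent no longer writes the contents of aof_rewrite_buf_blocks to the child. It also sends the child a message confirming that the stop request was received.


Child: once it receives the parent's acknowledgment, it reads the pipe one last time, because the parent may have sent more data before it received the stop request. At this point all incremental AOF data from before the stop request is in aof_child_diff. The child then writes it to the file, flushes it, and exits.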

if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
    byte != '!') goto werr;
aofReadDiffFromParent();
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
    goto werr;

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
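The stop-sending handshake between child and parent can be reduced to two pipes and two one-byte messages. The sketch below is a simplified stand-in: stop_sending_handshake is a hypothetical function, both sides use blocking reads (Redis uses an event-loop callback in the parent and syncRead with a timeout in the child), and no diff data is actually transferred.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* The child asks the parent to stop sending diffs by writing "!" on one
 * pipe, then waits for the parent's "!" on a second pipe before finishing.
 * Returns 0 if both sides completed the handshake, -1 otherwise. */
int stop_sending_handshake(void) {
    int to_parent[2], to_child[2];
    if (pipe(to_parent) == -1) return -1;
    if (pipe(to_child) == -1) {
        close(to_parent[0]); close(to_parent[1]);
        return -1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        close(to_parent[0]); close(to_parent[1]);
        close(to_child[0]); close(to_child[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: request stop, then block until the parent acknowledges. */
        char byte = 0;
        close(to_parent[0]);
        close(to_child[1]);
        if (write(to_parent[1], "!", 1) != 1) _exit(1);
        if (read(to_child[0], &byte, 1) != 1 || byte != '!') _exit(1);
        _exit(0);  /* the real child would now write its final diff */
    }

    /* Parent: on the child's request, stop sending and acknowledge. */
    char byte = 0;
    int stop_sending_diff = 0;
    close(to_parent[1]);
    close(to_child[0]);
    if (read(to_parent[0], &byte, 1) == 1 && byte == '!') {
        stop_sending_diff = 1;
        if (write(to_child[1], "!", 1) != 1) stop_sending_diff = 0;
    }
    close(to_parent[0]);
    close(to_child[1]);

    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    assert(WIFEXITED(status) || WIFSIGNALED(status));
    return (stop_sending_diff && WIFEXITED(status) &&
            WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

The acknowledgment matters because it gives the child a point after which no more diff data will arrive on the data pipe, so one final read is enough.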

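Parent: when wait3 in serverCron detects that the AOF rewrite child has exited, backgroundRewriteDoneHandler is called.
It first opens the temporary file the child saved and writes to it the data accumulated in aof_rewrite_buf_blocks after the stop request (note: although the child asked the parent to stop sending data, aof_child_pid still holds the child's pid until now, so changes keep being appended to aof_rewrite_buf_blocks). At that point all data has been written to the temporary AOF file. The last step is to rename the temporary file to the configured AOF file name.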

RDB compared with AOF

The official site already has an article, Redis persistence [9], that makes this comparison, so it is not repeated here.

References

[1]《Redis source code》

[2]《Redis開發與運維》 (Redis Development and Operations)

[3]《Redis設計與實現》 (Redis Design and Implementation)

[4]《fsync() on a different thread: apparently a useless trick》

[5]《private dirty memory》

[6]《pipe(2) – Linux man page》

[7]《wait3(2) – Linux man page》

[8]《ftruncate(3) – Linux man page》

[9]《Redis persistence》