代碼:aof.c
因為RDB不是很可靠,所以有了AOF(append only file),每當 Redis 接受到會修改數據集的命令時,就會把命令追加到 AOF 文件里,當你重啟 Redis 時,AOF 文件里的命令會被重新執行一次,重建數據。
AOF因為是保存的修改的命令,隨著時間推移,文件會越來越大,可以通過AOF重寫來降低文件大小
AOF重寫主要流程為:用戶執行bgrewriteaof命令(也有其他觸發時機,見下文),觸發AOF,fork子進程進行AOF重寫,子進程繼承了父進程的進程空間,遍歷 Redis server 上的所有數據庫,把每個鍵值對以插入操作的形式寫入日志文件,在這期間父進程繼續提供服務,父進程的寫操作會先緩存到 buf 中,之后父進程把 buf 中的數據,通過管道發給子進程,子進程寫完AOF后,從管道中讀取修改命令并寫入,之后發送ack告訴父進程(父進程收到子進程ack后設置server.aof_stop_sending_diff為1,此時父進程不會再發送數據到管道,父進程保存修改命令數據,即累計的修改數據),然后父進程回復ack,子進程收到ack后會再嘗試把管道中的數據讀出來寫入AOF文件,之后重命名AOF文件,退出,父進程感知這一狀態(serverCron -> checkChildrenDone -> backgroundRewriteDoneHandler -> waitpid),讀取子進程重寫的aof文件,在文件末尾再次寫入父進程積累的數據,然后將文件名重命名成最終文件,重寫結束,最后該文件被重新打開作為新的AOF文件,父進程繼續寫入修改命令到該文件中。
AOF文件內容
*2 #當前命令有兩個參數
$6 #第一個參數長度是6字節,即SELECT
SELECT #當前命令的第一個參數值
$1 #當前命令的第二個參數,長度是1,即0
0 #當前命令的第二個參數值
*3
$3
SET
$4
name
$4
jack
*2
$6
SELECT
$1
0
*3
$3
set
$3
sex
$4
male
AOF 重寫的觸發時機
實現 AOF 重寫的函數是 rewriteAppendOnlyFileBackground
rewriteAppendOnlyFileBackground 函數一共會在三個函數中被調用
bgrewriteaofCommand 函數,執行 bgrewriteaof 命令,也就是說,我們手動觸發了 AOF rewrite 的執行
void bgrewriteaofCommand(client *c) {
 if (server.child_type == CHILD_TYPE_AOF) { // 已經在執行了就不再執行
 addReplyError(c,"Background append only file rewriting already in progress");
 } else if (hasActiveChildProcess()) { // 有子進程,即rdb
 server.aof_rewrite_scheduled = 1; // 會在ServerCron中判斷并調用rewriteAppendOnlyFileBackground進行AOF重寫
 addReplyStatus(c,"Background append only file rewriting scheduled");
 } else if (rewriteAppendOnlyFileBackground() == C_OK) { // 進行AOF重寫
 addReplyStatus(c,"Background append only file rewriting started");
 } else {
 addReplyError(c,"Can't execute an AOF background rewriting. "
 "Please check the server logs for more information.");
 }
}即只有當前既沒有 AOF 重寫子進程也沒有 RDB 子進程,bgrewriteaofCommand 函數才會立即調用 rewriteAppendOnlyFileBackground 函數,實際執行 AOF 重寫。
startAppendOnly函數,該函數會被 configSetCommand 函數(在config.c文件中,在 Redis 中執行 config 命令啟用 AOF 功能會調用,即執行"config set appendonly yes"命令)和 restartAOFAfterSYNC 函數(在replication.c文件中,會在主從節點的復制過程中被調用,就是當主從節點在進行復制時,如果從節點的 AOF 選項被打開,那么在加載解析 RDB 文件時,AOF 選項就會被關閉。然后,無論從節點是否成功加載了 RDB 文件,restartAOFAfterSYNC 函數都會被調用,用來恢復被關閉的 AOF 功能)調用
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly)
configSetCommand調用流程:configSetCommand -> interface.set -> boolConfigSet -> data.yesno.update_fn -> updateAppendonly ->startAppendOnly
serverCron 函數
serverCron 函數會檢測當前是否沒有 RDB 子進程和 AOF 重寫子進程在執行,并檢測是否有 AOF 重寫操作被設置為了待調度執行,也就aof_rewrite_scheduled 變量值為 1。如果這三個條件都滿足,那么 serverCron 函數就會調用rewriteAppendOnlyFileBackground 函數來執行 AOF 重寫
判斷AOF 功能已啟用、AOF 文件大小比例超出閾值,以及 AOF 文件大小絕對值超出閾值,都滿足則進行AOF重寫
總結來說下面幾點會進行重寫:
時機一:bgrewriteaof 命令被執行。
時機二:主從復制完成 RDB 文件解析和加載(無論是否成功)。
時機三:AOF 重寫被設置為待調度執行。
時機四:AOF 被啟用,同時 AOF 文件的大小比例超出閾值,以及 AOF 文件的大小絕對值超出閾值。
重寫過程
在rewriteAppendOnlyFileBackground函數中實現的
/* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */
/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    if (hasActiveChildProcess()) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];
        /* Child */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        if (childpid == -1) {
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}
管道通信
在子進程AOF期間,父進程繼續提供服務,產生的修改命令需要傳輸給子進程,子進程將這部分修改命令寫入到AOF文件中,通信方式是管道信息
aofCreatePipes 函數創建了包含 6 個文件描述符元素的數組 fds
/* Create the pipes used for parent - child process IPC during rewrite.
 * We have a data pipe used to send AOF incremental diffs to the child,
 * and two other pipes used by the children to signal it finished with
 * the rewrite so no more data should be written, and another for the
 * parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
 int fds[6] = {-1, -1, -1, -1, -1, -1};
 int j;
 if (pipe(fds) == -1) goto error; /* parent -> children data. */
 if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
 if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
 /* Parent -> children data is non blocking. */
 if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
 if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
 if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
 server.aof_pipe_write_data_to_child = fds[1];
 server.aof_pipe_read_data_from_parent = fds[0];
 server.aof_pipe_write_ack_to_parent = fds[3];
 server.aof_pipe_read_ack_from_child = fds[2];
 server.aof_pipe_write_ack_to_child = fds[5];
 server.aof_pipe_read_ack_from_parent = fds[4];
 server.aof_stop_sending_diff = 0;
 return C_OK;
error:
 serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
 strerror(errno));
 for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
 return C_ERR;
}
創建的管道用途如下:
fds[0]和 fds[1]:對應了主進程和重寫子進程間用于傳遞操作命令的管道,它們分別對應讀描述符和寫描述符。
fds[2]和 fds[3]:對應了重寫子進程向父進程發送 ACK 信息的管道,它們分別對應讀描述符和寫描述符。
fds[4]和 fds[5]:對應了父進程向重寫子進程發送 ACK 信息的管道,它們分別對應讀描述符和寫描述符。
操作命令傳輸管道的使用
當 AOF 重寫子進程在執行時,主進程還會繼續接收和處理客戶端寫請求。這些寫操作會被主進程正常寫入 AOF 日志文件,這個過程是由 feedAppendOnlyFile 函數(在 aof.c 文件中)來完成
feedAppendOnlyFile -> aofRewriteBufferAppend,該函數會將修改命令寫入到下面結構體中,然后掛到server.aof_rewrite_buf_blocks鏈表中
typedef struct aofrwblock {
unsigned long used, free;
char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;
接著調用aeCreateFileEvent創建可寫事件,處理函數為aofChildWriteDiffData,該函數就循環從server.aof_rewrite_buf_blocks取節點數據,寫入到管道中
子進程會調用aofReadDiffFromParent 函數讀取管道中的數據,讀取的數據放到 server.aof_child_diff(sds結構),它會將讀取的操作命令追加到全局變量 server 的 aof_child_diff 字符串中。而在 AOF 重寫函數 rewriteAppendOnlyFile 的執行過程最后,aof_child_diff 字符串會被寫入 AOF 重寫日志文件,以便我們在使用 AOF 重寫日志時,能盡可能地恢復重寫期間收到的操作
其實,aofReadDiffFromParent 函數一共會被以下三個函數調用。
rewriteAppendOnlyFileRio 函數:這個函數是由重寫子進程執行的,它負責遍歷 Redis 每個數據庫,生成 AOF 重寫日志,在這個過程中,它會不時地調用 aofReadDiffFromParent 函數。
rewriteAppendOnlyFile 函數:這個函數是重寫日志的主體函數,也是由重寫子進程執行的,它本身會調用 rewriteAppendOnlyFileRio 函數。此外,它在調用完 rewriteAppendOnlyFileRio 函數后,還會多次調用 aofReadDiffFromParent 函數,以盡可能多地讀取主進程在重寫日志期間收到的操作命令。
rdbSaveRio 函數:這個函數是創建 RDB 文件的主體函數。當我們使用 AOF 和 RDB 混合持久化機制時,這個函數也會調用 aofReadDiffFromParent 函數
從這里,我們可以看到,Redis 源碼在實現 AOF 重寫過程中,其實會多次讓重寫子進程向主進程讀取新收到的操作命令,這也是為了讓重寫日志盡可能多地記錄最新的操作,提供更加完整的操作記錄。
ACK 管道的使用
子進程完成AOF重寫后, rewriteAppendOnlyFile 這個函數在完成日志重寫后,就會調用 write 函數,向 aof_pipe_write_ack_to_parent 描述符對應的管道中寫入“!”,這就是重寫子進程向主進程發送 ACK 信號,讓主進程停止發送收到的新寫操作。即告訴父進程,不要寫修改命令道管道中了。
注冊了回調函數aofChildPipeReadable,這個函數會判斷從 aof_pipe_read_ack_from_child 管道描述符讀取的數據是否是“!”,如果是的話,那它就會調用 write 函數,往 aof_pipe_write_ack_to_child 管道描述符上寫入“!”,表示主進程已經收到重寫子進程發送的 ACK 信息,同時它會給重寫子進程回復一個 ACK 信息。
最后,重寫子進程執行的 rewriteAppendOnlyFile 函數,會調用 syncRead 函數,從 aof_pipe_read_ack_from_parent 管道描述符上,讀取主進程發送給它的 ACK 信息。
這樣一來,重寫子進程和主進程之間就通過兩個 ACK 管道,相互確認重寫過程結束了。
AOF 重寫子進程和主進程是使用了一個操作命令傳輸管道和兩個 ACK 信息發送管道。操作命令傳輸管道是用于主進程寫入收到的新操作命令,以及用于重寫子進程讀取操作命令,而 ACK 信息發送管道是在重寫結束時,重寫子進程和主進程用來相互確認重寫過程的結束。最后,重寫子進程會進一步將收到的操作命令記錄到重寫日志文件中
這樣一來,AOF 重寫過程中主進程收到的新寫操作,就不會被遺漏了。因為一方面,這些新寫操作會被記錄在正常的 AOF 日志中,另一方面,主進程會將新寫操作緩存在 aof_rewrite_buf_blocks 數據塊列表中,并通過管道發送給重寫子進程。這樣,就能盡可能地保證重寫日志具有最新、最完整的寫操作了。
子進程什么時候發送ack
子進程在正常的重寫完成后至多再等一秒,在這一秒內如果有連續20ms沒有可讀事件發生,那么直接發送ack
主進程執行寫操作寫AOF的流程
前面說了執行命令會調用到call(client *c, int flags),該命令會調用propagate函數將命令傳播到AOF文件,具體是調用feedAppendOnlyFile函數中會將命令寫入server.aof_buf緩存中,然后serverCron定時函數會調用flushAppendOnlyFile考慮是否需要將aof_buf緩沖區中的內容寫入和保存到AOF 文件里面
call(client *c, int flags) -> propagate -> feedAppendOnlyFile -> flushAppendOnlyFile -> 寫入server.aof_buf
serverCron -> flushAppendOnlyFile -> 寫AOF文件
RDB-AOF混合持久化
即AOF文件的前半段是RDB格式的全量數據后半段是redis命令格式的增量數據,server.aof_use_rdb_preamble大于0(混合持久化開關打開)時,就會進入rdbSaveRio函數先以RDB格式來保存全量數據
前半段是RDB格式的全量數據后半段是redis命令格式的增量數據。
加載RDB/AOF
main -> loadDataFromDisk -> rdbLoad -> rdbLoadRio 完成rdb文件的加載
main -> loadDataFromDisk -> loadAppendOnlyFile 完成AOF文件的加載,如果AOF文件是混合持久化的文件,則會首先調用rdbLoadRio加載RDB,加載完后該函數會新建一個fake的client來一行一行執行AOF文件中的命令,完成AOF文件的加載
因為AOF文件更完整,所以代碼中會優先加載AOF文件