From 7f6d7d6162e4d7dd3bd7cf741c03a1ab576df9b3 Mon Sep 17 00:00:00 2001 From: huihuang-chen <48422655+huihuang-chen@users.noreply.github.com> Date: Sun, 23 Oct 2022 21:51:17 +0800 Subject: [PATCH] =?UTF-8?q?aof.c=E6=B7=BB=E5=8A=A0=E4=B8=AD=E6=96=87?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=20(#80)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Binbin --- README.md | 2 +- src/aof.c | 217 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 218 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 84595825f..b26b5f9c3 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ redis 仓库链接:https://github.com/redis/redis
| [ae.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/ae.c) | redis 事件循环器功能 | 完成 | | [multi.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/multi.c) | redis 事务实现 | 完成 | | [redis-check-rdb.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/redis-check-rdb.c) | Redis RDB 检查工具 | 完成 | - +| [aof.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/aof.c) | redis AOF功能 | 过半 |

尚未有中文注释的文件不会出现在表格中。
更新日期:2022/10/22 diff --git a/src/aof.c b/src/aof.c index acaffa4a6..3e2995d66 100644 --- a/src/aof.c +++ b/src/aof.c @@ -41,6 +41,14 @@ #include #include +/* redis7 的 aof 功能和之前版本的 aof 功能在文件管理方面有了很大的改变 + * redis7 对 aof 文件进行了分类,分为以下 4 类文件(临时文件不用管) + * 1) manifest: 清单文件,该文件的内容对应 aofManifest 结构,记录所有其他类型文件的文件信息,并进行了分类 + * 2) base: 基础文件,就是在执行 bgrewrite 的时候,当前数据的 RDB 快照或者 AOF,该文件的文件名后缀也是以 rdb 或者 aof 结尾,取决于 aof-use-rdb-preamble 配置项 + * 3) incr: 增量命令文件,执行 bgrewrite 之后到的写命令,会存入该文件 + * 4) history: 历史文件,执行 bgrewrite 之后,之前存在的 incr,base 文件就是历史文件,会通过 bio 关闭文件或者删除文件 + */ + void freeClientArgv(client *c); off_t getAppendOnlyFileSize(sds filename, int *status); off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status); @@ -82,16 +90,27 @@ void aofManifestFreeAndUpdate(aofManifest *am); * ------------------------------------------------------------------------- */ /* Naming rules. */ +/* AOF 各种类型的文件命名规则 */ +/* 基本文件,做 bgrewrite 的时候 redis 数据快照文件 */ #define BASE_FILE_SUFFIX ".base" +/* 增量命令文件,做 bgrewrite 之后到达的命令写入该文件 */ #define INCR_FILE_SUFFIX ".incr" +/* 使用 rdb 编码格式在 bgrewrite 的时候存储 base 文件的文件后缀标识 */ #define RDB_FORMAT_SUFFIX ".rdb" +/* 不使用 rdb 编码格式在 bgrewrite 的时候存储 base 文件的文件后缀标识,和 incr 文件使用的后缀标识 */ #define AOF_FORMAT_SUFFIX ".aof" +/* 清单文件后缀,该类型的文件会存储所有其他类型文件的文件名 */ #define MANIFEST_NAME_SUFFIX ".manifest" +/* 临时文件前缀 */ #define TEMP_FILE_NAME_PREFIX "temp-" /* AOF manifest key. */ +/* 下面三个宏在生成 aof 文件信息字符串的时候被使用 */ +/* 文件名 */ #define AOF_MANIFEST_KEY_FILE_NAME "file" +/* 文件的序列号 */ #define AOF_MANIFEST_KEY_FILE_SEQ "seq" +/* 文件的类型 */ #define AOF_MANIFEST_KEY_FILE_TYPE "type" /* Create an empty aofInfo. */ @@ -100,13 +119,16 @@ aofInfo *aofInfoCreate(void) { } /* Free the aofInfo structure (pointed to by ai) and its embedded file_name. */ +/* 释放 aofInfo 实例空间 */ void aofInfoFree(aofInfo *ai) { serverAssert(ai != NULL); + /* 释放其文件名空间 */ if (ai->file_name) sdsfree(ai->file_name); zfree(ai); } /* Deep copy an aofInfo. */ +/* 深拷贝一份 aofInfo 实例 */ aofInfo *aofInfoDup(aofInfo *orig) { serverAssert(orig != NULL); aofInfo *ai = aofInfoCreate(); @@ -117,12 +139,15 @@ aofInfo *aofInfoDup(aofInfo *orig) { } /* Format aofInfo as a string and it will be a line in the manifest. */ +/* 将 aofInfo 实例格式化成字符串, + * 这个主要用于将内存中的清单信息写入文件的时候使用 */ sds aofInfoFormat(sds buf, aofInfo *ai) { sds filename_repr = NULL; if (sdsneedsrepr(ai->file_name)) filename_repr = sdscatrepr(sdsempty(), ai->file_name, sdslen(ai->file_name)); + /* 格式:file xxx seq xxx type xxx */ sds ret = sdscatprintf(buf, "%s %s %s %lld %s %c\n", AOF_MANIFEST_KEY_FILE_NAME, filename_repr ? filename_repr : ai->file_name, AOF_MANIFEST_KEY_FILE_SEQ, ai->file_seq, @@ -133,6 +158,7 @@ sds aofInfoFormat(sds buf, aofInfo *ai) { } /* Method to free AOF list elements. */ +/* 释放 aofInfo 列表中的一个元素 */ void aofListFree(void *item) { aofInfo *ai = (aofInfo *)item; aofInfoFree(ai); @@ -144,10 +170,12 @@ void *aofListDup(void *item) { } /* Create an empty aofManifest, which will be called in `aofLoadManifestFromDisk`. */ +/* 创建一个 aofManifest 结构 */ aofManifest *aofManifestCreate(void) { aofManifest *am = zcalloc(sizeof(aofManifest)); am->incr_aof_list = listCreate(); am->history_aof_list = listCreate(); + /* 给 incr aof 信息列表和历史 aof 文件列表绑定释放函数和复制函数 */ listSetFreeMethod(am->incr_aof_list, aofListFree); listSetDupMethod(am->incr_aof_list, aofListDup); listSetFreeMethod(am->history_aof_list, aofListFree); @@ -156,6 +184,7 @@ aofManifest *aofManifestCreate(void) { } /* Free the aofManifest structure (pointed to by am) and its embedded members. */ +/* 释放 aofManifest 实例 */ void aofManifestFree(aofManifest *am) { if (am->base_aof_info) aofInfoFree(am->base_aof_info); if (am->incr_aof_list) listRelease(am->incr_aof_list); @@ -163,6 +192,7 @@ void aofManifestFree(aofManifest *am) { zfree(am); } +/* 获取 AOF 清单文件名 appendonly.aof.manifest */ sds getAofManifestFileName() { return sdscatprintf(sdsempty(), "%s%s", server.aof_filename, MANIFEST_NAME_SUFFIX); @@ -188,6 +218,7 @@ sds getTempAofManifestFileName() { * The base file, if exists, will always be first, followed by history files, * and incremental files. */ +/* 将 aofManifest 清单格式化成字符串 */ sds getAofManifestAsString(aofManifest *am) { serverAssert(am != NULL); @@ -197,6 +228,7 @@ sds getAofManifestAsString(aofManifest *am) { /* 1. Add BASE File information, it is always at the beginning * of the manifest file. */ + /* 格式化 base 文件信息,其实就是调用 aofInfoFormat 函数,后面两种文件信息也一样 */ if (am->base_aof_info) { buf = aofInfoFormat(buf, am->base_aof_info); } @@ -228,6 +260,7 @@ sds getAofManifestAsString(aofManifest *am) { * in order to support seamless upgrades from previous versions which did not * use them. */ +/* 从磁盘中加载 aof 清单文件 */ void aofLoadManifestFromDisk(void) { server.aof_manifest = aofManifestCreate(); if (!dirExists(server.aof_dirname)) { @@ -235,7 +268,9 @@ void aofLoadManifestFromDisk(void) { return; } + /* 获取 aof manifest 文件名 */ sds am_name = getAofManifestFileName(); + /* 获取 aof manifest 文件路径 */ sds am_filepath = makePath(server.aof_dirname, am_name); if (!fileExist(am_filepath)) { serverLog(LL_DEBUG, "The AOF manifest file %s doesn't exist", am_name); @@ -244,7 +279,9 @@ void aofLoadManifestFromDisk(void) { return; } + /* 从磁盘中加载 aof manifest 文件,并根据其内容生成 aofManifest 结构 */ aofManifest *am = aofLoadManifestFromFile(am_filepath); + /* 释放之前的 aofManifest 结构实例,更改为现在的 am */ if (am) aofManifestFreeAndUpdate(am); sdsfree(am_name); sdsfree(am_filepath); @@ -387,6 +424,7 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath) { * try to modify it. Once everything is modified, we will atomically make the * `server.aof_manifest` point to this temporary aof_manifest. */ +/* 复制给定的 aofManifest 实例,生成一个新的 aofManifest 实例 */ aofManifest *aofManifestDup(aofManifest *orig) { serverAssert(orig != NULL); aofManifest *am = zcalloc(sizeof(aofManifest)); @@ -408,6 +446,7 @@ aofManifest *aofManifestDup(aofManifest *orig) { /* Change the `server.aof_manifest` pointer to 'am' and free the previous * one if we have. */ +/* 释放之前旧的 aofManifest 实例,使用新的 am 设置 server.aof_manifest 属性 */ void aofManifestFreeAndUpdate(aofManifest *am) { serverAssert(am != NULL); if (server.aof_manifest) aofManifestFree(server.aof_manifest); @@ -423,21 +462,28 @@ void aofManifestFreeAndUpdate(aofManifest *am) { * appendonly.aof.1.base.aof (server.aof_use_rdb_preamble is no) * appendonly.aof.1.base.rdb (server.aof_use_rdb_preamble is yes) */ +/* 获取新的 base 文件名,并标记之前的 base 文件为历史文件 + * 新的 base 文件名会根据 server.aof_use_rdb_preamble 来进行判断,该属性默认值是 1,认为是 rdb 这种就行了 */ sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) { serverAssert(am != NULL); if (am->base_aof_info) { serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE); + /* 标记之前的 base 文件为历史类型 */ am->base_aof_info->file_type = AOF_FILE_TYPE_HIST; listAddNodeHead(am->history_aof_list, am->base_aof_info); } + /* 选择后缀是 .aof 还是 .rdb */ char *format_suffix = server.aof_use_rdb_preamble ? RDB_FORMAT_SUFFIX:AOF_FORMAT_SUFFIX; aofInfo *ai = aofInfoCreate(); + /* 设置文件名 */ ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename, ++am->curr_base_file_seq, BASE_FILE_SUFFIX, format_suffix); + /* 设置序列号 */ ai->file_seq = am->curr_base_file_seq; + /* 设置文件类型 */ ai->file_type = AOF_FILE_TYPE_BASE; am->base_aof_info = ai; am->dirty = 1; @@ -451,6 +497,7 @@ sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) { * for example: * appendonly.aof.1.incr.aof */ +/* 获取新的 incr aof 文件信息,添加进清单中,返回新的 incr aof 文件名 */ sds getNewIncrAofName(aofManifest *am) { aofInfo *ai = aofInfoCreate(); ai->file_type = AOF_FILE_TYPE_INCR; @@ -463,6 +510,7 @@ sds getNewIncrAofName(aofManifest *am) { } /* Get temp INCR type AOF name. */ +/* 获取临时 incr 文件名 temp-xxx.incr */ sds getTempIncrAofName() { return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename, INCR_FILE_SUFFIX); @@ -488,6 +536,7 @@ sds getLastIncrAofName(aofManifest *am) { * AOF_FILE_TYPE_INCR to AOF_FILE_TYPE_HIST, and move them to the * 'history_aof_list'. */ +/* 标记 incr aof 文件为历史文件 */ void markRewrittenIncrAofAsHistory(aofManifest *am) { serverAssert(am != NULL); if (!listLength(am->incr_aof_list)) { @@ -497,30 +546,41 @@ void markRewrittenIncrAofAsHistory(aofManifest *am) { listNode *ln; listIter li; + /* 根据清单信息中的 incr aof 文件信息列表从链表尾部向头部生成迭代器 */ listRewindTail(am->incr_aof_list, &li); /* "server.aof_fd != -1" means AOF enabled, then we must skip the * last AOF, because this file is our currently writing. */ + /* 如果之前存在 incr aof 文件 */ if (server.aof_fd != -1) { + /* 获取尾结点 */ ln = listNext(&li); serverAssert(ln != NULL); } /* Move aofInfo from 'incr_aof_list' to 'history_aof_list'. */ + /* 从尾结点开始,如果上一个节点不为空 */ while ((ln = listNext(&li)) != NULL) { + /* 获取 incr aof 文件信息 */ aofInfo *ai = (aofInfo*)ln->value; serverAssert(ai->file_type == AOF_FILE_TYPE_INCR); + /* 复制一份 incr aof 信息 */ aofInfo *hai = aofInfoDup(ai); + /* 设置文件类型为历史文件 */ hai->file_type = AOF_FILE_TYPE_HIST; + /* 将新复制的 aofInfo 实例加入历史文件列表 */ listAddNodeHead(am->history_aof_list, hai); + /* 从 incr aof 信息列表移除当前节点 */ listDelNode(am->incr_aof_list, ln); } + /* 标记清单信息修改了,需要持久化 */ am->dirty = 1; } /* Write the formatted manifest string to disk. */ +/* 把清单信息的格式化字符串写入清单文件中 */ int writeAofManifestFile(sds buf) { int ret = C_OK; ssize_t nwritten; @@ -531,6 +591,7 @@ int writeAofManifestFile(sds buf) { sds tmp_am_name = getTempAofManifestFileName(); sds tmp_am_filepath = makePath(server.aof_dirname, tmp_am_name); + /* 打开临时文件 */ int fd = open(tmp_am_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644); if (fd == -1) { serverLog(LL_WARNING, "Can't open the AOF manifest file %s: %s", @@ -540,6 +601,7 @@ int writeAofManifestFile(sds buf) { goto cleanup; } + /* 循环将清单字符串写入临时清单文件中 */ len = sdslen(buf); while(len) { nwritten = write(fd, buf, len); @@ -558,6 +620,7 @@ int writeAofManifestFile(sds buf) { buf += nwritten; } + /* 刷盘操作 */ if (redis_fsync(fd) == -1) { serverLog(LL_WARNING, "Fail to fsync the temp AOF file %s: %s.", tmp_am_name, strerror(errno)); @@ -566,6 +629,7 @@ int writeAofManifestFile(sds buf) { goto cleanup; } + /* 将临时文件重命名为新的清单文件 */ if (rename(tmp_am_filepath, am_filepath) != 0) { serverLog(LL_WARNING, "Error trying to rename the temporary AOF manifest file %s into %s: %s", @@ -594,12 +658,15 @@ int writeAofManifestFile(sds buf) { } /* Persist the aofManifest information pointed to by am to disk. */ +/* 将 aofManifest 实例持久化到磁盘中 */ int persistAofManifest(aofManifest *am) { if (am->dirty == 0) { return C_OK; } + /* 将 aofManifest 清单实例格式化成字符串 */ sds amstr = getAofManifestAsString(am); + /* 将格式化后的字符串写入清单文件 */ int ret = writeAofManifestFile(amstr); sdsfree(amstr); if (ret == C_OK) am->dirty = 0; @@ -664,6 +731,7 @@ void aofUpgradePrepare(aofManifest *am) { * The function will traverse the 'history_aof_list' and submit * the delete task to the bio thread. */ +/* 该方法是删除内存中历史文件信息,然后提交一个关闭历史文件的任务给 bio 线程 */ int aofDelHistoryFiles(void) { if (server.aof_manifest == NULL || server.aof_disable_auto_gc == 1 || @@ -675,22 +743,28 @@ int aofDelHistoryFiles(void) { listNode *ln; listIter li; + /* 遍历清单中所有历史文件信息 */ listRewind(server.aof_manifest->history_aof_list, &li); while ((ln = listNext(&li)) != NULL) { aofInfo *ai = (aofInfo*)ln->value; serverAssert(ai->file_type == AOF_FILE_TYPE_HIST); serverLog(LL_NOTICE, "Removing the history file %s in the background", ai->file_name); sds aof_filepath = makePath(server.aof_dirname, ai->file_name); + /* 提交关闭文件的任务 */ bg_unlink(aof_filepath); sdsfree(aof_filepath); + /* 删除列表中的节点 */ listDelNode(server.aof_manifest->history_aof_list, ln); } + /* 标记清单改变了 */ server.aof_manifest->dirty = 1; + /* 持久化 AOF 清单文件 */ return persistAofManifest(server.aof_manifest); } /* Used to clean up temp INCR AOF when AOFRW fails. */ +/* 关闭临时文件 */ void aofDelTempIncrAofFile() { sds aof_filename = getTempIncrAofName(); sds aof_filepath = makePath(server.aof_dirname, aof_filename); @@ -785,6 +859,7 @@ int aofFileExist(char *filename) { * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file. * */ +/* 打开一个新的 aof 文件 */ int openNewIncrAofForAppend(void) { serverAssert(server.aof_manifest != NULL); int newfd = -1; @@ -863,12 +938,27 @@ int openNewIncrAofForAppend(void) { * AOFRW, which may be that we have reached the 'next_rewrite_time' or the number of INCR * AOFs has not reached the limit threshold. * */ +/* 是否限制了执行 AOFRW + * + * 当前,如果 AOFRW 失败,redis 将会自动重试 AOFRW,如果重试后还是一直失败,将会存在大量的非常小的 + * incr 文件,因此需要采取措施限制 AOFRW 执行 + * + * 为了解决上面的问题,使用了时间限制。当 AOFRW 失败,将会延迟 1 分钟再执行下一次 AOFRW,如果之后还 + * 失败,就延迟 2 分钟,接着是 4,8,16,.... 最大是 60 分钟 + * + * 在 AOFRW 限制期间,我们仍然可以使用 bgrewriteaof 命令主动并且立即触发 AOFRW + * + * 该函数返回 1 表示 AOFRW 被限制了,不能执行,如果是 0 则可以执行 AOFRW + */ +/* 开始限制的阈值,即 AOFRW 连续失败 3 次,就开始进行限制 */ #define AOF_REWRITE_LIMITE_THRESHOLD 3 +/* AOFRW 限制的最大延迟时间是 60 分钟 */ #define AOF_REWRITE_LIMITE_MAX_MINUTES 60 /* 1 hour */ int aofRewriteLimited(void) { static int next_delay_minutes = 0; static time_t next_rewrite_time = 0; + /* 如果 AOFRW 连续失败次数小于 3 次,可以执行 AOFRW */ if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) { /* We may be recovering from limited state, so reset all states. */ next_delay_minutes = 0; @@ -878,19 +968,23 @@ int aofRewriteLimited(void) { /* if it is in the limiting state, then check if the next_rewrite_time is reached */ if (next_rewrite_time != 0) { + /* 如果当前时间还没超过指定的限制时间,不能执行 AOFRW */ if (server.unixtime < next_rewrite_time) { return 1; } else { + /* 超过了,将 next_rewrite_time 置 0,并且返回 0 */ next_rewrite_time = 0; return 0; } } + /* 随着连续失败次数的增加,需要调整限制时间,每次增长之前的两倍时间,知道达到最大时间限制 */ next_delay_minutes = (next_delay_minutes == 0) ? 1 : (next_delay_minutes * 2); if (next_delay_minutes > AOF_REWRITE_LIMITE_MAX_MINUTES) { next_delay_minutes = AOF_REWRITE_LIMITE_MAX_MINUTES; } + /* 下一次执行 AOFRW 的时间戳 */ next_rewrite_time = server.unixtime + next_delay_minutes * 60; serverLog(LL_WARNING, "Background AOF rewrite has repeatedly failed and triggered the limit, will retry in %d minutes", next_delay_minutes); @@ -903,17 +997,20 @@ int aofRewriteLimited(void) { /* Return true if an AOf fsync is currently already in progress in a * BIO thread. */ +/* bio 线程是否有 aof fysnc 任务要执行 */ int aofFsyncInProgress(void) { return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; } /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ +/* 使用 bio 线程异步刷盘 */ void aof_background_fsync(int fd) { bioCreateFsyncJob(fd); } /* Kills an AOFRW child process if exists */ +/* 关闭 aof rewrite 子进程 */ void killAppendOnlyChild(void) { int statloc; /* No AOFRW child? return. */ @@ -924,6 +1021,7 @@ void killAppendOnlyChild(void) { if (kill(server.child_pid,SIGUSR1) != -1) { while(waitpid(-1, &statloc, 0) != server.child_pid); } + /* 子进程退出需要移除掉自己产生的临时 base 文件 */ aofRemoveTempFile(server.child_pid); resetChildState(); server.aof_rewrite_time_start = -1; @@ -954,25 +1052,31 @@ void stopAppendOnly(void) { /* Called when the user switches from "appendonly no" to "appendonly yes" * at runtime using the CONFIG command. */ +/* 启动 aof 的时候,会调用该函数 */ int startAppendOnly(void) { serverAssert(server.aof_state == AOF_OFF); server.aof_state = AOF_WAIT_REWRITE; if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) { + /* 如果有非 AOF 的子进程正在执行,这里不能直接执行 rewrite aof,会设置 aof_rewrite_scheduled 为 1,表示之后需要执行 + * 一次 rewrite aof 操作 */ server.aof_rewrite_scheduled = 1; serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible."); } else if (server.in_exec){ + /* 这里是正在进行事务的情况下,也不能直接执行 rewrite aof */ server.aof_rewrite_scheduled = 1; serverLog(LL_WARNING,"AOF was enabled during a transaction. An AOF background was scheduled to start when possible."); } else { /* If there is a pending AOF rewrite, we need to switch it off and * start a new one: the old one cannot be reused because it is not * accumulating the AOF buffer. */ + /* 如果当前子进程类型是 aof 子进程,直接干掉当前子进程,开始一个新的 aof 子进程 */ if (server.child_type == CHILD_TYPE_AOF) { serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now."); killAppendOnlyChild(); } + /* 开始执行 bgrewrite */ if (rewriteAppendOnlyFileBackground() == C_ERR) { server.aof_state = AOF_OFF; serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error."); @@ -1004,10 +1108,12 @@ int startAppendOnly(void) { * is likely to fail. However apparently in modern systems this is no longer * true, and in general it looks just more resilient to retry the write. If * there is an actual error condition we'll get it at the next try. */ +/* 将 aof_buf 中的数据写到 incr 增量命令文件中 */ ssize_t aofWrite(int fd, const char *buf, size_t len) { ssize_t nwritten = 0, totwritten = 0; while(len) { + /* 调用 write 写数据到文件 */ nwritten = write(fd, buf, len); if (nwritten < 0) { @@ -1015,11 +1121,14 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) { return totwritten ? totwritten : -1; } + /* 更新 buf 剩余待写的数据的大小 */ len -= nwritten; + /* 更新待写数据的指针 */ buf += nwritten; totwritten += nwritten; } + /* 返回本次写入的数据量 */ return totwritten; } @@ -1042,6 +1151,16 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) { * However if force is set to 1 we'll write regardless of the background * fsync. */ #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */ +/* 将 aof_buf 中的数据写入磁盘中. + * force 参数表示要执行刷盘,redis 有三种情况下会强制刷盘 + * 1) redis 服务关闭的时候 + * 2) 停止 aof 的时候 + * 3) 做 bgrewrite 的时候 + * + * 这里还要注意两种 aof fsync 刷盘策略: + * always 表示写入磁盘之后就执行刷盘 + * everysec 会使用 bio 大概每秒做一次异步刷盘,且该策略下存在非强制刷盘的功能 + */ void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; @@ -1053,9 +1172,12 @@ void flushAppendOnlyFile(int force) { * called only when aof buffer is not empty, so if users * stop write commands before fsync called in one second, * the data in page cache cannot be flushed in time. */ + /* 每秒刷盘一次 */ if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.aof_fsync_offset != server.aof_current_size && + /* unixtime 是 redis 的秒级时间戳 */ server.unixtime > server.aof_last_fsync && + /* 如果 bio 线程的所有 aof fsync 任务都执行完了 */ !(sync_in_progress = aofFsyncInProgress())) { goto try_fsync; } else { @@ -1063,26 +1185,34 @@ void flushAppendOnlyFile(int force) { } } + /* sync_in_progress 表示是否有正在执行的 bg aof fsync 任务 */ if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = aofFsyncInProgress(); + /* 如果没有强制要将 aof_buf 缓冲区中的数据写入 aof incr 文件中的情况 */ if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ + /* 如果有正在执行的 bg aof fsync 任务,这里会尝试延迟 2 秒将内存数据写入文件 */ if (sync_in_progress) { + /* 这里每次调用完 aofWrite 函数后,会将 server.aof_flush_postponed_start 置 0 + * 该属性为 0 表示现在开始新一次的 aof_buf 写磁盘操作 */ if (server.aof_flush_postponed_start == 0) { /* No previous write postponing, remember that we are * postponing the flush and return. */ + /* 记录当前的秒级时间戳 */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ + /* 如果还没超过 2s 直接返回,不执行写磁盘操作 */ return; } /* Otherwise fall through, and go write since we can't wait * over two seconds. */ + /* 如果上面的逻辑都没有走,没有返回,下面就要开始执行写磁盘操作了,这里将 fsync 计数器 +1 */ server.aof_delayed_fsync++; serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } @@ -1098,6 +1228,7 @@ void flushAppendOnlyFile(int force) { } latencyStartMonitor(latency); + /* 将 aof_buf 中的数据写入文件 */ nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); latencyEndMonitor(latency); /* We want to capture different events for delayed writes: @@ -1115,6 +1246,7 @@ void flushAppendOnlyFile(int force) { latencyAddSampleIfNeeded("aof-write",latency); /* We performed the write so reset the postponed flush sentinel to zero. */ + /* 这里将 aof_flush_postponed_start 属性置 0 */ server.aof_flush_postponed_start = 0; if (nwritten != (ssize_t)sdslen(server.aof_buf)) { @@ -1175,9 +1307,11 @@ void flushAppendOnlyFile(int force) { /* Trim the sds buffer if there was a partial write, and there * was no way to undo it with ftruncate(2). */ + /* 这里是每次只同步部分 aof_buf 中的数据到磁盘的情况 */ if (nwritten > 0) { server.aof_current_size += nwritten; server.aof_last_incr_size += nwritten; + /* 会将 aof_buf 中已经同步到磁盘的部分空间释放掉 */ sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ @@ -1217,6 +1351,7 @@ void flushAppendOnlyFile(int force) { /* Let's try to get this data on the disk. To guarantee data safe when * the AOF fsync policy is 'always', we should exit if failed to fsync * AOF (see comment next to the exit(1) after write error above). */ + /* aof fsync 策略为 always 的情况下,直接刷盘 */ if (redis_fsync(server.aof_fd) == -1) { serverLog(LL_WARNING,"Can't persist AOF for fsync error when the " "AOF fsync policy is 'always': %s. Exiting...", strerror(errno)); @@ -1229,6 +1364,7 @@ void flushAppendOnlyFile(int force) { } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) { + /* 这里是使用 bio 做 aof fsync 刷盘操作 */ aof_background_fsync(server.aof_fd); server.aof_fsync_offset = server.aof_current_size; } @@ -1236,24 +1372,30 @@ void flushAppendOnlyFile(int force) { } } +/* 将命令格式化 */ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { char buf[32]; int len, j; robj *o; + /* 第一个字符为 * */ buf[0] = '*'; + /* 写入参数数量 */ len = 1+ll2string(buf+1,sizeof(buf)-1,argc); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); + /* 每个参数格式化 */ for (j = 0; j < argc; j++) { o = getDecodedObject(argv[j]); + /* 写入参数字节大小 */ buf[0] = '$'; len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr)); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); + /* 写入参数字符串 */ dst = sdscatlen(dst,o->ptr,sdslen(o->ptr)); dst = sdscatlen(dst,"\r\n",2); decrRefCount(o); @@ -1280,6 +1422,7 @@ sds genAofTimestampAnnotationIfNeeded(int force) { return ts; } +/* 向 aof 缓冲区中追加一个命令 */ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { sds buf = sdsempty(); @@ -1296,17 +1439,21 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */ + /* 如果命令操作的数据库和上一次选择的数据库不同 */ if (dictid != server.aof_selected_db) { char seldb[64]; + /* 向缓冲区添加一条 select dictid 命令 */ snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); + /* 更新操作的数据库号 */ server.aof_selected_db = dictid; } /* All commands should be propagated the same way in AOF as in replication. * No need for AOF-specific translation. */ + /* 将命令和参数格式化 */ buf = catAppendOnlyGenericCommand(buf,argc,argv); /* Append to the AOF buffer. This will be flushed on disk just before @@ -1315,6 +1462,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { if (server.aof_state == AOF_ON || (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF)) { + /* 存入 aof_buf 中 */ server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf)); } @@ -1366,6 +1514,7 @@ int loadSingleAppendOnlyFile(char *filename) { int ret = AOF_OK; sds aof_filepath = makePath(server.aof_dirname, filename); + /* 打开 base 文件 */ FILE *fp = fopen(aof_filepath, "r"); if (fp == NULL) { int en = errno; @@ -1391,17 +1540,20 @@ int loadSingleAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; client *old_client = server.current_client; + /* 伪造一个 AOF 客户端 */ fakeClient = server.current_client = createAOFClient(); /* Check if the AOF file is in RDB format (it may be RDB encoded base AOF * or old style RDB-preamble AOF). In that case we need to load the RDB file * and later continue loading the AOF tail if it is an old style RDB-preamble AOF. */ char sig[5]; /* "REDIS" */ + /* 这里判断 base 文件是 rdb 的格式存储还是 aof 格式存储 */ if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) { /* Not in RDB format, seek back at 0 offset. */ if (fseek(fp,0,SEEK_SET) == -1) goto readerr; } else { /* RDB format. Pass loading the RDB functions. */ + /* 这里是处理 RDB 格式的 base 文件的读取 */ rio rdb; int old_style = !strcmp(filename, server.aof_filename); if (old_style) @@ -1410,7 +1562,9 @@ int loadSingleAppendOnlyFile(char *filename) { serverLog(LL_NOTICE, "Reading RDB base file on AOF loading..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; + /* 初始化一个 rio 结构 */ rioInitWithFile(&rdb,fp); + /* 这里调用 rdb 加载文件的接口加载 base 文件 */ if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { if (old_style) serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename); @@ -1426,6 +1580,7 @@ int loadSingleAppendOnlyFile(char *filename) { } /* Read the actual AOF file, in REPL format, command by command. */ + /* 这里是读取实际的 aof 文件 */ while(1) { int argc, j; unsigned long len; @@ -1442,6 +1597,7 @@ int loadSingleAppendOnlyFile(char *filename) { processEventsWhileBlocked(); processModuleLoadingProgressEvent(1); } + /* 从 base 文件中读取一行数据 */ if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp)) { break; @@ -1452,6 +1608,7 @@ int loadSingleAppendOnlyFile(char *filename) { if (buf[0] == '#') continue; /* Skip annotations */ if (buf[0] != '*') goto fmterr; if (buf[1] == '\0') goto readerr; + /* 读取参数数量 */ argc = atoi(buf+1); if (argc < 1) goto fmterr; if ((size_t)argc > SIZE_MAX / sizeof(robj*)) goto fmterr; @@ -1463,6 +1620,7 @@ int loadSingleAppendOnlyFile(char *filename) { fakeClient->argv = argv; fakeClient->argv_len = argc; + /* 根据参数数量从 aof 文件中读取参数 */ for (j = 0; j < argc; j++) { /* Parse the argument len. */ char *readres = fgets(buf,sizeof(buf),fp); @@ -1474,9 +1632,11 @@ int loadSingleAppendOnlyFile(char *filename) { else goto fmterr; } + /* 获取当前参数的字节数 */ len = strtol(buf+1,NULL,10); /* Read it into a string object. */ + /* 读取出参数 */ argsds = sdsnewlen(SDS_NOINIT,len); if (len && fread(argsds,len,1,fp) == 0) { sdsfree(argsds); @@ -1484,9 +1644,11 @@ int loadSingleAppendOnlyFile(char *filename) { freeClientArgv(fakeClient); goto readerr; } + /* 将参数封装成 redis 对象 */ argv[j] = createObject(OBJ_STRING,argsds); /* Discard CRLF. */ + /* 丢弃换行符 */ if (fread(buf,2,1,fp) == 0) { fakeClient->argc = j+1; /* Free up to j. */ freeClientArgv(fakeClient); @@ -1495,6 +1657,7 @@ int loadSingleAppendOnlyFile(char *filename) { } /* Command lookup */ + /* 根据参数和参数数量从命令表中获取对应的命令 */ cmd = lookupCommand(argv,argc); if (!cmd) { serverLog(LL_WARNING, @@ -1509,6 +1672,7 @@ int loadSingleAppendOnlyFile(char *filename) { /* Run the command in the context of a fake client */ fakeClient->cmd = fakeClient->lastcmd = cmd; + /* 如果不是 exec 命令,就将命令入队到事务队列 */ if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) { @@ -1517,6 +1681,7 @@ int loadSingleAppendOnlyFile(char *filename) { * anyway.*/ queueMultiCommand(fakeClient, cmd->flags); } else { + /* 是 exec 命令,调用命令的处理函数 */ cmd->proc(fakeClient); } @@ -1654,6 +1819,7 @@ int loadAppendOnlyFiles(aofManifest *am) { startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0); /* Load BASE AOF if needed. */ + /* 加载 base 文件 */ if (am->base_aof_info) { serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE); aof_name = (char*)am->base_aof_info->file_name; @@ -2326,6 +2492,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { * log Redis uses variadic commands when possible, such as RPUSH, SADD * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time * are inserted using a single command. */ +/* 子进程重写 aof 文件 */ int rewriteAppendOnlyFile(char *filename) { rio aof; FILE *fp = NULL; @@ -2333,6 +2500,7 @@ int rewriteAppendOnlyFile(char *filename) { /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. */ + /* 打开一个临时文件 */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { @@ -2340,24 +2508,33 @@ int rewriteAppendOnlyFile(char *filename) { return C_ERR; } + /* 使用 FILE 类型的 RIO 来将数据写入临时文件 */ + + /* 初始化一个 file 类型的 rio */ rioInitWithFile(&aof,fp); + /* 如果配置了 aof 增量刷盘 */ if (server.aof_rewrite_incremental_fsync) + /* 设置 rio 的 autoSync (自动刷盘的阈值) */ rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); startSaving(RDBFLAGS_AOF_PREAMBLE); + /* 使用 rdb 编码格式生成 base 文件内容,redis 默认使用这种方式 */ if (server.aof_use_rdb_preamble) { int error; + /* 使用 rdb 编码格式保存 redis 的数据快照 */ if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } } else { + /* 使用 aof 自己的格式生成 base 文件内容 */ if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; } /* Make sure data will not remain on the OS's output buffers */ + /* page cache 刷盘 */ if (fflush(fp)) goto werr; if (fsync(fileno(fp))) goto werr; if (fclose(fp)) { fp = NULL; goto werr; } @@ -2365,6 +2542,7 @@ int rewriteAppendOnlyFile(char *filename) { /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ + /* 重命名文件 */ if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); @@ -2400,9 +2578,26 @@ int rewriteAppendOnlyFile(char *filename) { * 4d) persist AOF manifest file * 4e) Delete the history files use bio */ +/* 后台重写 aof 文件的步骤 + * 1) 用户调用 BGREWRITEAOF 命令 + * 2) redis 调用当前函数,fork 子进程 + * 2a) 子进程使用临时文件重写 aof + * 2b) 父进程打开一个新的 incr 类型的 aof 文件继续写后续到来的写命令 + * 3) 当子进程完成 2a 步骤了 + * 4) 父进程会判断子进程的退出码,如果是 OK + * 4a) 获取一个新的 base 文件名,标记之前的 base 文件为历史文件 + * 4b) 用新的 base 文件名重命名之前子进程的临时文件 + * 4c) 标记之前的 incr aof 文件为历史文件 + * 4d) 持久化 aof 清单文件 + * 4e) 使用 bio 线程删除历史文件 + * + * 该函数其实并没有做 4) 的逻辑,4) 的逻辑是在 backgroundRewriteDoneHandler 函数中实现的 + * 看完该函数,可以直接跳到 backgroundRewriteDoneHandler 函数看剩余逻辑 + */ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; + /* 如果有正在运行的子进程,返回 */ if (hasActiveChildProcess()) return C_ERR; if (dirCreateIfMissing(server.aof_dirname) == -1) { @@ -2414,21 +2609,31 @@ int rewriteAppendOnlyFileBackground(void) { /* We set aof_selected_db to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command. */ + /* 将 aof_selected_db 设置为 -1 */ server.aof_selected_db = -1; + /* 将 aof 缓冲区中的数据写入磁盘文件 */ flushAppendOnlyFile(1); + /* 打开一个新的 incr aof 文件 */ if (openNewIncrAofForAppend() != C_OK) { server.aof_lastbgrewrite_status = C_ERR; return C_ERR; } server.stat_aof_rewrites++; + /* 调用 fork 系统调用生成子进程 */ if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) { char tmpfile[256]; + /* 子进程部分的逻辑 */ /* Child */ + /* 设置进程名 */ redisSetProcTitle("redis-aof-rewrite"); + /* 给子进程绑定 cpu 核 */ redisSetCpuAffinity(server.aof_rewrite_cpulist); + /* 获取临时文件名 */ snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); + /* 使用临时文件写 redis 数据快照 */ if (rewriteAppendOnlyFile(tmpfile) == C_OK) { + /* rewrite 成功 */ serverLog(LL_NOTICE, "Successfully created the temporary AOF base file %s", tmpfile); sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite"); @@ -2471,12 +2676,15 @@ void bgrewriteaofCommand(client *c) { } } +/* 移除临时快照文件 */ void aofRemoveTempFile(pid_t childpid) { char tmpfile[256]; snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid); + /* 后台 bio 线程关闭文件 */ bg_unlink(tmpfile); + /* 这个是子进程的临时快照文件,该文件只有子进程会进行操作,子进程写完快照后就会将该文件修改为 temp-rewriteaof-bg-%d.aof 文件 */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) childpid); bg_unlink(tmpfile); } @@ -2532,6 +2740,7 @@ off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status) { return size; } +/* 获取 base 文件和 incr 文件的总数量 */ int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am) { int num = 0; if (am->base_aof_info) num++; @@ -2541,6 +2750,7 @@ int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am) { /* A background append only file rewriting (BGREWRITEAOF) terminated its work. * Handle this. */ +/* 子进程执行完 rewrite aof 之后的工作 */ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { char tmpfile[256]; @@ -2559,16 +2769,19 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { serverAssert(server.aof_manifest != NULL); /* Dup a temporary aof_manifest for subsequent modifications. */ + /* 复制一份清单信息 */ temp_am = aofManifestDup(server.aof_manifest); /* Get a new BASE file name and mark the previous (if we have) * as the HISTORY type. */ + /* 获取新的 base 文件,标记之前的 base 文件为历史文件 */ sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am); serverAssert(new_base_filename != NULL); new_base_filepath = makePath(server.aof_dirname, new_base_filename); /* Rename the temporary aof file to 'new_base_filename'. */ latencyStartMonitor(latency); + /* 重命名临时文件为新的 base 文件 */ if (rename(tmpfile, new_base_filepath) == -1) { serverLog(LL_WARNING, "Error trying to rename the temporary AOF base file %s into %s: %s", @@ -2589,12 +2802,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Rename the temporary incr aof file to 'new_incr_filename'. */ if (server.aof_state == AOF_WAIT_REWRITE) { /* Get temporary incr aof name. */ + /* 获取临时 incr aof 文件名 */ sds temp_incr_aof_name = getTempIncrAofName(); sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name); /* Get next new incr aof name. */ + /* 获取新的 incr aof 文件名 */ sds new_incr_filename = getNewIncrAofName(temp_am); new_incr_filepath = makePath(server.aof_dirname, new_incr_filename); latencyStartMonitor(latency); + /* 将临时文件修改成新的 incr 文件 */ if (rename(temp_incr_filepath, new_incr_filepath) == -1) { serverLog(LL_WARNING, "Error trying to rename the temporary AOF incr file %s into %s: %s", @@ -2649,6 +2865,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR * to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */ + /* 标记之前的 incr aof 文件为历史文件,并修改清单信息 */ markRewrittenIncrAofAsHistory(temp_am); /* Persist our modifications. */