diff --git a/README.md b/README.md
index 84595825f..b26b5f9c3 100644
--- a/README.md
+++ b/README.md
@@ -49,7 +49,7 @@ redis 仓库链接:https://github.com/redis/redis
| [ae.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/ae.c) | redis 事件循环器功能 | 完成 |
| [multi.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/multi.c) | redis 事务实现 | 完成 |
| [redis-check-rdb.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/redis-check-rdb.c) | Redis RDB 检查工具 | 完成 |
-
+| [aof.c](https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/7.0-cn-annotated/src/aof.c) | redis AOF功能 | 过半 |
尚未有中文注释的文件不会出现在表格中。
更新日期:2022/10/22
diff --git a/src/aof.c b/src/aof.c
index acaffa4a6..3e2995d66 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -41,6 +41,14 @@
#include
#include
+/* redis7 的 aof 功能和之前版本的 aof 功能在文件管理方面有了很大的改变
+ * redis7 对 aof 文件进行了分类,分为以下 4 类文件(临时文件不用管)
+ * 1) manifest: 清单文件,该文件的内容对应 aofManifest 结构,记录所有其他类型文件的文件信息,并进行了分类
+ * 2) base: 基础文件,就是在执行 bgrewrite 的时候,当前数据的 RDB 快照或者 AOF,该文件的文件名后缀也是以 rdb 或者 aof 结尾,取决于 aof-use-rdb-preamble 配置项
+ * 3) incr: 增量命令文件,执行 bgrewrite 之后到的写命令,会存入该文件
+ * 4) history: 历史文件,执行 bgrewrite 之后,之前存在的 incr,base 文件就是历史文件,会通过 bio 关闭文件或者删除文件
+ */
+
void freeClientArgv(client *c);
off_t getAppendOnlyFileSize(sds filename, int *status);
off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status);
@@ -82,16 +90,27 @@ void aofManifestFreeAndUpdate(aofManifest *am);
* ------------------------------------------------------------------------- */
/* Naming rules. */
+/* AOF 各种类型的文件命名规则 */
+/* 基本文件,做 bgrewrite 的时候 redis 数据快照文件 */
#define BASE_FILE_SUFFIX ".base"
+/* 增量命令文件,做 bgrewrite 之后到达的命令写入该文件 */
#define INCR_FILE_SUFFIX ".incr"
+/* 使用 rdb 编码格式在 bgrewrite 的时候存储 base 文件的文件后缀标识 */
#define RDB_FORMAT_SUFFIX ".rdb"
+/* 不使用 rdb 编码格式在 bgrewrite 的时候存储 base 文件的文件后缀标识,和 incr 文件使用的后缀标识 */
#define AOF_FORMAT_SUFFIX ".aof"
+/* 清单文件后缀,该类型的文件会存储所有其他类型文件的文件名 */
#define MANIFEST_NAME_SUFFIX ".manifest"
+/* 临时文件前缀 */
#define TEMP_FILE_NAME_PREFIX "temp-"
/* AOF manifest key. */
+/* 下面三个宏在生成 aof 文件信息字符串的时候被使用 */
+/* 文件名 */
#define AOF_MANIFEST_KEY_FILE_NAME "file"
+/* 文件的序列号 */
#define AOF_MANIFEST_KEY_FILE_SEQ "seq"
+/* 文件的类型 */
#define AOF_MANIFEST_KEY_FILE_TYPE "type"
/* Create an empty aofInfo. */
@@ -100,13 +119,16 @@ aofInfo *aofInfoCreate(void) {
}
/* Free the aofInfo structure (pointed to by ai) and its embedded file_name. */
+/* 释放 aofInfo 实例空间 */
void aofInfoFree(aofInfo *ai) {
serverAssert(ai != NULL);
+ /* 释放其文件名空间 */
if (ai->file_name) sdsfree(ai->file_name);
zfree(ai);
}
/* Deep copy an aofInfo. */
+/* 深拷贝一份 aofInfo 实例 */
aofInfo *aofInfoDup(aofInfo *orig) {
serverAssert(orig != NULL);
aofInfo *ai = aofInfoCreate();
@@ -117,12 +139,15 @@ aofInfo *aofInfoDup(aofInfo *orig) {
}
/* Format aofInfo as a string and it will be a line in the manifest. */
+/* 将 aofInfo 实例格式化成字符串,
+ * 这个主要用于将内存中的清单信息写入文件的时候使用 */
sds aofInfoFormat(sds buf, aofInfo *ai) {
sds filename_repr = NULL;
if (sdsneedsrepr(ai->file_name))
filename_repr = sdscatrepr(sdsempty(), ai->file_name, sdslen(ai->file_name));
+ /* 格式:file xxx seq xxx type xxx */
sds ret = sdscatprintf(buf, "%s %s %s %lld %s %c\n",
AOF_MANIFEST_KEY_FILE_NAME, filename_repr ? filename_repr : ai->file_name,
AOF_MANIFEST_KEY_FILE_SEQ, ai->file_seq,
@@ -133,6 +158,7 @@ sds aofInfoFormat(sds buf, aofInfo *ai) {
}
/* Method to free AOF list elements. */
+/* 释放 aofInfo 列表中的一个元素 */
void aofListFree(void *item) {
aofInfo *ai = (aofInfo *)item;
aofInfoFree(ai);
@@ -144,10 +170,12 @@ void *aofListDup(void *item) {
}
/* Create an empty aofManifest, which will be called in `aofLoadManifestFromDisk`. */
+/* 创建一个 aofManifest 结构 */
aofManifest *aofManifestCreate(void) {
aofManifest *am = zcalloc(sizeof(aofManifest));
am->incr_aof_list = listCreate();
am->history_aof_list = listCreate();
+ /* 给 incr aof 信息列表和历史 aof 文件列表绑定释放函数和复制函数 */
listSetFreeMethod(am->incr_aof_list, aofListFree);
listSetDupMethod(am->incr_aof_list, aofListDup);
listSetFreeMethod(am->history_aof_list, aofListFree);
@@ -156,6 +184,7 @@ aofManifest *aofManifestCreate(void) {
}
/* Free the aofManifest structure (pointed to by am) and its embedded members. */
+/* 释放 aofManifest 实例 */
void aofManifestFree(aofManifest *am) {
if (am->base_aof_info) aofInfoFree(am->base_aof_info);
if (am->incr_aof_list) listRelease(am->incr_aof_list);
@@ -163,6 +192,7 @@ void aofManifestFree(aofManifest *am) {
zfree(am);
}
+/* 获取 AOF 清单文件名 appendonly.aof.manifest */
sds getAofManifestFileName() {
return sdscatprintf(sdsempty(), "%s%s", server.aof_filename,
MANIFEST_NAME_SUFFIX);
@@ -188,6 +218,7 @@ sds getTempAofManifestFileName() {
* The base file, if exists, will always be first, followed by history files,
* and incremental files.
*/
+/* 将 aofManifest 清单格式化成字符串 */
sds getAofManifestAsString(aofManifest *am) {
serverAssert(am != NULL);
@@ -197,6 +228,7 @@ sds getAofManifestAsString(aofManifest *am) {
/* 1. Add BASE File information, it is always at the beginning
* of the manifest file. */
+ /* 格式化 base 文件信息,其实就是调用 aofInfoFormat 函数,后面两种文件信息也一样 */
if (am->base_aof_info) {
buf = aofInfoFormat(buf, am->base_aof_info);
}
@@ -228,6 +260,7 @@ sds getAofManifestAsString(aofManifest *am) {
* in order to support seamless upgrades from previous versions which did not
* use them.
*/
+/* 从磁盘中加载 aof 清单文件 */
void aofLoadManifestFromDisk(void) {
server.aof_manifest = aofManifestCreate();
if (!dirExists(server.aof_dirname)) {
@@ -235,7 +268,9 @@ void aofLoadManifestFromDisk(void) {
return;
}
+ /* 获取 aof manifest 文件名 */
sds am_name = getAofManifestFileName();
+ /* 获取 aof manifest 文件路径 */
sds am_filepath = makePath(server.aof_dirname, am_name);
if (!fileExist(am_filepath)) {
serverLog(LL_DEBUG, "The AOF manifest file %s doesn't exist", am_name);
@@ -244,7 +279,9 @@ void aofLoadManifestFromDisk(void) {
return;
}
+ /* 从磁盘中加载 aof manifest 文件,并根据其内容生成 aofManifest 结构 */
aofManifest *am = aofLoadManifestFromFile(am_filepath);
+ /* 释放之前的 aofManifest 结构实例,更改为现在的 am */
if (am) aofManifestFreeAndUpdate(am);
sdsfree(am_name);
sdsfree(am_filepath);
@@ -387,6 +424,7 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath) {
* try to modify it. Once everything is modified, we will atomically make the
* `server.aof_manifest` point to this temporary aof_manifest.
*/
+/* 复制给定的 aofManifest 实例,生成一个新的 aofManifest 实例 */
aofManifest *aofManifestDup(aofManifest *orig) {
serverAssert(orig != NULL);
aofManifest *am = zcalloc(sizeof(aofManifest));
@@ -408,6 +446,7 @@ aofManifest *aofManifestDup(aofManifest *orig) {
/* Change the `server.aof_manifest` pointer to 'am' and free the previous
* one if we have. */
+/* 释放之前旧的 aofManifest 实例,使用新的 am 设置 server.aof_manifest 属性 */
void aofManifestFreeAndUpdate(aofManifest *am) {
serverAssert(am != NULL);
if (server.aof_manifest) aofManifestFree(server.aof_manifest);
@@ -423,21 +462,28 @@ void aofManifestFreeAndUpdate(aofManifest *am) {
* appendonly.aof.1.base.aof (server.aof_use_rdb_preamble is no)
* appendonly.aof.1.base.rdb (server.aof_use_rdb_preamble is yes)
*/
+/* 获取新的 base 文件名,并标记之前的 base 文件为历史文件
+ * 新的 base 文件名会根据 server.aof_use_rdb_preamble 来进行判断,该属性默认值是 1,认为是 rdb 这种就行了 */
sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
serverAssert(am != NULL);
if (am->base_aof_info) {
serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
+ /* 标记之前的 base 文件为历史类型 */
am->base_aof_info->file_type = AOF_FILE_TYPE_HIST;
listAddNodeHead(am->history_aof_list, am->base_aof_info);
}
+ /* 选择后缀是 .aof 还是 .rdb */
char *format_suffix = server.aof_use_rdb_preamble ?
RDB_FORMAT_SUFFIX:AOF_FORMAT_SUFFIX;
aofInfo *ai = aofInfoCreate();
+ /* 设置文件名 */
ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename,
++am->curr_base_file_seq, BASE_FILE_SUFFIX, format_suffix);
+ /* 设置序列号 */
ai->file_seq = am->curr_base_file_seq;
+ /* 设置文件类型 */
ai->file_type = AOF_FILE_TYPE_BASE;
am->base_aof_info = ai;
am->dirty = 1;
@@ -451,6 +497,7 @@ sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
* for example:
* appendonly.aof.1.incr.aof
*/
+/* 获取新的 incr aof 文件信息,添加进清单中,返回新的 incr aof 文件名 */
sds getNewIncrAofName(aofManifest *am) {
aofInfo *ai = aofInfoCreate();
ai->file_type = AOF_FILE_TYPE_INCR;
@@ -463,6 +510,7 @@ sds getNewIncrAofName(aofManifest *am) {
}
/* Get temp INCR type AOF name. */
+/* 获取临时 incr 文件名 temp-xxx.incr */
sds getTempIncrAofName() {
return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename,
INCR_FILE_SUFFIX);
@@ -488,6 +536,7 @@ sds getLastIncrAofName(aofManifest *am) {
* AOF_FILE_TYPE_INCR to AOF_FILE_TYPE_HIST, and move them to the
* 'history_aof_list'.
*/
+/* 标记 incr aof 文件为历史文件 */
void markRewrittenIncrAofAsHistory(aofManifest *am) {
serverAssert(am != NULL);
if (!listLength(am->incr_aof_list)) {
@@ -497,30 +546,41 @@ void markRewrittenIncrAofAsHistory(aofManifest *am) {
listNode *ln;
listIter li;
+ /* 根据清单信息中的 incr aof 文件信息列表从链表尾部向头部生成迭代器 */
listRewindTail(am->incr_aof_list, &li);
/* "server.aof_fd != -1" means AOF enabled, then we must skip the
* last AOF, because this file is our currently writing. */
+ /* 如果之前存在 incr aof 文件 */
if (server.aof_fd != -1) {
+ /* 获取尾结点 */
ln = listNext(&li);
serverAssert(ln != NULL);
}
/* Move aofInfo from 'incr_aof_list' to 'history_aof_list'. */
+ /* 从尾结点开始,如果上一个节点不为空 */
while ((ln = listNext(&li)) != NULL) {
+ /* 获取 incr aof 文件信息 */
aofInfo *ai = (aofInfo*)ln->value;
serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
+ /* 复制一份 incr aof 信息 */
aofInfo *hai = aofInfoDup(ai);
+ /* 设置文件类型为历史文件 */
hai->file_type = AOF_FILE_TYPE_HIST;
+ /* 将新复制的 aofInfo 实例加入历史文件列表 */
listAddNodeHead(am->history_aof_list, hai);
+ /* 从 incr aof 信息列表移除当前节点 */
listDelNode(am->incr_aof_list, ln);
}
+ /* 标记清单信息修改了,需要持久化 */
am->dirty = 1;
}
/* Write the formatted manifest string to disk. */
+/* 把清单信息的格式化字符串写入清单文件中 */
int writeAofManifestFile(sds buf) {
int ret = C_OK;
ssize_t nwritten;
@@ -531,6 +591,7 @@ int writeAofManifestFile(sds buf) {
sds tmp_am_name = getTempAofManifestFileName();
sds tmp_am_filepath = makePath(server.aof_dirname, tmp_am_name);
+ /* 打开临时文件 */
int fd = open(tmp_am_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
if (fd == -1) {
serverLog(LL_WARNING, "Can't open the AOF manifest file %s: %s",
@@ -540,6 +601,7 @@ int writeAofManifestFile(sds buf) {
goto cleanup;
}
+ /* 循环将清单字符串写入临时清单文件中 */
len = sdslen(buf);
while(len) {
nwritten = write(fd, buf, len);
@@ -558,6 +620,7 @@ int writeAofManifestFile(sds buf) {
buf += nwritten;
}
+ /* 刷盘操作 */
if (redis_fsync(fd) == -1) {
serverLog(LL_WARNING, "Fail to fsync the temp AOF file %s: %s.",
tmp_am_name, strerror(errno));
@@ -566,6 +629,7 @@ int writeAofManifestFile(sds buf) {
goto cleanup;
}
+ /* 将临时文件重命名为新的清单文件 */
if (rename(tmp_am_filepath, am_filepath) != 0) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF manifest file %s into %s: %s",
@@ -594,12 +658,15 @@ int writeAofManifestFile(sds buf) {
}
/* Persist the aofManifest information pointed to by am to disk. */
+/* 将 aofManifest 实例持久化到磁盘中 */
int persistAofManifest(aofManifest *am) {
if (am->dirty == 0) {
return C_OK;
}
+ /* 将 aofManifest 清单实例格式化成字符串 */
sds amstr = getAofManifestAsString(am);
+ /* 将格式化后的字符串写入清单文件 */
int ret = writeAofManifestFile(amstr);
sdsfree(amstr);
if (ret == C_OK) am->dirty = 0;
@@ -664,6 +731,7 @@ void aofUpgradePrepare(aofManifest *am) {
* The function will traverse the 'history_aof_list' and submit
* the delete task to the bio thread.
*/
+/* 该方法是删除内存中历史文件信息,然后提交一个关闭历史文件的任务给 bio 线程 */
int aofDelHistoryFiles(void) {
if (server.aof_manifest == NULL ||
server.aof_disable_auto_gc == 1 ||
@@ -675,22 +743,28 @@ int aofDelHistoryFiles(void) {
listNode *ln;
listIter li;
+ /* 遍历清单中所有历史文件信息 */
listRewind(server.aof_manifest->history_aof_list, &li);
while ((ln = listNext(&li)) != NULL) {
aofInfo *ai = (aofInfo*)ln->value;
serverAssert(ai->file_type == AOF_FILE_TYPE_HIST);
serverLog(LL_NOTICE, "Removing the history file %s in the background", ai->file_name);
sds aof_filepath = makePath(server.aof_dirname, ai->file_name);
+ /* 提交关闭文件的任务 */
bg_unlink(aof_filepath);
sdsfree(aof_filepath);
+ /* 删除列表中的节点 */
listDelNode(server.aof_manifest->history_aof_list, ln);
}
+ /* 标记清单改变了 */
server.aof_manifest->dirty = 1;
+ /* 持久化 AOF 清单文件 */
return persistAofManifest(server.aof_manifest);
}
/* Used to clean up temp INCR AOF when AOFRW fails. */
+/* 关闭临时文件 */
void aofDelTempIncrAofFile() {
sds aof_filename = getTempIncrAofName();
sds aof_filepath = makePath(server.aof_dirname, aof_filename);
@@ -785,6 +859,7 @@ int aofFileExist(char *filename) {
* file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be
* renamed in the `backgroundRewriteDoneHandler` and written to the manifest file.
* */
+/* 打开一个新的 aof 文件 */
int openNewIncrAofForAppend(void) {
serverAssert(server.aof_manifest != NULL);
int newfd = -1;
@@ -863,12 +938,27 @@ int openNewIncrAofForAppend(void) {
* AOFRW, which may be that we have reached the 'next_rewrite_time' or the number of INCR
* AOFs has not reached the limit threshold.
* */
+/* 是否限制了执行 AOFRW
+ *
+ * 当前,如果 AOFRW 失败,redis 将会自动重试 AOFRW,如果重试后还是一直失败,将会存在大量的非常小的
+ * incr 文件,因此需要采取措施限制 AOFRW 执行
+ *
+ * 为了解决上面的问题,使用了时间限制。当 AOFRW 失败,将会延迟 1 分钟再执行下一次 AOFRW,如果之后还
+ * 失败,就延迟 2 分钟,接着是 4,8,16,.... 最大是 60 分钟
+ *
+ * 在 AOFRW 限制期间,我们仍然可以使用 bgrewriteaof 命令主动并且立即触发 AOFRW
+ *
+ * 该函数返回 1 表示 AOFRW 被限制了,不能执行,如果是 0 则可以执行 AOFRW
+ */
+/* 开始限制的阈值,即 AOFRW 连续失败 3 次,就开始进行限制 */
#define AOF_REWRITE_LIMITE_THRESHOLD 3
+/* AOFRW 限制的最大延迟时间是 60 分钟 */
#define AOF_REWRITE_LIMITE_MAX_MINUTES 60 /* 1 hour */
int aofRewriteLimited(void) {
static int next_delay_minutes = 0;
static time_t next_rewrite_time = 0;
+ /* 如果 AOFRW 连续失败次数小于 3 次,可以执行 AOFRW */
if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) {
/* We may be recovering from limited state, so reset all states. */
next_delay_minutes = 0;
@@ -878,19 +968,23 @@ int aofRewriteLimited(void) {
/* if it is in the limiting state, then check if the next_rewrite_time is reached */
if (next_rewrite_time != 0) {
+ /* 如果当前时间还没超过指定的限制时间,不能执行 AOFRW */
if (server.unixtime < next_rewrite_time) {
return 1;
} else {
+ /* 超过了,将 next_rewrite_time 置 0,并且返回 0 */
next_rewrite_time = 0;
return 0;
}
}
+ /* 随着连续失败次数的增加,需要调整限制时间,每次增长之前的两倍时间,知道达到最大时间限制 */
next_delay_minutes = (next_delay_minutes == 0) ? 1 : (next_delay_minutes * 2);
if (next_delay_minutes > AOF_REWRITE_LIMITE_MAX_MINUTES) {
next_delay_minutes = AOF_REWRITE_LIMITE_MAX_MINUTES;
}
+ /* 下一次执行 AOFRW 的时间戳 */
next_rewrite_time = server.unixtime + next_delay_minutes * 60;
serverLog(LL_WARNING,
"Background AOF rewrite has repeatedly failed and triggered the limit, will retry in %d minutes", next_delay_minutes);
@@ -903,17 +997,20 @@ int aofRewriteLimited(void) {
/* Return true if an AOf fsync is currently already in progress in a
* BIO thread. */
+/* bio 线程是否有 aof fysnc 任务要执行 */
int aofFsyncInProgress(void) {
return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
+/* 使用 bio 线程异步刷盘 */
void aof_background_fsync(int fd) {
bioCreateFsyncJob(fd);
}
/* Kills an AOFRW child process if exists */
+/* 关闭 aof rewrite 子进程 */
void killAppendOnlyChild(void) {
int statloc;
/* No AOFRW child? return. */
@@ -924,6 +1021,7 @@ void killAppendOnlyChild(void) {
if (kill(server.child_pid,SIGUSR1) != -1) {
while(waitpid(-1, &statloc, 0) != server.child_pid);
}
+ /* 子进程退出需要移除掉自己产生的临时 base 文件 */
aofRemoveTempFile(server.child_pid);
resetChildState();
server.aof_rewrite_time_start = -1;
@@ -954,25 +1052,31 @@ void stopAppendOnly(void) {
/* Called when the user switches from "appendonly no" to "appendonly yes"
* at runtime using the CONFIG command. */
+/* 启动 aof 的时候,会调用该函数 */
int startAppendOnly(void) {
serverAssert(server.aof_state == AOF_OFF);
server.aof_state = AOF_WAIT_REWRITE;
if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
+ /* 如果有非 AOF 的子进程正在执行,这里不能直接执行 rewrite aof,会设置 aof_rewrite_scheduled 为 1,表示之后需要执行
+ * 一次 rewrite aof 操作 */
server.aof_rewrite_scheduled = 1;
serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
} else if (server.in_exec){
+ /* 这里是正在进行事务的情况下,也不能直接执行 rewrite aof */
server.aof_rewrite_scheduled = 1;
serverLog(LL_WARNING,"AOF was enabled during a transaction. An AOF background was scheduled to start when possible.");
} else {
/* If there is a pending AOF rewrite, we need to switch it off and
* start a new one: the old one cannot be reused because it is not
* accumulating the AOF buffer. */
+ /* 如果当前子进程类型是 aof 子进程,直接干掉当前子进程,开始一个新的 aof 子进程 */
if (server.child_type == CHILD_TYPE_AOF) {
serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
killAppendOnlyChild();
}
+ /* 开始执行 bgrewrite */
if (rewriteAppendOnlyFileBackground() == C_ERR) {
server.aof_state = AOF_OFF;
serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
@@ -1004,10 +1108,12 @@ int startAppendOnly(void) {
* is likely to fail. However apparently in modern systems this is no longer
* true, and in general it looks just more resilient to retry the write. If
* there is an actual error condition we'll get it at the next try. */
+/* 将 aof_buf 中的数据写到 incr 增量命令文件中 */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
ssize_t nwritten = 0, totwritten = 0;
while(len) {
+ /* 调用 write 写数据到文件 */
nwritten = write(fd, buf, len);
if (nwritten < 0) {
@@ -1015,11 +1121,14 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
return totwritten ? totwritten : -1;
}
+ /* 更新 buf 剩余待写的数据的大小 */
len -= nwritten;
+ /* 更新待写数据的指针 */
buf += nwritten;
totwritten += nwritten;
}
+ /* 返回本次写入的数据量 */
return totwritten;
}
@@ -1042,6 +1151,16 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
* However if force is set to 1 we'll write regardless of the background
* fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
+/* 将 aof_buf 中的数据写入磁盘中.
+ * force 参数表示要执行刷盘,redis 有三种情况下会强制刷盘
+ * 1) redis 服务关闭的时候
+ * 2) 停止 aof 的时候
+ * 3) 做 bgrewrite 的时候
+ *
+ * 这里还要注意两种 aof fsync 刷盘策略:
+ * always 表示写入磁盘之后就执行刷盘
+ * everysec 会使用 bio 大概每秒做一次异步刷盘,且该策略下存在非强制刷盘的功能
+ */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
@@ -1053,9 +1172,12 @@ void flushAppendOnlyFile(int force) {
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
+ /* 每秒刷盘一次 */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
+ /* unixtime 是 redis 的秒级时间戳 */
server.unixtime > server.aof_last_fsync &&
+ /* 如果 bio 线程的所有 aof fsync 任务都执行完了 */
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
@@ -1063,26 +1185,34 @@ void flushAppendOnlyFile(int force) {
}
}
+ /* sync_in_progress 表示是否有正在执行的 bg aof fsync 任务 */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
+ /* 如果没有强制要将 aof_buf 缓冲区中的数据写入 aof incr 文件中的情况 */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
+ /* 如果有正在执行的 bg aof fsync 任务,这里会尝试延迟 2 秒将内存数据写入文件 */
if (sync_in_progress) {
+ /* 这里每次调用完 aofWrite 函数后,会将 server.aof_flush_postponed_start 置 0
+ * 该属性为 0 表示现在开始新一次的 aof_buf 写磁盘操作 */
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
+ /* 记录当前的秒级时间戳 */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
+ /* 如果还没超过 2s 直接返回,不执行写磁盘操作 */
return;
}
/* Otherwise fall through, and go write since we can't wait
* over two seconds. */
+ /* 如果上面的逻辑都没有走,没有返回,下面就要开始执行写磁盘操作了,这里将 fsync 计数器 +1 */
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
@@ -1098,6 +1228,7 @@ void flushAppendOnlyFile(int force) {
}
latencyStartMonitor(latency);
+ /* 将 aof_buf 中的数据写入文件 */
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
@@ -1115,6 +1246,7 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */
+ /* 这里将 aof_flush_postponed_start 属性置 0 */
server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
@@ -1175,9 +1307,11 @@ void flushAppendOnlyFile(int force) {
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
+ /* 这里是每次只同步部分 aof_buf 中的数据到磁盘的情况 */
if (nwritten > 0) {
server.aof_current_size += nwritten;
server.aof_last_incr_size += nwritten;
+ /* 会将 aof_buf 中已经同步到磁盘的部分空间释放掉 */
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
@@ -1217,6 +1351,7 @@ void flushAppendOnlyFile(int force) {
/* Let's try to get this data on the disk. To guarantee data safe when
* the AOF fsync policy is 'always', we should exit if failed to fsync
* AOF (see comment next to the exit(1) after write error above). */
+ /* aof fsync 策略为 always 的情况下,直接刷盘 */
if (redis_fsync(server.aof_fd) == -1) {
serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
"AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
@@ -1229,6 +1364,7 @@ void flushAppendOnlyFile(int force) {
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
+ /* 这里是使用 bio 做 aof fsync 刷盘操作 */
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
@@ -1236,24 +1372,30 @@ void flushAppendOnlyFile(int force) {
}
}
+/* 将命令格式化 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
char buf[32];
int len, j;
robj *o;
+ /* 第一个字符为 * */
buf[0] = '*';
+ /* 写入参数数量 */
len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
+ /* 每个参数格式化 */
for (j = 0; j < argc; j++) {
o = getDecodedObject(argv[j]);
+ /* 写入参数字节大小 */
buf[0] = '$';
len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
+ /* 写入参数字符串 */
dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
dst = sdscatlen(dst,"\r\n",2);
decrRefCount(o);
@@ -1280,6 +1422,7 @@ sds genAofTimestampAnnotationIfNeeded(int force) {
return ts;
}
+/* 向 aof 缓冲区中追加一个命令 */
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
sds buf = sdsempty();
@@ -1296,17 +1439,21 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
+ /* 如果命令操作的数据库和上一次选择的数据库不同 */
if (dictid != server.aof_selected_db) {
char seldb[64];
+ /* 向缓冲区添加一条 select dictid 命令 */
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
+ /* 更新操作的数据库号 */
server.aof_selected_db = dictid;
}
/* All commands should be propagated the same way in AOF as in replication.
* No need for AOF-specific translation. */
+ /* 将命令和参数格式化 */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
/* Append to the AOF buffer. This will be flushed on disk just before
@@ -1315,6 +1462,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
if (server.aof_state == AOF_ON ||
(server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
{
+ /* 存入 aof_buf 中 */
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
}
@@ -1366,6 +1514,7 @@ int loadSingleAppendOnlyFile(char *filename) {
int ret = AOF_OK;
sds aof_filepath = makePath(server.aof_dirname, filename);
+ /* 打开 base 文件 */
FILE *fp = fopen(aof_filepath, "r");
if (fp == NULL) {
int en = errno;
@@ -1391,17 +1540,20 @@ int loadSingleAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
client *old_client = server.current_client;
+ /* 伪造一个 AOF 客户端 */
fakeClient = server.current_client = createAOFClient();
/* Check if the AOF file is in RDB format (it may be RDB encoded base AOF
* or old style RDB-preamble AOF). In that case we need to load the RDB file
* and later continue loading the AOF tail if it is an old style RDB-preamble AOF. */
char sig[5]; /* "REDIS" */
+ /* 这里判断 base 文件是 rdb 的格式存储还是 aof 格式存储 */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
/* Not in RDB format, seek back at 0 offset. */
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
/* RDB format. Pass loading the RDB functions. */
+ /* 这里是处理 RDB 格式的 base 文件的读取 */
rio rdb;
int old_style = !strcmp(filename, server.aof_filename);
if (old_style)
@@ -1410,7 +1562,9 @@ int loadSingleAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE, "Reading RDB base file on AOF loading...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
+ /* 初始化一个 rio 结构 */
rioInitWithFile(&rdb,fp);
+ /* 这里调用 rdb 加载文件的接口加载 base 文件 */
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
if (old_style)
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename);
@@ -1426,6 +1580,7 @@ int loadSingleAppendOnlyFile(char *filename) {
}
/* Read the actual AOF file, in REPL format, command by command. */
+ /* 这里是读取实际的 aof 文件 */
while(1) {
int argc, j;
unsigned long len;
@@ -1442,6 +1597,7 @@ int loadSingleAppendOnlyFile(char *filename) {
processEventsWhileBlocked();
processModuleLoadingProgressEvent(1);
}
+ /* 从 base 文件中读取一行数据 */
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp)) {
break;
@@ -1452,6 +1608,7 @@ int loadSingleAppendOnlyFile(char *filename) {
if (buf[0] == '#') continue; /* Skip annotations */
if (buf[0] != '*') goto fmterr;
if (buf[1] == '\0') goto readerr;
+ /* 读取参数数量 */
argc = atoi(buf+1);
if (argc < 1) goto fmterr;
if ((size_t)argc > SIZE_MAX / sizeof(robj*)) goto fmterr;
@@ -1463,6 +1620,7 @@ int loadSingleAppendOnlyFile(char *filename) {
fakeClient->argv = argv;
fakeClient->argv_len = argc;
+ /* 根据参数数量从 aof 文件中读取参数 */
for (j = 0; j < argc; j++) {
/* Parse the argument len. */
char *readres = fgets(buf,sizeof(buf),fp);
@@ -1474,9 +1632,11 @@ int loadSingleAppendOnlyFile(char *filename) {
else
goto fmterr;
}
+ /* 获取当前参数的字节数 */
len = strtol(buf+1,NULL,10);
/* Read it into a string object. */
+ /* 读取出参数 */
argsds = sdsnewlen(SDS_NOINIT,len);
if (len && fread(argsds,len,1,fp) == 0) {
sdsfree(argsds);
@@ -1484,9 +1644,11 @@ int loadSingleAppendOnlyFile(char *filename) {
freeClientArgv(fakeClient);
goto readerr;
}
+ /* 将参数封装成 redis 对象 */
argv[j] = createObject(OBJ_STRING,argsds);
/* Discard CRLF. */
+ /* 丢弃换行符 */
if (fread(buf,2,1,fp) == 0) {
fakeClient->argc = j+1; /* Free up to j. */
freeClientArgv(fakeClient);
@@ -1495,6 +1657,7 @@ int loadSingleAppendOnlyFile(char *filename) {
}
/* Command lookup */
+ /* 根据参数和参数数量从命令表中获取对应的命令 */
cmd = lookupCommand(argv,argc);
if (!cmd) {
serverLog(LL_WARNING,
@@ -1509,6 +1672,7 @@ int loadSingleAppendOnlyFile(char *filename) {
/* Run the command in the context of a fake client */
fakeClient->cmd = fakeClient->lastcmd = cmd;
+ /* 如果不是 exec 命令,就将命令入队到事务队列 */
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
@@ -1517,6 +1681,7 @@ int loadSingleAppendOnlyFile(char *filename) {
* anyway.*/
queueMultiCommand(fakeClient, cmd->flags);
} else {
+ /* 是 exec 命令,调用命令的处理函数 */
cmd->proc(fakeClient);
}
@@ -1654,6 +1819,7 @@ int loadAppendOnlyFiles(aofManifest *am) {
startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);
/* Load BASE AOF if needed. */
+ /* 加载 base 文件 */
if (am->base_aof_info) {
serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
aof_name = (char*)am->base_aof_info->file_name;
@@ -2326,6 +2492,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
+/* 子进程重写 aof 文件 */
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp = NULL;
@@ -2333,6 +2500,7 @@ int rewriteAppendOnlyFile(char *filename) {
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
+ /* 打开一个临时文件 */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
@@ -2340,24 +2508,33 @@ int rewriteAppendOnlyFile(char *filename) {
return C_ERR;
}
+ /* 使用 FILE 类型的 RIO 来将数据写入临时文件 */
+
+ /* 初始化一个 file 类型的 rio */
rioInitWithFile(&aof,fp);
+ /* 如果配置了 aof 增量刷盘 */
if (server.aof_rewrite_incremental_fsync)
+ /* 设置 rio 的 autoSync (自动刷盘的阈值) */
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
startSaving(RDBFLAGS_AOF_PREAMBLE);
+ /* 使用 rdb 编码格式生成 base 文件内容,redis 默认使用这种方式 */
if (server.aof_use_rdb_preamble) {
int error;
+ /* 使用 rdb 编码格式保存 redis 的数据快照 */
if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
+ /* 使用 aof 自己的格式生成 base 文件内容 */
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Make sure data will not remain on the OS's output buffers */
+ /* page cache 刷盘 */
if (fflush(fp)) goto werr;
if (fsync(fileno(fp))) goto werr;
if (fclose(fp)) { fp = NULL; goto werr; }
@@ -2365,6 +2542,7 @@ int rewriteAppendOnlyFile(char *filename) {
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
+ /* 重命名文件 */
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
@@ -2400,9 +2578,26 @@ int rewriteAppendOnlyFile(char *filename) {
* 4d) persist AOF manifest file
* 4e) Delete the history files use bio
*/
+/* 后台重写 aof 文件的步骤
+ * 1) 用户调用 BGREWRITEAOF 命令
+ * 2) redis 调用当前函数,fork 子进程
+ * 2a) 子进程使用临时文件重写 aof
+ * 2b) 父进程打开一个新的 incr 类型的 aof 文件继续写后续到来的写命令
+ * 3) 当子进程完成 2a 步骤了
+ * 4) 父进程会判断子进程的退出码,如果是 OK
+ * 4a) 获取一个新的 base 文件名,标记之前的 base 文件为历史文件
+ * 4b) 用新的 base 文件名重命名之前子进程的临时文件
+ * 4c) 标记之前的 incr aof 文件为历史文件
+ * 4d) 持久化 aof 清单文件
+ * 4e) 使用 bio 线程删除历史文件
+ *
+ * 该函数其实并没有做 4) 的逻辑,4) 的逻辑是在 backgroundRewriteDoneHandler 函数中实现的
+ * 看完该函数,可以直接跳到 backgroundRewriteDoneHandler 函数看剩余逻辑
+ */
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
+ /* 如果有正在运行的子进程,返回 */
if (hasActiveChildProcess()) return C_ERR;
if (dirCreateIfMissing(server.aof_dirname) == -1) {
@@ -2414,21 +2609,31 @@ int rewriteAppendOnlyFileBackground(void) {
/* We set aof_selected_db to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command. */
+ /* 将 aof_selected_db 设置为 -1 */
server.aof_selected_db = -1;
+ /* 将 aof 缓冲区中的数据写入磁盘文件 */
flushAppendOnlyFile(1);
+ /* 打开一个新的 incr aof 文件 */
if (openNewIncrAofForAppend() != C_OK) {
server.aof_lastbgrewrite_status = C_ERR;
return C_ERR;
}
server.stat_aof_rewrites++;
+ /* 调用 fork 系统调用生成子进程 */
if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
char tmpfile[256];
+ /* 子进程部分的逻辑 */
/* Child */
+ /* 设置进程名 */
redisSetProcTitle("redis-aof-rewrite");
+ /* 给子进程绑定 cpu 核 */
redisSetCpuAffinity(server.aof_rewrite_cpulist);
+ /* 获取临时文件名 */
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
+ /* 使用临时文件写 redis 数据快照 */
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
+ /* rewrite 成功 */
serverLog(LL_NOTICE,
"Successfully created the temporary AOF base file %s", tmpfile);
sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
@@ -2471,12 +2676,15 @@ void bgrewriteaofCommand(client *c) {
}
}
+/* 移除临时快照文件 */
void aofRemoveTempFile(pid_t childpid) {
char tmpfile[256];
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
+ /* 后台 bio 线程关闭文件 */
bg_unlink(tmpfile);
+ /* 这个是子进程的临时快照文件,该文件只有子进程会进行操作,子进程写完快照后就会将该文件修改为 temp-rewriteaof-bg-%d.aof 文件 */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) childpid);
bg_unlink(tmpfile);
}
@@ -2532,6 +2740,7 @@ off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status) {
return size;
}
+/* 获取 base 文件和 incr 文件的总数量 */
int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am) {
int num = 0;
if (am->base_aof_info) num++;
@@ -2541,6 +2750,7 @@ int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am) {
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
+/* 子进程执行完 rewrite aof 之后的工作 */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
char tmpfile[256];
@@ -2559,16 +2769,19 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverAssert(server.aof_manifest != NULL);
/* Dup a temporary aof_manifest for subsequent modifications. */
+ /* 复制一份清单信息 */
temp_am = aofManifestDup(server.aof_manifest);
/* Get a new BASE file name and mark the previous (if we have)
* as the HISTORY type. */
+ /* 获取新的 base 文件,标记之前的 base 文件为历史文件 */
sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
serverAssert(new_base_filename != NULL);
new_base_filepath = makePath(server.aof_dirname, new_base_filename);
/* Rename the temporary aof file to 'new_base_filename'. */
latencyStartMonitor(latency);
+ /* 重命名临时文件为新的 base 文件 */
if (rename(tmpfile, new_base_filepath) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF base file %s into %s: %s",
@@ -2589,12 +2802,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* Rename the temporary incr aof file to 'new_incr_filename'. */
if (server.aof_state == AOF_WAIT_REWRITE) {
/* Get temporary incr aof name. */
+ /* 获取临时 incr aof 文件名 */
sds temp_incr_aof_name = getTempIncrAofName();
sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
/* Get next new incr aof name. */
+ /* 获取新的 incr aof 文件名 */
sds new_incr_filename = getNewIncrAofName(temp_am);
new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
latencyStartMonitor(latency);
+ /* 将临时文件修改成新的 incr 文件 */
if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF incr file %s into %s: %s",
@@ -2649,6 +2865,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
* to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
+ /* 标记之前的 incr aof 文件为历史文件,并修改清单信息 */
markRewrittenIncrAofAsHistory(temp_am);
/* Persist our modifications. */