
🚄【Redis Essentials】Mastering AOF Rewrite from the Ground Up (Source Code Edition)

Published: May 19, 2021

🚄 Background

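As Redis runs, the AOF keeps growing (a single key can accumulate many AOF entries), so restoring data from the AOF wastes a lot of time replaying redundant commands. Redis's solution is AOF Rewrite.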

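AOF Rewrite rebuilds the log from the current DB contents, emitting the commands needed to recreate each key instead of its full history. A rewrite is triggered in two cases: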

  • 1) The user runs the BGREWRITEAOF command.

  • 2) The AOF file grows beyond the configured threshold.

🚄 When AOF Rewrite Is Triggered

First, let's look at the handler for BGREWRITEAOF:

void bgrewriteaofCommand(redisClient *c) {
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
    } else if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
    } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
        addReplyStatus(c,"Background append only file rewriting started");
    } else {
        addReply(c,shared.err);
    }
}
  • aof_child_pid: the pid of the (background) process performing the AOF rewrite.

  • rdb_child_pid: the pid of the (background) process performing the RDB dump.

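  1. If an AOF rewrite is already in progress, reply to the client with an error.

  2. If an RDB dump is in progress, set aof_rewrite_scheduled to 1 instead of starting now, to avoid putting extra pressure on the disk; the rewrite is started later, once neither an AOF rewrite nor an RDB dump is running.

  3. If neither an AOF rewrite nor an RDB dump is in progress, call rewriteAppendOnlyFileBackground to perform the rewrite.

  4. If that call fails, reply with an error.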
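The four branches above boil down to a small decision function. The following C sketch models only that decision, assuming the two child pids are passed in as plain arguments. decide_bgrewriteaof and bgrewrite_decision are hypothetical names, not part of Redis, and branch 4 (a failed fork) is omitted because nothing is forked here.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical outcome codes mirroring the replies of bgrewriteaofCommand. */
typedef enum {
    BGREWRITE_ALREADY_RUNNING, /* an AOF rewrite child exists: reply with an error */
    BGREWRITE_SCHEDULED,       /* an RDB child exists: defer the rewrite */
    BGREWRITE_START            /* nothing running: fork the rewrite child now */
} bgrewrite_decision;

/* A pid of -1 means "no such child", as with server.aof_child_pid and
 * server.rdb_child_pid. */
static bgrewrite_decision decide_bgrewriteaof(pid_t aof_child_pid,
                                              pid_t rdb_child_pid,
                                              int *aof_rewrite_scheduled) {
    assert(aof_rewrite_scheduled != NULL);
    if (aof_child_pid != -1)
        return BGREWRITE_ALREADY_RUNNING;
    if (rdb_child_pid != -1) {
        *aof_rewrite_scheduled = 1;  /* serverCron starts it once the dump ends */
        return BGREWRITE_SCHEDULED;
    }
    return BGREWRITE_START;
}
```

With an RDB child running, for instance, decide_bgrewriteaof(-1, rdb_pid, &scheduled) returns BGREWRITE_SCHEDULED and sets scheduled to 1, which is the flag serverCron checks later.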

Next, let's see how serverCron triggers an AOF rewrite.

The first trigger point is the deferred rewrite, which was postponed to avoid conflicting with an RDB dump.

/* Start a scheduled AOF rewrite if this was requested by the user while
 * a BGSAVE was in progress. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
    server.aof_rewrite_scheduled)
{
    rewriteAppendOnlyFileBackground();
}

It checks that no AOF rewrite or RDB dump is in progress (both pids are -1) and that aof_rewrite_scheduled is set, then calls rewriteAppendOnlyFileBackground to perform the rewrite.


The second trigger point fires when the AOF file has grown beyond a configured percentage.

/* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
    server.aof_child_pid == -1 &&
    server.aof_rewrite_perc &&
    server.aof_current_size > server.aof_rewrite_min_size)
{
    long long base = server.aof_rewrite_base_size ?
                        server.aof_rewrite_base_size : 1;
    long long growth = (server.aof_current_size*100/base) - 100;
    if (growth >= server.aof_rewrite_perc) {
        redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
        rewriteAppendOnlyFileBackground();
    }
}

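An AOF rewrite is triggered when the AOF file is larger than aof_rewrite_min_size and has grown by at least aof_rewrite_perc percent relative to its size after the last rewrite (aof_rewrite_base_size). These fields are set by the auto-aof-rewrite-min-size and auto-aof-rewrite-percentage configuration directives.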
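To see the growth formula with concrete numbers, here is the same arithmetic pulled out into a hypothetical pure function, should_auto_rewrite (not a Redis function). It is a sketch: the child-pid checks are dropped, and the arguments stand in for the server fields of the same names.

```c
#include <assert.h>

/* Hypothetical pure version of the serverCron check above; the rdb/aof
 * child-pid conditions are omitted. Sizes are in bytes, aof_rewrite_perc is
 * a percentage, and 0 disables automatic rewrites. */
static int should_auto_rewrite(long long aof_current_size,
                               long long aof_rewrite_base_size,
                               long long aof_rewrite_min_size,
                               long long aof_rewrite_perc) {
    assert(aof_current_size >= 0);
    if (!aof_rewrite_perc) return 0;                         /* disabled */
    if (aof_current_size <= aof_rewrite_min_size) return 0;  /* still small */
    /* A base size of 0 (unknown) is treated as 1 to avoid dividing by zero. */
    long long base = aof_rewrite_base_size ? aof_rewrite_base_size : 1;
    long long growth = (aof_current_size * 100 / base) - 100;
    return growth >= aof_rewrite_perc;
}
```

With auto-aof-rewrite-percentage 100 and auto-aof-rewrite-min-size 64mb (the defaults), a file that grew from a 100 MB base to 200 MB gives growth = 200*100/100 - 100 = 100, so a rewrite starts; at 150 MB the growth is only 50 and nothing happens.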

🚄 AOF Rewrite Core Flow

The rewrite roughly proceeds as follows:

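  1. Fork a child process, which works on a snapshot of the current data; meanwhile, the parent records subsequent write commands in aof_rewrite_buf_blocks.

  2. The child iterates over the DBs, writes a temporary AOF file, and exits.

  3. Once the child has finished writing and exited, the parent detects this (in the code analyzed here, serverCron polls for the child's exit with wait3).

  4. The parent then appends the contents of aof_rewrite_buf_blocks to that AOF file.

  5. Finally, the temporary file is renamed to become the official AOF file.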
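The five steps can be condensed into a toy C program for a POSIX system. This is a sketch under simplifying assumptions, not Redis code: mini_rewrite and the temp-mini-rewrite-<pid>.aof name are made up, the snapshot and the parent's buffered diff are plain strings, and the parent blocks in waitpid instead of polling from serverCron.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical miniature of the rewrite flow. `snapshot` stands for the
 * dataset as the child sees it after fork(); `diff` stands for the commands
 * the parent buffered while the child was working. */
static int mini_rewrite(const char *target, const char *snapshot, const char *diff) {
    char tmpfile[256];
    assert(target && snapshot && diff);

    pid_t child = fork();
    if (child == -1) return -1;
    if (child == 0) {
        /* Child: dump the snapshot into a temp file named after its own pid. */
        snprintf(tmpfile, sizeof(tmpfile), "temp-mini-rewrite-%d.aof", (int)getpid());
        FILE *fp = fopen(tmpfile, "w");
        if (!fp) _exit(1);
        int ok = fputs(snapshot, fp) != EOF && fflush(fp) == 0 &&
                 fsync(fileno(fp)) == 0;
        if (fclose(fp) != 0) ok = 0;
        _exit(ok ? 0 : 1);   /* the exit code reports success to the parent */
    }

    /* Parent: reap the child (blocking here; Redis polls from serverCron). */
    int status;
    if (waitpid(child, &status, 0) == -1) return -1;
    snprintf(tmpfile, sizeof(tmpfile), "temp-mini-rewrite-%d.aof", (int)child);
    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
        unlink(tmpfile);
        return -1;
    }

    /* Append the buffered diff, then atomically replace the target. */
    FILE *fp = fopen(tmpfile, "a");
    if (!fp) { unlink(tmpfile); return -1; }
    int ok = fputs(diff, fp) != EOF;
    if (fclose(fp) != 0) ok = 0;
    if (!ok || rename(tmpfile, target) == -1) {
        unlink(tmpfile);
        return -1;
    }
    return 0;
}
```

The child reports success only through its exit code (0 or 1), and the temporary file name is derived from the child's pid so the parent can locate it after reaping, mirroring the temp-rewriteaof-bg-%d.aof convention.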

Now for the actual code, starting with rewriteAppendOnlyFileBackground:

pid_t childpid;
long long start;

// <MM>
// Prevent multiple processes from rewriting at the same time
// </MM>
if (server.aof_child_pid != -1) return REDIS_ERR;

If another AOF rewrite process is already running, it returns an error immediately.

start = ustime();
if ((childpid = fork()) == 0) {
    char tmpfile[256];

    /* Child */
    // <MM>
    // The child must not accept connections
    // </MM>
    closeListeningSockets(0);
    redisSetProcTitle("redis-aof-rewrite");
    // <MM>
    // Build the temporary AOF file name
    // </MM>
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
    if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
        size_t private_dirty = zmalloc_get_private_dirty();

        if (private_dirty) {
            redisLog(REDIS_NOTICE,
                "AOF rewrite: %zu MB of memory used by copy-on-write",
                private_dirty/(1024*1024));
        }
        exitFromChild(0);
    } else {
        exitFromChild(1);
    }
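  1. Record the current time, used to measure how long fork takes.

  2. Call fork and enter the child's path. The child first closes the listening sockets so that it does not accept client connections.

  3. It also sets the process title, then builds the name of the temporary file the rewrite will write to.

  4. Next, it calls rewriteAppendOnlyFile to perform the rewrite.

  5. If the rewrite succeeds, it logs how much memory copy-on-write has made private (dirty pages) and exits with code 0.

  6. If the rewrite fails, it exits with code 1.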

Now the parent's path:

} else {
    /* Parent */
    server.stat_fork_time = ustime()-start;
    server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
    latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
    if (childpid == -1) {
        redisLog(REDIS_WARNING,
            "Can't rewrite append only file in background: fork: %s",
            strerror(errno));
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,
        "Background append only file rewriting started by pid %d",childpid);
    server.aof_rewrite_scheduled = 0;
    server.aof_rewrite_time_start = time(NULL);
    server.aof_child_pid = childpid;
    updateDictResizePolicy();
    /* We set appendseldb to -1 in order to force the next call to the
     * feedAppendOnlyFile() to issue a SELECT command, so the differences
     * accumulated by the parent into server.aof_rewrite_buf will start
     * with a SELECT statement and it will be safe to merge. */
    server.aof_selected_db = -1;
    replicationScriptCacheFlush();
    return REDIS_OK;
}

The parent first records how long the fork took and adds a latency sample.

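  • If fork failed, it logs the error and returns an error.

  • If fork succeeded, it clears aof_rewrite_scheduled and records the rewrite start time and aof_child_pid (Redis uses this field to tell whether an AOF rewrite is in progress).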

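It then calls updateDictResizePolicy to adjust the rehash policy of the DB key spaces: since a child process now exists, dict resizing (rehash) is disabled so that copy-on-write does not end up duplicating large numbers of memory pages.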

It sets aof_selected_db to -1 so that the next AOF entry is preceded by a SELECT command. That SELECT is also written to aof_rewrite_buf, so the buffer can safely be appended to the rewritten file. (I haven't looked into replicationScriptCacheFlush yet.)
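Why does a forced SELECT make the buffer safe to merge? The child's file ends with whichever DB it rewrote last, so commands appended afterwards must re-select their own DB. The plain-text model below (RESP replaced by readable lines; mini_aof and feed are hypothetical names) shows how setting the tracked DB to -1 guarantees that the next buffered command starts with SELECT.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, plain-text model of how feedAppendOnlyFile uses
 * server.aof_selected_db: a SELECT is emitted whenever the target db differs
 * from the last one selected, and -1 matches no db, forcing a SELECT. */
struct mini_aof {
    char buf[512];     /* stands in for aof_rewrite_buf; long input is truncated */
    int selected_db;   /* last db SELECTed in this stream, or -1 */
};

static void feed(struct mini_aof *aof, int dictid, const char *cmd) {
    assert(dictid >= 0);
    size_t len = strlen(aof->buf);   /* always < sizeof(buf), so no underflow */
    if (aof->selected_db != dictid) {
        snprintf(aof->buf + len, sizeof(aof->buf) - len, "SELECT %d\n", dictid);
        aof->selected_db = dictid;
        len = strlen(aof->buf);
    }
    snprintf(aof->buf + len, sizeof(aof->buf) - len, "%s\n", cmd);
}
```

While the tracked DB already matches, commands are appended bare; after the reset to -1, the first command for DB 3 comes out as "SELECT 3" followed by the command itself.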

Now let's look at how the child performs the rewrite, in rewriteAppendOnlyFile. In short, it iterates over all keys, serializes each one, and writes the result to the AOF file.

dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();

/* Note that we have to use a different temp name here compared to the
 * one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
    redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
    return REDIS_ERR;
}

It gets the current time, builds a temporary file name, and creates that file.

rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
    rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);

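rio is Redis's stream-oriented I/O abstraction. It can have different backends; currently it provides implementations for files and for in-memory buffers.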
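The incremental fsync that rioSetAutoSync enables in the snippet above can be sketched with stdio and POSIX calls. sync_writer and sw_write are hypothetical stand-ins for rio's file backend, assuming its behaviour is: count the bytes written since the last sync and, once past the threshold, fflush and then fsync (Redis may use fdatasync for this step on Linux).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical writer modeled on rio's auto-sync behaviour: once `autosync`
 * bytes have been written since the last sync, flush stdio buffers and
 * fsync, so dirty pages reach the disk in bounded batches. */
struct sync_writer {
    FILE *fp;
    size_t autosync;          /* e.g. 32 MB; 0 disables incremental sync */
    size_t since_last_sync;   /* bytes written since the last fsync */
    unsigned syncs;           /* number of fsync calls, for illustration */
};

/* Returns 1 on success and 0 on failure, matching the "== 0 goto werr"
 * checks used with the rio calls in this article. */
static int sw_write(struct sync_writer *w, const void *buf, size_t len) {
    assert(w && w->fp);
    if (fwrite(buf, 1, len, w->fp) != len) return 0;
    w->since_last_sync += len;
    if (w->autosync && w->since_last_sync >= w->autosync) {
        if (fflush(w->fp) != 0 || fsync(fileno(w->fp)) != 0) return 0;
        w->since_last_sync = 0;
        w->syncs++;
    }
    return 1;
}
```

The design trade-off is a few extra fsync calls during the rewrite in exchange for never having gigabytes of dirty pages flushed at once at the end.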

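Here rio is initialized. If server.aof_rewrite_incremental_fsync is set, the AOF is fsynced incrementally while it is being written, once every 32 MB (REDIS_AOF_AUTOSYNC_BYTES), to avoid a single large fsync saturating the disk. What follows is a loop that visits each Redis DB and rewrites it. Looking directly at the loop body: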

    char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
    redisDb *db = server.db+j;
    dict *d = db->dict;
    if (dictSize(d) == 0) continue;
    di = dictGetSafeIterator(d);
    if (!di) {
        fclose(fp);
        return REDIS_ERR;
    }

    /* SELECT the new DB */
    if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
    if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

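It first prepares the SELECT command for the DB; if the DB is empty, it skips to the next one. It then obtains a safe iterator for the DB and returns an error if that fails. Finally, it writes the SELECT command for the DB to the file. Next comes another loop that visits every key in the DB and generates the corresponding commands.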
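Before moving on to that loop, it helps to see the exact bytes the SELECT step produces. The hypothetical helper format_select below (not a Redis function) concatenates the fixed selectcmd prefix with the DB index encoded as a RESP bulk string, which is the format rioWriteBulkLongLong writes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper reproducing the bytes written for "SELECT j": the fixed
 * prefix from selectcmd followed by the db index as a RESP bulk string
 * "$<len>\r\n<digits>\r\n". Returns the snprintf result (the full length,
 * even if the output was truncated). */
static int format_select(char *out, size_t outlen, long long dbid) {
    char digits[32];
    assert(out && outlen > 0);
    int n = snprintf(digits, sizeof(digits), "%lld", dbid);
    return snprintf(out, outlen, "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", n, digits);
}
```

For DB 0 this yields *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n (23 bytes), i.e. the two-element RESP array [SELECT, 0].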

while ((de = dictNext(di)) != NULL) {
    // ...
}
dictReleaseIterator(di);

Continuing inside the loop:

        sds keystr;
        robj key, *o;
        long long expiretime;

        keystr = dictGetKey(de);
        o = dictGetVal(de);
        initStaticStringObject(key,keystr);

        expiretime = getExpire(db,&key);

        /* If this key is already expired skip it */
        if (expiretime != -1 && expiretime < now) continue;

de is a dict entry holding a key and a value. The code first extracts the key and the value and wraps the key in an robj. It then looks up the key's expire time; if the key has already expired, it is skipped.

            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }

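Next, the value is serialized into the appropriate command(s) according to its type, and the commands are written to the AOF file. The serialization of each individual type is not covered in detail here.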
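As a concrete instance of the string branch, the sketch below emits SET key value in RESP and skips keys whose expire time has passed. It also appends a PEXPIREAT command for keys with a TTL: in the Redis source, the type dispatch above is followed by such a step, although the excerpt here does not show it. put_raw, put_bulk and rewrite_string_key are hypothetical names, and a fixed-size char buffer stands in for rio.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Appends raw text at out + *pos; returns 0 if it does not fit.
 * Requires *pos < cap, which every successful append preserves. */
static int put_raw(char *out, size_t cap, size_t *pos, const char *s) {
    int n = snprintf(out + *pos, cap - *pos, "%s", s);
    if (n < 0 || (size_t)n >= cap - *pos) return 0;
    *pos += (size_t)n;
    return 1;
}

/* Appends a RESP bulk string "$<len>\r\n<data>\r\n". */
static int put_bulk(char *out, size_t cap, size_t *pos, const char *s) {
    int n = snprintf(out + *pos, cap - *pos, "$%zu\r\n%s\r\n", strlen(s), s);
    if (n < 0 || (size_t)n >= cap - *pos) return 0;
    *pos += (size_t)n;
    return 1;
}

/* Hypothetical rewrite of one string key. expiretime is an absolute Unix time
 * in ms, or -1 for "no TTL" (as getExpire returns). Returns 0 if the key is
 * already expired and skipped, 1 if commands were written, -1 if `out` is
 * too small. */
static int rewrite_string_key(char *out, size_t cap, const char *key,
                              const char *val, long long expiretime,
                              long long now_ms) {
    size_t pos = 0;
    assert(cap > 0);
    if (expiretime != -1 && expiretime < now_ms) return 0;
    /* SET key value */
    if (!put_raw(out, cap, &pos, "*3\r\n$3\r\nSET\r\n") ||
        !put_bulk(out, cap, &pos, key) || !put_bulk(out, cap, &pos, val))
        return -1;
    /* PEXPIREAT key ms, only for keys that carry a TTL */
    if (expiretime != -1) {
        char ms[32];
        snprintf(ms, sizeof(ms), "%lld", expiretime);
        if (!put_raw(out, cap, &pos, "*3\r\n$9\r\nPEXPIREAT\r\n") ||
            !put_bulk(out, cap, &pos, key) || !put_bulk(out, cap, &pos, ms))
            return -1;
    }
    return 1;
}
```

Writing an absolute PEXPIREAT rather than a relative TTL keeps the expiry correct no matter how long after the rewrite the AOF is replayed.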

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* Use RENAME to make sure the DB file is changed atomically only
 * if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
    redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
    unlink(tmpfile);
    return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;

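fflush and fsync force the data onto disk, and then the file is closed. Only after that is the temporary file renamed, which guarantees that the file under the final name is complete and that a half-written AOF never appears there. Finally, it logs a message and returns.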

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di)
        dictReleaseIterator(di);
    return REDIS_ERR;


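Once the file has been opened, any failing step jumps to werr for error handling: the file is closed, the temporary file is deleted, the dict iterator is released if it has not been already, and an error is returned.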
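The fflush, fsync, fclose, rename sequence together with the werr cleanup path is the usual crash-safe file replacement pattern. Here it is as a standalone C sketch; write_file_atomically and the temp-atomic-<pid>.aof name are hypothetical. One small difference from the excerpt: the sketch sets fp to NULL when fclose fails, so the cleanup path never closes the same stream twice.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: replace `filename` with `data` so that readers see
 * either the old file or the complete new one, never a partial write. */
static int write_file_atomically(const char *filename, const char *data) {
    char tmpfile[256];
    FILE *fp;

    assert(filename && data);
    snprintf(tmpfile, sizeof(tmpfile), "temp-atomic-%d.aof", (int)getpid());
    fp = fopen(tmpfile, "w");
    if (!fp) return -1;

    if (fputs(data, fp) == EOF) goto werr;
    /* Move data from stdio buffers to the kernel, then to the disk. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) { fp = NULL; goto werr; }

    /* On POSIX file systems rename(2) replaces the target atomically. */
    if (rename(tmpfile, filename) == -1) {
        unlink(tmpfile);
        return -1;
    }
    return 0;

werr:
    if (fp) fclose(fp);   /* never close the same stream twice */
    unlink(tmpfile);
    return -1;
}
```

Because the temp file is fully synced before the rename, a crash at any point leaves either the previous file or the new one under the final name.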

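At this point, the child's part of the AOF rewrite is done and the rewritten file exists. However, the commands executed during the rewrite have not yet been recorded in it, so some finishing work remains, and that is done in the parent process.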

🚄 Appending the AOF Rewrite Buffer

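In multi-process programming, a parent must reap its child processes after they exit; otherwise they become zombie processes. The parent reaps the rewrite process in serverCron as well.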



    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {

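If an RDB dump or an AOF rewrite is in progress, the parent calls wait3 in non-blocking mode (WNOHANG) so that it can collect a child's exit status once the child has exited. If the process that exited is the AOF rewrite child, backgroundRewriteDoneHandler is called to do the finishing work.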
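The non-blocking reap can be tried in isolation. poll_child below is a hypothetical helper; it uses waitpid(-1, ..., WNOHANG), the POSIX counterpart of the wait3(&statloc, WNOHANG, NULL) call above, and, unlike the excerpt, reads WEXITSTATUS only when WIFEXITED is true.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical non-blocking reaper modeled on the serverCron code above.
 * Returns 0 when no child has exited yet (or there is no child at all),
 * otherwise the reaped pid, filling *exitcode and *bysignal. */
static pid_t poll_child(int *exitcode, int *bysignal) {
    int statloc;
    assert(exitcode && bysignal);
    /* WNOHANG makes the call return immediately instead of blocking. */
    pid_t pid = waitpid(-1, &statloc, WNOHANG);
    if (pid <= 0) return 0;
    *exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;
    *bysignal = WIFSIGNALED(statloc) ? WTERMSIG(statloc) : 0;
    return pid;
}
```

Called once per timer tick, it returns 0 while the child is still running, so the event loop is never blocked waiting for the rewrite to finish.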

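Let's look at backgroundRewriteDoneHandler, starting with the normal case, in which the child was not killed by a signal and exited with code 0: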

        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        mstime_t latency;

        redisLog(REDIS_NOTICE,
            "Background AOF rewrite terminated with success");

        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

It first logs a message, then opens the temporary file written by the rewrite child.

        // <MM>
        // Append the rewrite buffer to the file
        // </MM>
        if (aofRewriteBufferWrite(newfd) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());

Next, the AOF rewrite buffer is appended to the file.
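Appending the buffer amounts to writing a list of memory blocks, in order, to a descriptor opened with O_APPEND. In this sketch, diff_block is a hypothetical stand-in for Redis's list of rewrite-buffer blocks, and write_diff_blocks retries short writes and EINTR. That retry is a design choice of the sketch: Redis's aofRewriteBufferWrite is, to my understanding, stricter and treats a short write as an error.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for the parent's rewrite buffer: a singly linked
 * list of filled memory blocks, written out in order. */
struct diff_block {
    const char *buf;
    size_t used;               /* bytes of buf that hold data */
    struct diff_block *next;
};

/* Writes every block to fd; returns the total byte count, or -1 on error. */
static long long write_diff_blocks(int fd, const struct diff_block *b) {
    long long total = 0;
    assert(fd >= 0);
    for (; b != NULL; b = b->next) {
        size_t off = 0;
        while (off < b->used) {
            ssize_t n = write(fd, b->buf + off, b->used - off);
            if (n == -1 && errno == EINTR) continue;   /* interrupted: retry */
            if (n <= 0) {
                if (n == 0) errno = EIO;               /* no progress: give up */
                return -1;
            }
            off += (size_t)n;                          /* short write: continue */
        }
        total += (long long)b->used;
    }
    return total;
}
```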

        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        if (server.aof_fd == -1) {
            // <MM>
            // AOF is not enabled: this rewrite was triggered by a command
            // </MM>
            /* AOF disabled */

            /* Don't care if this fails: oldfd will be -1 and we handle that.
             * One notable case of -1 return is if the old file does
             * not exist. */
            oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        latencyStartMonitor(latency);
        if (rename(tmpfile,server.aof_filename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF file: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);

        if (server.aof_fd == -1) {
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.aof_rewrite_base_size = server.aof_current_size;

            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }

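Then the temporary file is renamed to the final AOF file. If AOF is enabled, the new file descriptor also replaces server.aof_fd, and aof_rewrite_base_size is reset to the new file size.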
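The trick described in the long comment, holding a descriptor to the old AOF so that rename(2) does not trigger the expensive unlink, can be checked with a few POSIX calls. swap_in_new_file is a hypothetical helper; in Redis, the matching close of oldfd is handed to a background thread through bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE, ...).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: open the current target first so that rename(2) does
 * not drop the last reference to the old inode. The old data is only freed
 * when *oldfd_out is closed, which Redis defers to a background thread.
 * Returns 0 on success, -1 if the rename failed. */
static int swap_in_new_file(const char *tmpfile, const char *target, int *oldfd_out) {
    assert(oldfd_out != NULL);
    /* -1 is fine here: it just means there was no previous file. */
    int oldfd = open(target, O_RDONLY | O_NONBLOCK);
    if (rename(tmpfile, target) == -1) {
        if (oldfd != -1) close(oldfd);
        *oldfd_out = -1;
        return -1;
    }
    *oldfd_out = oldfd;
    return 0;
}
```

After the swap, reading through the returned descriptor still yields the old contents, while opening the path yields the new file; the old inode is released only when that descriptor is closed.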


        server.aof_lastbgrewrite_status = REDIS_OK;

        redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
        /* Change state from WAIT_REWRITE to ON if needed */
        if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
            server.aof_state = REDIS_AOF_ON;

        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);

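Finally, the state is updated and the old AOF file is closed asynchronously. If the rewrite child exited abnormally, either killed by a signal or with a non-zero exit code, the handler only records REDIS_ERR in aof_lastbgrewrite_status and writes a log entry.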

    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = REDIS_ERR;

        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        server.aof_lastbgrewrite_status = REDIS_ERR;

        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

When appending the rewrite buffer or renaming the file fails, cleanup is needed; this is handled by the cleanup branch:

cleanup:
    aofRewriteBufferReset();
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;

