Heim >Datenbank >MySQL-Tutorial >Redis数据持久化机制AOF原理分析一

Redis数据持久化机制AOF原理分析一

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOriginal
2016-06-07 15:28:031266Durchsuche

本文所引用的源码全部来自Redis2.8.2版本。 Redis AOF数据持久化机制的实现相关代码是redis.c, redis.h, aof.c, bio.c, rio.c, config.c 在阅读本文之前请先阅读Redis数据持久化机制AOF原理分析之配置详解文章,了解AOF相关参数的解析,文章链接 http://blog

本文所引用的源码全部来自Redis2.8.2版本。

Redis AOF数据持久化机制的实现相关代码是redis.c, redis.h, aof.c, bio.c, rio.c, config.c

在阅读本文之前请先阅读Redis数据持久化机制AOF原理分析之配置详解文章,了解AOF相关参数的解析,文章链接

http://blog.csdn.net/acceptedxukai/article/details/18135219

转载请注明,文章出自http://blog.csdn.net/acceptedxukai/article/details/18136903

下面将介绍AOF数据持久化机制的实现

Server启动加载AOF文件数据

Server启动加载AOF文件数据的执行步骤为:main() -> initServerConfig() -> loadServerConfig() -> initServer() -> loadDataFromDisk()。initServerConfig()主要为初始化默认的AOF参数配置;loadServerConfig()加载配置文件redis.conf中AOF的参数配置,覆盖Server的默认AOF参数配置,如果配置appendonly on,那么AOF数据持久化功能将被激活,server.aof_state参数被设置为REDIS_AOF_ON;loadDataFromDisk()判断server.aof_state == REDIS_AOF_ON,结果为True就调用loadAppendOnlyFile函数加载AOF文件中的数据,加载的方法就是读取AOF文件中数据,由于AOF文件中存储的数据与客户端发送的请求格式相同完全符合Redis的通信协议,因此Server创建伪客户端fakeClient,将解析后的AOF文件数据像客户端请求一样调用各种指令,cmd->proc(fakeClient),将AOF文件中的数据重现到Redis Server数据库中。

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}
Server首先判断加载AOF文件是因为AOF文件中的数据要比RDB文件中的数据要新。
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;

    //redis_fstat就是fstat64函数,通过fileno(fp)得到文件描述符,获取文件的状态存储于sb中,
    //具体可以参考stat函数,st_size就是文件的字节数
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {//打开文件失败
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    fakeClient = createFakeClient(); //建立伪终端
    startLoading(fp); // 定义于 rdb.c ,更新服务器的载入状态

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time */
        // 有间隔地处理外部请求,ftello()函数得到文件的当前位置,返回值为long
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));//保存aof文件读取的位置,ftellno(fp)获取文件当前位置
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);//处理事件
        }
        //按行读取AOF数据
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))//达到文件尾EOF
                break;
            else
                goto readerr;
        }
        //读取AOF文件中的命令,依照Redis的协议处理
        if (buf[0] != '*') goto fmterr;
        argc = atoi(buf+1);//参数个数
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc);//参数值
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != &#39;$&#39;) goto fmterr;
            len = strtol(buf+1,NULL,10);//每个bulk的长度
            argsds = sdsnewlen(NULL,len);//新建一个空sds
            //按照bulk的长度读取
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF 跳过\r\n*/
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command &#39;%s&#39; reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }
        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient);//执行命令

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    fclose(fp);
    freeFakeClient(fakeClient);
    server.aof_state = old_aof_state;
    stopLoading();
    aofUpdateCurrentSize(); //更新server.aof_current_size,AOF文件大小
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;
	&hellip;&hellip;&hellip;&hellip;
}
在前面一篇关于AOF参数配置的博客遗留了一个问题,server.aof_current_size参数的初始化,下面解决这个疑问。
void aofUpdateCurrentSize(void) {
    struct redis_stat sb;

    if (redis_fstat(server.aof_fd,&sb) == -1) {
        redisLog(REDIS_WARNING,"Unable to obtain the AOF file length. stat: %s",
            strerror(errno));
    } else {
        server.aof_current_size = sb.st_size;
    }
}
redis_fstat是作者对Linux中fstat64函数的重命名,该还是就是获取文件相关的参数信息,具体可以Google之,sb.st_size就是当前AOF文件的大小。这里需要知道server.aof_fd即AOF文件描述符,该参数的初始化在initServer()函数中
/* Open the AOF file if needed. */
    if (server.aof_state == REDIS_AOF_ON) {
        server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            redisLog(REDIS_WARNING, "Can&#39;t open the append-only file: %s",strerror(errno));
            exit(1);
        }
    }
至此,Redis Server启动加载硬盘中AOF文件数据的操作就成功结束了。

Server数据库产生新数据如何持久化到硬盘

当客户端执行Set等修改数据库中字段的指令时就会造成Server数据库中数据被修改,这些修改的数据应该被实时更新到AOF文件中,并且也要按照一定的fsync机制刷新到硬盘中,保证数据不会丢失。

在上一篇博客中,提到了三种fsync方式:appendfsync always, appendfsync everysec, appendfsync no. 具体体现在server.aof_fsync参数中。

首先看当客户端请求的指令造成数据被修改,Redis是如何将修改数据的指令添加到server.aof_buf中的。

call() -> propagate() -> feedAppendOnlyFile(),call()函数判断执行指令后是否造成数据被修改。

feedAppendOnlyFile函数首先会判断Server是否开启了AOF,如果开启AOF,那么根据Redis通讯协议将修改数据的指令重现成请求的字符串,注意在超时设置的处理方式,接着将字符串append到server.aof_buf中即可。该函数最后两行代码需要注意,这才是重点,如果server.aof_child_pid != -1那么表明此时Server正在重写rewrite AOF文件,需要将被修改的数据追加到server.aof_rewrite_buf_blocks链表中,等待rewrite结束后,追加到AOF文件中。具体见下面代码的注释。

/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    //将cmd指令变动的数据追加到AOF文件中
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
//cmd指令修改了数据,先将更新的数据写到server.aof_buf中
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    // 当前 db 不是指定的 aof db,通过创建 SELECT 命令来切换数据库
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    // 将 EXPIRE / PEXPIRE / EXPIREAT 命令翻译为 PEXPIREAT 命令
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    }// 将 SETEX / PSETEX 命令翻译为 SET 和 PEXPIREAT 组合命令
    else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {//其他的指令直接追加
        /* All the other commands don&#39;t need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    // 将 buf 追加到服务器的 aof_buf 末尾,在beforeSleep中写到AOF文件中,并且根据情况fsync刷新到硬盘
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    //如果server.aof_child_pid不为1,那就说明有快照进程正在写数据到临时文件(已经开始rewrite),
    //那么必须先将这段时间接收到的指令更新的数据先暂时存储起来,等到快照进程完成任务后,
    //将这部分数据写入到AOF文件末尾,保证数据不丢失
    //解释为什么需要aof_rewrite_buf_blocks,当server在进行rewrite时即读取所有数据库中的数据,
    //有些数据已经写到新的AOF文件,但是此时客户端执行指令又将该值修改了,因此造成了差异
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    /*这里说一下server.aof_buf和server.aof_rewrite_buf_blocks的区别
      aof_buf是正常情况下aof文件打开的时候,会不断将这份数据写入到AOF文件中。
      aof_rewrite_buf_blocks 是如果用户主动触发了写AOF文件的命令时,比如 config set appendonly yes命令
      那么redis会fork创建一个后台进程,也就是当时的数据快照,然后将数据写入到一个临时文件中去。
      在此期间发送的命令,我们需要把它们记录起来,等后台进程完成AOF临时文件写后,serverCron定时任务
      感知到这个退出动作,然后就会调用backgroundRewriteDoneHandler进而调用aofRewriteBufferWrite函数,
      将aof_rewrite_buf_blocks上面的数据,也就是diff数据写入到临时AOF文件中,然后再unlink替换正常的AOF文件。
      因此可以知道,aof_buf一般情况下比aof_rewrite_buf_blocks要少,
      但开始的时候可能aof_buf包含一些后者不包含的前面部分数据。*/

    sdsfree(buf);
} 
Server在每次事件循环之前会调用一次beforeSleep函数,下面看看这个函数做了什么工作?
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    REDIS_NOTUSED(eventLoop);
    listNode *ln;
    redisClient *c;

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Try to process pending commands for clients that were just unblocked. */
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        redisAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~REDIS_UNBLOCKED;

        /* Process remaining data in the input buffer. */
        //处理客户端在阻塞期间接收到的客户端发送的请求
        if (c->querybuf && sdslen(c->querybuf) > 0) {
            server.current_client = c;
            processInputBuffer(c);
            server.current_client = NULL;
        }
    }

    /* Write the AOF buffer on disk */
    //将server.aof_buf中的数据追加到AOF文件中并fsync到硬盘上
    flushAppendOnlyFile(0);
}
通过上面的代码及注释可以发现,beforeSleep函数做了三件事:1、处理过期键,2、处理阻塞期间的客户端请求,3、将server.aof_buf中的数据追加到AOF文件中并fsync刷新到硬盘上,flushAppendOnlyFile函数给定了一个参数force,表示是否强制写入AOF文件,0表示非强制即支持延迟写,1表示强制写入。
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    if (sdslen(server.aof_buf) == 0) return;
    // 返回后台正在等待执行的 fsync 数量
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    // AOF 模式为每秒 fsync ,并且 force 不为 1 如果可以的话,推延冲洗
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        // 如果 aof_fsync 队列里已经有正在等待的任务
        if (sync_in_progress) {
            // 上一次没有推迟冲洗过,记录推延的当前时间,然后返回
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                // 允许在两秒之内的推延冲洗
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can&#39;t wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don&#39;t think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    // 将 AOF 缓存写入到文件,如果一切幸运的话,写入会原子性地完成
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {//出错
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
        } else {
            redisLog(REDIS_WARNING,"Exiting on short write while writing to "
                                   "the append-only file: %s (nwritten=%ld, "
                                   "expected=%ld)",
                                   strerror(errno),
                                   (long)nwritten,
                                   (long)sdslen(server.aof_buf));

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                redisLog(REDIS_WARNING, "Could not remove short write "
                         "from the append-only file.  Redis may refuse "
                         "to load the AOF the next time it starts.  "
                         "ftruncate: %s", strerror(errno));
            }
        }
        exit(1);
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    // 如果 aof 缓存不是太大,那么重用它,否则,清空 aof 缓存
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don&#39;t fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    //aof rdb子进程运行中不支持fsync并且aof rdb子进程正在运行,那么直接返回,
    //但是数据已经写到aof文件中,只是没有刷新到硬盘
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {//总是fsync,那么直接进行fsync
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let&#39;s try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);//放到后台线程进行fsync
        server.aof_last_fsync = server.unixtime;
    }
}
上述代码中请关注server.aof_fsync参数,即设置Redis fsync AOF文件到硬盘的策略,如果设置为AOF_FSYNC_ALWAYS,那么直接在主进程中fsync,如果设置为AOF_FSYNC_EVERYSEC,那么放入后台线程中fsync,后台线程的代码在bio.c中。

小结

文章写到这,已经解决的了Redis Server启动加载AOF文件和如何将客户端请求产生的新的数据追加到AOF文件中,对于追加数据到AOF文件中,根据fsync的配置策略如何将写入到AOF文件中的新数据刷新到硬盘中,直接在主进程中fsync或是在后台线程fsync。

至此,AOF数据持久化还剩下如何rewrite AOF,接受客户端发送的BGREWRITEAOF请求,此部分内容待下篇博客中解析。

感谢此篇博客给我在理解Redis AOF数据持久化方面的巨大帮助,http://chenzhenianqing.cn/articles/786.html

本人Redis-2.8.2的源码注释已经放到Github中,有需要的读者可以下载,我也会在后续的时间中更新,https://github.com/xkeyideal/annotated-redis-2.8.2

本人不怎么会使用Git,望有人能教我一下。

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:CouchDB与CouchBase的比较Nächster Artikel:Servlet+JDBC+MySQL简单web练习