Maison >base de données >tutoriel mysql >redis源代码分析19–主从复制

redis源代码分析19–主从复制

WBOY
WBOYoriginal
2016-06-07 16:26:171138parcourir

先说下Redis主从复制的特点。 官方文档ReplicationHowto中提到以下特点: 1. 一个master支持多个slave 2. slave可以接受其他slave的连接,作为其他slave的master,从而形成一个master-slave的多级结构 3. 复制在master端是非阻塞的,也就是master在向client

先说下Redis主从复制的特点。

官方文档ReplicationHowto中提到以下特点:
1. 一个master支持多个slave
2. slave可以接受其他slave的连接,作为其他slave的master,从而形成一个master-slave的多级结构
3. 复制在master端是非阻塞的,也就是master在向client复制时可处理其他client的命令,而slave在第一次同步时是阻塞的
4. 复制被利用来提供可扩展性,比如可以将slave端用作数据冗余,也可以将耗时的命令(比如sort)发往某些slave从而避免master的阻塞,另外也可以用slave做持久化,这只需要将master的配置文件中的save指令注释掉。

client可以在一开始时作为slave连接master,也可以在运行后发布sync命令,从而跟master建立主从关系。

接下来我们分别从slave和master的视角概述下redis的主从复制的运行机制。

如果redis作为slave运行,则全局变量server.replstate的状态有REDIS_REPL_NONE(不处于复制状态)、 REDIS_REPL_CONNECT(需要跟master建立连接)、REDIS_REPL_CONNECTED(已跟master建立连接)三种。在读入slaveof配置或者发布slaveof命令后,server.replstate取值为REDIS_REPL_CONNECT,然后在syncWithMaster跟master执行第一次同步后,取值变为REDIS_REPL_CONNECTED。

如果redis作为master运行,则对应某个客户端连接的变量slave.replstate的状态有REDIS_REPL_WAIT_BGSAVE_START(等待bgsave运行)、REDIS_REPL_WAIT_BGSAVE_END(bgsave已dump db,该bulk传输了)、REDIS_REPL_SEND_BULK(正在bulk传输)、REDIS_REPL_ONLINE(已完成开始的bulk传输,以后只需发送更新了)。对于slave客户端(发布sync命令),一开始slave.replstate都处于REDIS_REPL_WAIT_BGSAVE_START状态(后面详解syncCommand函数),然后在后台dump db后(backgroundSaveDoneHandler函数),处于REDIS_REPL_WAIT_BGSAVE_END 状态,然后updateSlavesWaitingBgsave会将状态置为REDIS_REPL_SEND_BULK,并设置write事件的函数 sendBulkToSlave,在sendBulkToSlave运行后,状态就变为REDIS_REPL_ONLINE了,此后master会一直调用replicationFeedSlaves给处于REDIS_REPL_ONLINE状态的slave发送新命令。

我们先看处于master端的redis会执行的代码。

slave端都是通过发布sync命令来跟master同步的,sync命令的处理函数syncCommand如下所示。

该函数中的注释足够明了。如果slave的client设置了REDIS_SLAVE标志,说明master已用syncCommand处理了该 slave。如果master还有对这个client的reply没有发送,则返回出错信息。此后若server.bgsavechildpid != -1且有slave处于REDIS_REPL_WAIT_BGSAVE_END状态,则说明dump db的后台进程刚结束,此时新的slave可直接用保存的rdb进行bulk传输(注意复制reply参数,因为master是非阻塞的,此时可能执行了一些命令,call函数会调用replicationFeedSlaves函数将命令参数保存到slave的reply参数中)。如果没有slave处于REDIS_REPL_WAIT_BGSAVE_END状态,但server.bgsavechildpid != -1,则说明bgsave后台进程没有运行完,需要等待其结束(bgsave后台进程结束后会处理等待的slave)。如果server.bgsavechildpid 等于 -1,则需要启动一个后台进程来dump db了。最后将当前client加到master的slaves链表中。

static void syncCommand(redisClient *c) {
    /* ignore SYNC if aleady slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;
    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
        return;
    }
    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsavechildpid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            listRelease(c->reply);
            c->reply = listDup(slave->reply);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}

此后slave无论处于REDIS_REPL_WAIT_BGSAVE_START还是REDIS_REPL_WAIT_BGSAVE_END,都只能等 dump db的后台进程运行结束后才会被处理。该进程结束后会执行backgroundSaveDoneHandler函数,而该函数调用 updateSlavesWaitingBgsave来处理slaves。

updateSlavesWaitingBgsave和syncCommand一样,涉及到slave的几个状态变换。对于等待dump db的slave,master都会将其放入server.slaves 链表中。此时,若slave->replstate == REDIS_REPL_WAIT_BGSAVE_START,说明当前dump db不是该slave需要的,redis需要重新启动后台进程来dump db。若slave->replstate == REDIS_REPL_WAIT_BGSAVE_END,则说明当前dump db正是该slave所需要的,此时设置slave的write事件的处理函数sendBulkToSlave。

static void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;
            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

sendBulkToSlave 的逻辑不复杂。它根据slave->repldbfd指向的db,先从dump后的rdb文件中读入db数据,然后发送。发送完后会删除write 事件,设置slave->replstate状态为REDIS_REPL_ONLINE,此后master就会在收到命令后调用call函数,然后使用replicationFeedSlaves同步更新该slave了。replicationFeedSlaves也是遍历slave链表,对处于REDIS_REPL_ONLINE状态的slave,发送当前命令及其参数。

 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;
    if (slave->repldboff == 0) {
        /* Write the bulk write count before to transfer the DB. In theory here
         * we don't know how much room there is in the output buffer of the
         * socket, but in pratice SO_SNDLOWAT (the minimum count for output
         * operations) will never be smaller than the few bytes we need. */
        sds bulkcount;
        bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
        {
            sdsfree(bulkcount);
            freeClient(slave);
            return;
        }
        sdsfree(bulkcount);
    }
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen repldboff += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            freeClient(slave);
            return;
        }
        addReplySds(slave,sdsempty());
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}

接下来我们看看redis作为slave是如何运行的。

redis 作为slave(当然也可以使用普通的client作为slave端,这样则跟具体client的实现有关了)时,需要在配置文件中指明master的位置,在loadServerConfig读取配置参数时,会将server.replstate设置为REDIS_REPL_CONNECT状态。处于此状态的redis需要运行到serverCron后才能使用syncWithMaster来和master进行初始同步。查看syncWithMaster的代码可知,其实也向master发布sync命令来建立主从关系的,另外,该函数接收、发送数据时使用的是syncRead、syncWrite函数,而这些函数是阻塞的,因此,redis作为slave运行时,建立最初的主从关系时也是阻塞的。

 /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (syncWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER  SLAVE sync succeeded");
            if (server.appendonly) rewriteAppendOnlyFileBackground();
        }
    }

另外跟主从复制有关的一个命令就是slaveof命令。此命令是redis主从状态的转换函数,通过前面的分析可知,这只需要更改几个状态即可。

static void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            sdsfree(server.masterhost);
            server.masterhost = NULL;
            if (server.master) freeClient(server.master);
            server.replstate = REDIS_REPL_NONE;
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        sdsfree(server.masterhost);
        server.masterhost = sdsdup(c->argv[1]->ptr);
        server.masterport = atoi(c->argv[2]->ptr);
        if (server.master) freeClient(server.master);
        server.replstate = REDIS_REPL_CONNECT;
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn