Heim >Datenbank >MySQL-Tutorial >Redis源码整体运行流程详解

Redis源码整体运行流程详解

WBOY
WBOYOriginal
2016-06-07 15:23:101385Durchsuche

Redis Server端处理Client请求的流程图 main函数 main函数主要的功能为:调用initServerConfig函数,进行默认的redisServer数据结构的参数初始化;调用daemonize函数,为服务器开始守护进程,对于守护进行相关详细信息见http://blog.csdn.net/acceptedxukai/

Redis Server端处理Client请求的流程图

\

 

main函数

main函数主要的功能为:调用initServerConfig函数,进行默认的redisServer数据结构的参数初始化;调用daemonize函数,为服务器开始守护进程,对于守护进行相关详细信息见http://blog.csdn.net/acceptedxukai/article/details/8743189;调用initServer函数,初始化服务器;调用loadServerConfig函数,读取Redis的配置文件,使用配置文件中的参数替换默认的参数值;调用aeMain函数,开启事件循环,整个服务器开始工作。

initServer函数

该函数主要为初始化服务器,需要初始化的内容比较多,主要有:

1、创建事件循环

server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
2、创建TCP与UDP Server,启动服务器,完成bind与listen
/* Open the TCP listening socket for the user commands. */
    //server.ipfd是个int数组,启动服务器,完成bind,listen
    if (listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);
    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
        if (server.sofd == ANET_ERR) {
            redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
            exit(1);
        }
    }
Redis2.8.2 TCP同时支持IPv4与IPv6,同时与之前版本的Redis不同,此版本支持多个TCP服务器,listenToPort函数主要还是调用anetTcpServer函数,完成socket()-->bind()-->listen(),下面详细查看下TCPServer的创建,UDP直接忽略吧,我也不知道UDP具体用在哪。
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
    //绑定bind
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    /* Use a backlog of 512 entries. We pass 511 to the listen() call because
     * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
     * which will thus give us a backlog of 512 entries */
    //监听
    if (listen(s, 511) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}
static int _anetTcpServer(char *err, int port, char *bindaddr, int af)
{
    int s, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    //套接字地址用于监听绑定
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */
    //可以加上hints.ai_protocol = IPPROTO_TCP;

    /**getaddrinfo(const char *hostname, const char *servicename,
                   const struct addrinfo *hint,struct addrinfo **res);
       hostname:主机名
       servicename: 服务名
       hint: 用于过滤的模板,仅能使用ai_family, ai_flags, ai_protocol, ai_socktype,其余字段为0
       res:得到所有可用的地址
    */
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    //轮流尝试多个地址,找到一个允许连接到服务器的地址时便停止
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        //设置套接字选项setsockopt,采用地址复用
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        //bind, listen
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen) == ANET_ERR) goto error;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket");
        goto error;
    }

error:
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}
//if server.ipfd_count = 0, bindaddr = NULL
int anetTcpServer(char *err, int port, char *bindaddr)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET);
}
3、将listen的端口加入到事件监听中,进行监听,由aeCreateFileEvent函数完成,其注册的listen端口可读事件处理函数为acceptTcpHandler,这样在listen端口有新连接的时候会调用acceptTcpHandler,后者在accept这个新连接,然后就可以处理后续跟这个客户端连接相关的事件了。
/* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
     //文件事件,用于处理响应外界的操作请求,事件处理函数为acceptTcpHandler/acceptUnixHandler
     //在networking.c
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

acceptTcpHandler函数

上面介绍了,initServer完成listen端口后,会加入到事件循环中,该事件为可读事件,并记录处理函数为fe->rfileProc = acceptTcpHandler;该函数分两步操作:用acceptTcpHandler接受这个客户端连接;然第二部初始化这个客户端连接的相关数据,将clientfd加入事件里面,设置的可读事件处理函数为readQueryFromClient,也就是读取客户端请求的函数。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);//无意义
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    //cfd为accept函数返回的客户端文件描述符,accept使服务器完成一个客户端的链接
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
	//将cfd加入事件循环并设置回调函数为readQueryFromClient,并初始化redisClient
    acceptCommonHandler(cfd,0);
}

第一步很简单即完成accept,主要关注第二步acceptCommonHandler函数

static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {//创建新的客户端
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    //当前连接的客户端数目大于服务器最大运行的连接数,则拒绝连接
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That&#39;s a best effort error message, don&#39;t check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}

createClient函数

此函数用来为新连接的客户端初始化一个redisClient数据结构,该数据结构有比较多的参数,详见redis.h。该函数完成两个操作,第一、为客户端创建事件处理函数readQueryFromClient专门接收客户端发来的指令,第二、初始化redisClient数据结构相关参数。
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
     /**
        因为 Redis 命令总在客户端的上下文中执行,
        有时候为了在服务器内部执行命令,需要使用伪客户端来执行命令
        在 fd == -1 时,创建的客户端为伪终端
     */
    if (fd != -1) {
        //下面三个都是设置socket属性
        anetNonBlock(NULL,fd);//非阻塞
        anetEnableTcpNoDelay(NULL,fd);//no delay
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);//keep alive

        //创建一个accept fd的FileEvent事件,事件的处理函数是readQueryFromClient
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);//默认选择第0个db, db.c
    c->fd = fd;//文件描述符
    c->name = NULL;
    c->bufpos = 0;//将指令结果发送给客户端的字符串长度
    c->querybuf = sdsempty();//请求字符串初始化
    c->querybuf_peak = 0;//请求字符串顶峰时的长度值
    c->reqtype = 0;//请求类型
    c->argc = 0;//参数个数
    c->argv = NULL;//参数内容
    c->cmd = c->lastcmd = NULL;//操作指令
    c->multibulklen = 0;//块个数
    c->bulklen = -1;//每个块的长度
    c->sentlen = 0;
    c->flags = 0;//客户类型的标记,比较重要
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    c->reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->reply = listCreate();//存放服务器应答的数据
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->bpop.keys = dictCreate(&setDictType,NULL);//下面三个参数在list数据阻塞操作时使用
    c->bpop.timeout = 0;
    c->bpop.target = NULL;
    c->io_keys = listCreate();
    c->watched_keys = listCreate();//事务命令CAS中使用
    listSetFreeMethod(c->io_keys,decrRefCountVoid);
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    // 如果不是伪客户端,那么将客户端加入到服务器客户端列表中
    if (fd != -1) listAddNodeTail(server.clients,c);//添加到server的clients链表
    initClientMultiState(c);//初始化事务指令状态
    return c;
}
客户端的请求指令字符串始终存放在querybuf中,在对querybuf解析后,将指令参数的个数存放在argc中,具体的指令参数存放在argv中;但是服务器应答的结果有两种存储方式:buf字符串、reply列表。

readQueryFromClient函数

readQueryFromClient函数用来读取客户端的请求命令行数据,并调用processInputBuffer函数依照redis通讯协议对数据进行解析。服务器使用最原始的read函数来读取客户端发送来的请求命令,并将字符串存储在querybuf中,根据需要对querybuf进行扩展。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    server.current_client = c;
    readlen = REDIS_IOBUF_LEN; //1024 * 16
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    //对querybuf的空间进行扩展
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    //读取客户端发来的操作指令
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        //改变querybuf的实际长度和空闲长度,len += nread, free -= nread;
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        server.current_client = NULL;
        return;
    }
    //客户端请求的字符串长度大于服务器最大的请求长度值
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = getClientInfoString(c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    //解析请求
    processInputBuffer(c);
    server.current_client = NULL;
}
processInputBuffer函数主要用来处理请求的解析工作,redis有两种解析方式;行指令解析与多重指令解析,行指令解析直接忽略,下面详解多重指令解析。
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED) return;

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don&#39;t process more commands). */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

        /* Determine request type when unknown. */
        //当请求类型未知时,先确定属于哪种请求
        if (!c->reqtype) {
            if (c->querybuf[0] == &#39;*&#39;) {
                c->reqtype = REDIS_REQ_MULTIBULK;//多重指令解析
            } else {
                c->reqtype = REDIS_REQ_INLINE;//按行解析
            }
        }

        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            //执行相应指令
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}
多重指令解析的处理函数为processMultibulkBuffer,下面先简单介绍下Redis的通讯协议:
以下是这个协议的一般形式:
*< 参数数量 > CR LF
$< 参数 1 的字节数量 > CR LF
< 参数 1 的数据 > CR LF
...
$< 参数 N 的字节数量 > CR LF
< 参数 N 的数据 > CR LF
举个例子,以下是一个命令协议的打印版本:
*3
$3
SET
$3
foo
$3
bar
这个命令的实际协议值如下:
"*3\r\n$3\r\nSET\r\n$3\r\foo\r\n$3\r\bar\r\n"
/**
    例:querybuf = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
*/
int processMultibulkBuffer(redisClient *c) {
    char *newline = NULL;
    int pos = 0, ok;
    long long ll;

    if (c->multibulklen == 0) {//参数数目为0,表示这是新的请求指令
        /* The client should have been reset */
        redisAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf,&#39;\r&#39;);
        if (newline == NULL) {
            if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError(c,0);
            }
            return REDIS_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
            return REDIS_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        redisAssertWithInfo(c,NULL,c->querybuf[0] == &#39;*&#39;);
        //将字符串转为long long整数,转换得到的结果存到ll中,ll就是后面参数的个数
        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError(c,pos);
            return REDIS_ERR;
        }

        pos = (newline-c->querybuf)+2;//跳过\r\n
        if (ll <= 0) {//参数个数小于0,表示后面的参数数目大于等于绝对值ll
             /** s = sdsnew("Hello World");
             * sdsrange(s,1,-1); => "ello World"
             */
            sdsrange(c->querybuf,pos,-1);//querybuf="$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
            return REDIS_OK;
        }

        c->multibulklen = ll;//得到指令参数个数

        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*) * c->multibulklen);//申请参数内存空间
    }

    redisAssertWithInfo(c,NULL,c->multibulklen > 0);
    /**
        开始抽取字符串
        querybuf = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
        pos = 4
    */
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {//参数的长度为-1,这里用来处理每个参数的字符串长度值
            /**newline = "\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"*/
            newline = strchr(c->querybuf+pos,&#39;\r&#39;);
            if (newline == NULL) {
                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                    addReplyError(c,"Protocol error: too big bulk count string");
                    setProtocolError(c,0);
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                break;

            //每个字符串以$开头,后面的数字表示其长度
            if (c->querybuf[pos] != &#39;$&#39;) {
                addReplyErrorFormat(c,
                    "Protocol error: expected &#39;$&#39;, got &#39;%c&#39;",
                    c->querybuf[pos]);
                setProtocolError(c,pos);
                return REDIS_ERR;
            }

            //得到字符串的长度值,ll
            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
            if (!ok || ll < 0 || ll > 512*1024*1024) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError(c,pos);
                return REDIS_ERR;
            }

            //pos = 8
            pos += newline-(c->querybuf+pos)+2;//跳过\r\n "SET\r\n$3\r\nfoo\r\n$3\r\nbar\r"
            if (ll >= REDIS_MBULK_BIG_ARG) {//字符串长度超过1024*32,需要扩展
                size_t qblen;

                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data. */
                 /**
                    sdsrange(querybuf,pos,-1)是将[pos,len-1]之间的字符串使用memmove前移,
                    然后后面的直接截断
                 */
                sdsrange(c->querybuf,pos,-1);//"SET\r\n$3\r\nfoo\r\n$3\r\nbar\r"
                pos = 0;
                qblen = sdslen(c->querybuf);
                /* Hint the sds library about the amount of bytes this string is
                 * going to contain. */
                if (qblen < ll+2)//这里只会到最后一个字符串才可能为True,并且数据不完整,数据不完整是由于redis使用非阻塞的原因
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        //读取参数,没有\r\n表示数据不全,也就是说服务器接收到的数据不完整
        if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {//数据完整
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (pos == 0 &&
                c->bulklen >= REDIS_MBULK_BIG_ARG &&
                (signed) sdslen(c->querybuf) == c->bulklen+2)
            {//数据刚好完整,那么就直接使用c->querybuf,然后清空querybuf,注意这里只可能在最后一个字符串才可能出现
                c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                c->querybuf = sdsempty();
                /* Assume that if we saw a fat argument we&#39;ll see another one
                 * likely... */
                c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
                pos = 0;
            } else {
                //抽取出具体的字符串,比如SET,建立一个stringObject
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+pos,c->bulklen);
                pos += c->bulklen+2;//跳过\r\n
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /**
        由于采用的是非阻塞读取客户端数据的方式,那么如果c->multibulklen != 0,那么就表示
        数据没有接收完全,首先需要将当前的querybuf数据截断
    */
    /* Trim to pos */
    if (pos) sdsrange(c->querybuf,pos,-1);

    /* We&#39;re done when c->multibulk == 0 */
    if (c->multibulklen == 0) return REDIS_OK;

    /* Still not read to process the command */

    return REDIS_ERR;
}

processCommand与call函数

客户端指令解析完之后,需要执行该指令,执行指令的两个函数为processCommand与call函数,这两个函数除了单纯的执行指令外,还做了许多其他的工作,这里不详解,看代码仅仅找到指令如何执行还是很简单的。

指令执行完之后,需要将得到的结果集返回给客户端,这部分是如何工作的,下面开始分析。

在networking.c中可以发现许多以addRelpy为前缀的函数名,这些函数都是用来处理各种不同类型的结果的,我们以典型的addReply函数为例,进行分析。

addReply函数

该函数第一步工作就是调用prepareClientToWrite函数为客户端创建一个写文件事件,事件的处理函数即将结果集发送给客户端的函数为sendReplyToClient.

int prepareClientToWrite(redisClient *c) {
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}
第二步,就是根据相应的条件,将得到的结果rboj数据存储到buf中或者reply链表中。对于存储的策略:redis优先将数据存储在固定大小的buf中,也就是redisClient结构体buf[REDIS_REPLY_CHUNK_BYTES]里,默认大小为16K。如果有数据没有发送完或c->buf空间不足,就会放到c->reply链表里面,链表每个节点都是内存buf,后来的数据放入最后面。具体的处理函数为_addReplyToBuffer和_addReplyStringToList两个函数。
void addReply(redisClient *c, robj *obj) {
    if (prepareClientToWrite(c) != REDIS_OK) return;

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it&#39;s not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we&#39;ll be able to send the object to the client without
     * messing with its page. */
    if (obj->encoding == REDIS_ENCODING_RAW) {//字符串类型
        //是否能将数据追加到c->buf中
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);//添加到c->reply链表中
    } else if (obj->encoding == REDIS_ENCODING_INT) {//整数类型
        /* Optimization: if there is room in the static buffer for 32 bytes
         * (more than the max chars a 64 bit integer can take as string) we
         * avoid decoding the object and go for the lower level approach. */
         //追加到c->buf中
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;

            len = ll2string(buf,sizeof(buf),(long)obj->ptr);//整型转string
            if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
                return;
            /* else... continue with the normal code path, but should never
             * happen actually since we verified there is room. */
        }
        obj = getDecodedObject(obj);//64位整数,先转换为字符串
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        redisPanic("Wrong obj->encoding in addReply()");
    }
}
/**
    Server将数据发送给Client,有两种存储数据的缓冲形式,具体参见redisClient结构体
    1、Response buffer
        int bufpos; //回复
        char buf[REDIS_REPLY_CHUNK_BYTES]; //长度为16 * 1024
    2、list *reply;
        unsigned long reply_bytes; Tot bytes of objects in reply list
        int sentlen;            已发送的字节数
    如果已经使用reply的形式或者buf已经不够存储,那么就将数据添加到list *reply中
    否则将数据添加到buf中
*/
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;//计算出c->buf的剩余长度

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    //回复数据追加到buf中
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}

/**
    1、如果链表长度为0: 新建一个节点并直接将robj追加到链表的尾部
    2、链表长度不为0: 首先取出链表的尾部节点
        1)、尾部节点的字符串长度 + robj中ptr字符串的长度 <= REDIS_REPLY_CHUNK_BYTES:
            将robj->ptr追加到尾节点的tail->ptr后面
        2)、反之: 新建一个节点并直接将robj追加到链表的尾部
*/
void _addReplyObjectToList(redisClient *c, robj *o) {
    robj *tail;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    //链表长度为0
    if (listLength(c->reply) == 0) {
        incrRefCount(o);//增加引用次数
        listAddNodeTail(c->reply,o);//添加到链表末尾
        c->reply_bytes += zmalloc_size_sds(o->ptr); //计算o->ptr的占用内存大小
    } else {
        //取出链表尾中的数据
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        // 如果最后一个节点所保存的回复加上新回复内容总长度小于等于 REDIS_REPLY_CHUNK_BYTES
        // 那么将新回复追加到节点回复当中。
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
        {
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        } else {//为新回复单独创建一个节点
            incrRefCount(o);
            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }
    }
    // 如果突破了客户端的最大缓存限制,那么关闭客户端
    asyncCloseClientOnOutputBufferLimitReached(c);
}

sendReplyToClient函数

终于到了最后一步,把c->buf与c->reply中的数据发送给客户端即可,发送同样使用的是最原始的write函数。发送完成之后,redis将当前客户端释放,并且删除写事件,代码比较简单,不详细解释。

小结

本文粗略的介绍了Redis整体运行的流程,从服务器的角度,介绍Redis是如何初始化,创建socket,接收客户端请求,解析请求及指令的执行,反馈执行的结果集给客户端等。如果读者想更深入的了解Redis的运行机制,需要亲自阅读源码,本文可以用作参考。同时也是学习linux socket编程的好工具,原本简简单单的socket->bind->listen->accept->read->write也可以用来做许多高效的业务,是Linux socket学习的不二选择。

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn