Maison >base de données >Redis >Exemple de traitement de commande Redis, analyse du code source
Cet article est basé sur la version communautaire de Redis 4.0.8
La demande de commande reçue par le serveur Redis est d'abord stockée dans le tampon d'entrée querybuf de l'objet client, puis analyse chacune d'elles. paramètre de la demande de commande et le stocke dans les champs argv et argc de l'objet client.
La fonction d'entrée permettant au client d'analyser la demande de commande est readQueryFromClient, qui lit les données du socket et les stocke dans le tampon d'entrée de l'objet client, et appelle la fonction processInputBuffer pour analyser la demande de commande.
Remarque : Commande en ligne : utilisez la session telnet pour saisir la commande
void processInputBuffer(client *c) { ...... //循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度 while(sdslen(c->querybuf)) { if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break;//处理telnet方式的内联命令 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; //解析命令参数和长度暂存到客户端结构体中 } else { serverPanic("Unknown request type"); } } } //解析命令参数和长度暂存到客户端结构体中 int processMultibulkBuffer(client *c) { //定位到行尾 newline = strchr(c->querybuf,'\r'); //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段 serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); c->multibulklen = ll; pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度 /* Setup argv array on client structure */ //分配请求参数存储空间 c->argv = zmalloc(sizeof(robj*)*c->multibulklen); // 开始循环解析每个请求参数 while(c->multibulklen) { ...... newline = strchr(c->querybuf+pos,'\r'); if (c->querybuf[pos] != '$') { return C_ERR; ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); pos += newline-(c->querybuf+pos)+2; c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段 //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; c->multibulklen--; }
Lorsque la valeur de multibulklen est mise à jour à 0, cela signifie que l'analyse des paramètres est terminée et que processCommand commence à être appelé. traiter la commande.Il existe de nombreuses commandes avant le traitement.La logique de vérification est la suivante :
void processInputBuffer(client *c) { ...... //调用processCommand来处理命令 if (processCommand(c) == C_OK) { ...... } } //处理命令函数 int processCommand(client *c) { //校验是否是quit命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //调用lookupCommand,查看该命令是否存在 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return C_OK; //检查用户权限 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { addReply(c,shared.noautherr); //还有很多检查,不一一列举,比如集群/持久化/复制等 /* 真正执行命令 */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) queueMultiCommand(c); //将结果写入outbuffer addReply(c,shared.queued); } // 调用execCommand执行命令 void execCommand(client *c) { call(c,CMD_CALL_FULL);//调用call执行命令 //调用execCommand调用call执行命令 void call(client *c, int flags) { start = ustime(); c->cmd->proc(c);//执行命令 duration = ustime()-start; //如果是慢查询,记录慢查询 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); //记录到慢日志中 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); //更新统计信息:当前命令执行时间和调用次数 if (flags & CMD_CALL_STATS) { c->lastcmd->microseconds += duration; c->lastcmd->calls++;
Le résultat de retour de Redis n'est pas directement renvoyé au client, mais est d'abord écrit dans le tampon de sortie (champ buf) ou liste chaînée de sortie (champ de réponse)
int processCommand(client *c) { ...... //将结果写入outbuffer addReply(c,shared.queued); ...... } //将结果写入outbuffer void addReply(client *c, robj *obj) { //调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送 if (prepareClientToWrite(c) != C_OK) return; //然后添加字符串到输出缓冲区 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) //如果添加失败,则添加到输出链表中 _addReplyObjectToList(c,obj); }
addReply La fonction ne stocke que temporairement les données à envoyer au client dans la liste chaînée de sortie ou le tampon de sortie. Alors, quand ces données seront-elles envoyées au client ? La réponse est la fonction beforesleep appelée lorsque la boucle d'événements est activée. Cette fonction effectue spécifiquement certaines opérations qui ne prennent pas beaucoup de temps, comme la suppression des clés expirées, le renvoi des réponses de commande au client, etc.
void beforeSleep(struct aeEventLoop *eventLoop) { ...... /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(); } //回复客户端命令函数 int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); /* 发送客户端数据 */ if (writeToClient(c->fd,c,0) == C_ERR) continue; /* If there is nothing left, do nothing. Otherwise install * the write handler. */ //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可 if (clientHasPendingReplies(c) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } return processed;
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!