Rumah >pangkalan data >Redis >Pemprosesan arahan Redis contoh analisis kod sumber
Artikel ini adalah berdasarkan versi komuniti Redis 4.0.8
Permintaan arahan yang diterima oleh pelayan Redis ialah mula-mula disimpan dalam objek klien Penampan input querybuf kemudian menghuraikan pelbagai parameter permintaan arahan dan menyimpannya dalam medan argv dan argc objek klien.
Fungsi kemasukan untuk klien menghuraikan permintaan arahan ialah readQueryFromClient, yang membaca data soket dan menyimpannya dalam penimbal input objek klien, dan memanggil fungsi processInputBuffer untuk menghuraikan permintaan arahan.
Nota: Perintah sebaris: Gunakan sesi telnet untuk memasukkan arahan
void processInputBuffer(client *c) { ...... //循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度 while(sdslen(c->querybuf)) { if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break;//处理telnet方式的内联命令 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; //解析命令参数和长度暂存到客户端结构体中 } else { serverPanic("Unknown request type"); } } } //解析命令参数和长度暂存到客户端结构体中 int processMultibulkBuffer(client *c) { //定位到行尾 newline = strchr(c->querybuf,'\r'); //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段 serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); c->multibulklen = ll; pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度 /* Setup argv array on client structure */ //分配请求参数存储空间 c->argv = zmalloc(sizeof(robj*)*c->multibulklen); // 开始循环解析每个请求参数 while(c->multibulklen) { ...... newline = strchr(c->querybuf+pos,'\r'); if (c->querybuf[pos] != '$') { return C_ERR; ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); pos += newline-(c->querybuf+pos)+2; c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段 //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; c->multibulklen--; }
Apabila nilai multibulklen dikemas kini kepada 0, ini bermakna bahawa parsing parameter selesai. Mula memanggil processCommand untuk memproses arahan Terdapat banyak logik pengesahan sebelum memproses arahan, seperti berikut:
void processInputBuffer(client *c) { ...... //调用processCommand来处理命令 if (processCommand(c) == C_OK) { ...... } } //处理命令函数 int processCommand(client *c) { //校验是否是quit命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //调用lookupCommand,查看该命令是否存在 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return C_OK; //检查用户权限 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { addReply(c,shared.noautherr); //还有很多检查,不一一列举,比如集群/持久化/复制等 /* 真正执行命令 */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) queueMultiCommand(c); //将结果写入outbuffer addReply(c,shared.queued); } // 调用execCommand执行命令 void execCommand(client *c) { call(c,CMD_CALL_FULL);//调用call执行命令 //调用execCommand调用call执行命令 void call(client *c, int flags) { start = ustime(); c->cmd->proc(c);//执行命令 duration = ustime()-start; //如果是慢查询,记录慢查询 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); //记录到慢日志中 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); //更新统计信息:当前命令执行时间和调用次数 if (flags & CMD_CALL_STATS) { c->lastcmd->microseconds += duration; c->lastcmd->calls++;
Hasil dikembalikan oleh Redis tidak dikembalikan terus kepada klien, tetapi ditulis dahulu Masukkan penimbal output (medan buf) atau senarai dipautkan output (medan balasan)
int processCommand(client *c) { ...... //将结果写入outbuffer addReply(c,shared.queued); ...... } //将结果写入outbuffer void addReply(client *c, robj *obj) { //调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送 if (prepareClientToWrite(c) != C_OK) return; //然后添加字符串到输出缓冲区 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) //如果添加失败,则添加到输出链表中 _addReplyObjectToList(c,obj); }
Fungsi addReply hanya menyimpan sementara data untuk dihantar kepada klien dalam senarai terpaut keluaran atau penimbal keluaran, jadi bilakah data akan dihantar kepada Bagaimana pula dengan klien? Jawapannya ialah fungsi sebelum tidur yang dipanggil apabila gelung acara dihidupkan Fungsi ini secara khusus melaksanakan beberapa operasi yang tidak memakan masa, seperti memadamkan kunci tamat tempoh, mengembalikan balasan arahan kepada klien, dsb.
void beforeSleep(struct aeEventLoop *eventLoop) { ...... /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(); } //回复客户端命令函数 int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); /* 发送客户端数据 */ if (writeToClient(c->fd,c,0) == C_ERR) continue; /* If there is nothing left, do nothing. Otherwise install * the write handler. */ //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可 if (clientHasPendingReplies(c) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } return processed;.
Atas ialah kandungan terperinci Pemprosesan arahan Redis contoh analisis kod sumber. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!