This article brings you related issues about redis. It mainly introduces the relevant content about event processing examples, including the introduction of redis events, the abstraction of events, and the implementation of events. Let’s take a look at it together. ,I hope everyone has to help.
Recommended learning: Redis video tutorial
Redis server is aEvent driver
, the so-called event driver is to enter a command and press Enter, and then the message is assembled into the format of the Redis
protocol and sent to the Redis
server. At this time, An event will be generated. Redis
The server will receive the command, process the command and send a reply. When we do not interact with the server, the server will be in a blocking waiting state, and it will give up the CPU and then enter In sleep state, when an event is triggered, it will be awakened by the operating system.
The Redis server needs to handle the following two types of events:
File events
: The Redis server passes the socket The socket connects to the client (or other Redis server), and the file event is the abstraction of the socket operation by the server. The communication between the server and the client (or other server) will generate corresponding file events, and the server listens And process these events to complete a series of network communication operations.
Time events
: Some operations in the Redis server (such as the serverCron function) need to be executed at a given point in time, and time events It is the server's abstraction of such timing operations.
Redis abstracts file events
and time events
into one respectively Data structure to manage.
typedef struct aeFileEvent { // 文件时间类型:AE_NONE,AE_READABLE,AE_WRITABLE int mask; // 可读处理函数 aeFileProc *rfileProc; // 可写处理函数 aeFileProc *wfileProc; // 客户端传入的数据 void *clientData; } aeFileEvent; //文件事件
The rfileProc
and wfileProc
members are two function pointers respectively, and their prototypes are :
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
This function is a callback function
. If the event type specified by the current file event occurs, the corresponding callback function
will be called to handle the event.
When the event is ready, we need to know the file descriptor and event type of the file event in order to lock the event, so the aeFiredEvent
structure is defined for unified management:
typedef struct aeFiredEvent { // 就绪事件的文件描述符 int fd; // 就绪事件类型:AE_NONE,AE_READABLE,AE_WRITABLE int mask; } aeFiredEvent; //就绪事件
Type of file event:
#define AE_NONE 0 //未设置 #define AE_READABLE 1 //事件可读 #define AE_WRITABLE 2 //事件可写
typedef struct aeTimeEvent { // 时间事件的id long long id; // 时间事件到达的时间的秒数 long when_sec; /* seconds */ // 时间事件到达的时间的毫秒数 long when_ms; /* milliseconds */ // 时间事件处理函数 aeTimeProc *timeProc; // 时间事件终结函数 aeEventFinalizerProc *finalizerProc; // 客户端传入的数据 void *clientData; // 指向下一个时间事件 struct aeTimeEvent *next; } aeTimeEvent; //时间事件
It can be seen that the structure of time event is the node of a linked list, because struct aeTimeEvent *next
It is a pointer to the next time event.
Like file events, when the event specified by the time event occurs, the corresponding callback function
, structure member timeProc
and finalizerProc
are both callback functions, and the function prototype is as follows:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
Although file events and time events have been abstracted, Redis
still needs to abstract events Make an overall abstraction to describe the status of an event. That is, the event status structure to be introduced below: aeEventLoop
.
typedef struct aeEventLoop { // 当前已注册的最大的文件描述符 int maxfd; /* highest file descriptor currently registered */ // 文件描述符监听集合的大小 int setsize; /* max number of file descriptors tracked */ // 下一个时间事件的ID long long timeEventNextId; // 最后一次执行事件的时间 time_t lastTime; /* Used to detect system clock skew */ // 注册的文件事件表 aeFileEvent *events; /* Registered events */ // 已就绪的文件事件表 aeFiredEvent *fired; /* Fired events */ // 时间事件的头节点指针 aeTimeEvent *timeEventHead; // 事件处理开关 int stop; // 多路复用库的事件状态数据 void *apidata; /* This is used for polling API specific data */ // 执行处理事件之前的函数 aeBeforeSleepProc *beforesleep; } aeEventLoop; //事件轮询的状态结构
# The ##aeEventLoop structure saves a
void * type universal pointer
apidata, which is used to save the status of the polling event, that is, to save the multiplexing library of the underlying call The event status.
RedisI/O
All functions of the multiplexer are wrapped by wrapping the common
select,
epoll,
evport and
kqueue are implemented by the
I/O multiplexing function library. Each I/O is multiplexed The function library corresponds to a separate file in the Redis source code, such as
ae_select.c,
ae_epoll.c, etc.
Redis, and the APIs of all libraries are the same, which allows the underlying layer of the Redis multiplexing program Can be interchanged.
// IO复用的选择,性能依次下降,Linux支持 "ae_epoll.c" 和 "ae_select.c" #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endifYou can also use the command
INFO server to check which multiplexing library is currently used:
epoll multiplexing library is used by default under Linux, so
apidata is saved in
epollThe event state structure of the model, which is in the
ae_epoll.c source file:
typedef struct aeApiState { // epoll事件的文件描述符 int epfd; // 事件表 struct epoll_event *events; } aeApiState; // 事件的状态
epollThe
struct epoll_event structure of the model The types of epoll events are defined in, such as
EPOLLIN,
EPOLLOUT, etc., but the Redis file structure
aeFileEvent is also in
mask Define your own event types, such as:
AE_READABLE,
AE_WRITABLE, etc., so you need to implement a middle layer to connect the two event types, which is what was mentioned before
ae_epoll.cThe same API implemented in the file:
// 创建一个epoll实例,保存到eventLoop中 static int aeApiCreate(aeEventLoop *eventLoop) // 调整事件表的大小 static int aeApiResize(aeEventLoop *eventLoop, int setsize) // 释放epoll实例和事件表空间 static void aeApiFree(aeEventLoop *eventLoop) // 在epfd标识的事件表上注册fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) // 在epfd标识的事件表上注删除fd的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) // 等待所监听文件描述符上有事件发生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) // 返回正在使用的IO多路复用库的名字 static char *aeApiName(void)
这些API会讲epoll
的底层函数封装起来,Redis实现事件时,只需要调用这些接口即可.
我们以下面两个API的源码举例:
aeApiAddEvent
该函数会向Redis事件状态结构aeEventLoop
的事件表event
注册一个事件,对应的是epoll_ctl
函数.
// 在epfd标识的事件表上注册fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; // EPOLL_CTL_ADD,向epfd注册fd的上的event // EPOLL_CTL_MOD,修改fd已注册的event // #define AE_NONE 0 //未设置 // #define AE_READABLE 1 //事件可读 // #define AE_WRITABLE 2 //事件可写 // 判断fd事件的操作,如果没有设置事件,则进行关联mask类型事件,否则进行修改 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // struct epoll_event { // uint32_t events; /* Epoll events */ // epoll_data_t data; /* User data variable */ // }; ee.events = 0; // 如果是修改事件,合并之前的事件类型 mask |= eventLoop->events[fd].mask; /* Merge old events */ // 根据mask映射epoll的事件类型 if (mask & AE_READABLE) ee.events |= EPOLLIN; //读事件 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; //写事件 ee.data.fd = fd; //设置事件所从属的目标文件描述符 // 将ee事件注册到epoll中 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
aeApiPoll
等待所监听文件描述符上有事件发生,对应着底层的epoll_wait
函数.
// 等待所监听文件描述符上有事件发生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 监听事件表上是否有事件发生 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 至少有一个就绪的事件 if (retval > 0) { int j; numevents = retval; // 遍历就绪的事件表,将其加入到eventLoop的就绪事件表中 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; // 根据就绪的事件类型,设置mask if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; // 添加到就绪事件表中 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } // 返回就绪的事件个数 return numevents; }
事件的所有源码都定义在ae.c
源文件中,先从aeMain
函数说起.
// 事件轮询的主函数 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; // 一直处理事件 while (!eventLoop->stop) { // 执行处理事件之前的函数 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); //处理到时的时间事件和就绪的文件事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
可以看到,如果服务器一直处理事件,那么就是一个死循环,而一个最典型的事件驱动,就是一个死循环. 在循环中,程序会调用处理事件的函数aeProcessEvents()
,它的参数是一个事件状态结构aeEventLoop
和AE_ALL_EVENTS
.
事件类型的宏定义,在ae.h
头文件中:
#define AE_FILE_EVENTS 1 //文件事件 #define AE_TIME_EVENTS 2 //时间事件 #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //文件和时间事件 #define AE_DONT_WAIT 4
// 处理到时的时间事件和就绪的文件事件 // 如果flags = 0,函数什么都不做,直接返回 // 如果flags设置了 AE_ALL_EVENTS ,则执行所有类型的事件 // 如果flags设置了 AE_FILE_EVENTS ,则执行文件事件 // 如果flags设置了 AE_TIME_EVENTS ,则执行时间事件 // 如果flags设置了 AE_DONT_WAIT ,那么函数处理完事件后直接返回,不阻塞等待 // 函数返回执行的事件个数 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; // 如果什么事件都没有设置则直接返回 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; // 请注意,既然我们要处理时间事件,即使没有要处理的文件事件,我们仍要调用select(),以便在下一次事件准备启动之前进行休眠 // 当前还没有要处理的文件事件,或者设置了时间事件但是没有设置不阻塞标识 if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 如果设置了时间事件而没有设置不阻塞标识 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 获取最近到时的时间事件 shortest = aeSearchNearestTimer(eventLoop); // 获取到了最早到时的时间事件 if (shortest) { long now_sec, now_ms; // 获取当前时间 aeGetTime(&now_sec, &now_ms); tvp = &tv; // 等待该时间事件到时所需要的时长 long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; // 如果没到时 if (ms > 0) { // 保存时长到tvp中 tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; // 如果已经到时,则将tvp的时间设置为0 } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } // 没有获取到了最早到时的时间事件,时间事件链表为空 } else { // 如果设置了不阻塞标识 if (flags & AE_DONT_WAIT) { // 将tvp的时间设置为0,就不会阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { // 阻塞到第一个时间事件的到来 /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 等待所监听文件描述符上有事件发生 // 如果tvp为NULL,则阻塞在此,否则等待tvp设置阻塞的时间,就会有时间事件到时 // 返回了就绪文件事件的个数 numevents = aeApiPoll(eventLoop, tvp); // 遍历就绪文件事件表 for (j = 0; j < numevents; j++) { // 获取就绪文件事件的地址 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; // 获取就绪文件事件的类型,文件描述符 int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; // 如果是文件可读事件发生 if (fe->mask & mask & AE_READABLE) { // 设置读事件标识 且 调用读事件方法处理读事件 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 如果是文件可写事件发生 if (fe->mask & mask & AE_WRITABLE) { // 读写事件的执行发法不同,则执行写事件,避免重复执行相同的方法 if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; //执行的事件次数加1 } } /* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
Redis服务器在没有被事件触发时,如果没有设置AE_DONT_WAIT
标识,就会开始阻塞等待. 但是它不会死等待,因为还需要处理时间事件,所以在调用aeApiPoll
进行监听之前,会先从时间事件表中获取一个最近到达的时间,根据需要等待的时间构建一个struct timeval tv, *tvp
结构的变量,这个变量保存着服务器阻塞等待文件事件的最长时间,一旦时间到达而没有触发文件事件aeApiPoll
函数就会停止阻塞,进而调用processTimeEvents
函数处理时间事件.
如果在阻塞等待的最长时间之间,触发了文件事件,就会先执行文件事件,后执行时间事件,因此处理时间事件通常比预设的会晚一点.
而执行文件事件rfileProc
和wfileProc
也是调用了回调函数,Redis将文件事件的处理分为了好几种,用于处理不同的网络通信需求:
acceptTcpHandler
:用于accept
client的connect
.acceptUnixHandler
:用于accept
client的本地connect
.sendReplyToClient
:用于向client发送命令回复.readQueryFromClient
:用于读入client发送的请求.然后我们来看一下获取最快达到时间事件的函数aeSearchNearestTimer
实现:
// 寻找第一个快到时的时间事件 // 这个操作是有用的知道有多少时间可以选择该事件设置为不用推迟任何事件的睡眠中。 // 如果事件链表没有时间将返回NULL。 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { // 时间事件头节点地址 aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL; // 遍历所有的时间事件 while(te) { // 寻找第一个快到时的时间事件,保存到nearest中 if (!nearest || te->when_sec < nearest->when_sec || (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) nearest = te; te = te->next; } return nearest; }
该函数就是遍历时间事件链表,然后找到最小值.
我们重点看执行时间事件的函数processTimeEvents
函数的实现:
// 执行时间事件 static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te, *prev; long long maxId; time_t now = time(NULL); // 这里尝试发现时间混乱的情况,上一次处理事件的时间比当前时间还要大 // 重置最近一次处理事件的时间 if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } // 设置上一次时间事件处理的时间为当前时间 eventLoop->lastTime = now; prev = NULL; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; //当前时间事件表中的最大ID // 遍历时间事件链表 while(te) { long now_sec, now_ms; long long id; /* Remove events scheduled for deletion. */ // 如果时间事件已被删除了 if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; // 从事件链表中删除事件的节点 if (prev == NULL) eventLoop->timeEventHead = te->next; else prev->next = te->next; // 调用时间事件终结方法清除该事件 if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData); zfree(te); te = next; continue; } // 确保我们不处理在此迭代中由时间事件创建的时间事件. 请注意,此检查目前无效:我们总是在头节点添加新的计时器,但是如果我们更改实施细节,则该检查可能会再次有用:我们将其保留在未来的防御 if (te->id > maxId) { te = te->next; continue; } // 获取当前时间 aeGetTime(&now_sec, &now_ms); // 找到已经到时的时间事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; // 调用时间事件处理方法 retval = te->timeProc(eventLoop, id, te->clientData); // 时间事件次数加1 processed++; // 如果不是定时事件,则继续设置它的到时时间 if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); // 如果是定时时间,则retval为-1,则将其时间事件删除,惰性删除 } else { te->id = AE_DELETED_EVENT_ID; } } // 更新前驱节点指针和后继节点指针 prev = te; te = te->next; } return processed; //返回执行事件的次数 }
如果时间事件不存在,则就调用finalizerProc
指向的回调函数,删除当前的时间事件. 如果存在,就调用timeProc
指向的回调函数处理时间事件. Redis的时间事件分为两类:
如果当前的时间事件是周期性,那么就会在将时间周期添加到周期事件的到时时间中. 如果是定时事件,则将该时间事件删除.
推荐学习:Redis视频教程
The above is the detailed content of One article to understand the event processing of Redis source code design analysis. For more information, please follow other related articles on the PHP Chinese website!