概述
在前面的文章中《Nginx 事件模組》介紹了Nginx 的事件驅動框架以及不同類型事件驅動模組的管理。本節基於前面的知識,簡單介紹下在Linux 系統下的 epoll 事件驅動模組。關於 epoll 的使用與原理可以參考文章 《epoll 解析》。這裡直接介紹Nginx 伺服器基於事件驅動框架實現的 epoll 事件驅動模組。
ngx_epoll_module 事件驅動模塊
ngx_epoll_conf_t 結構體
ngx_epoll_conf_t 結構體是保存ngx_epoll_module 事件驅動模塊的配置項結構。此結構體在檔案src/event/modules/ngx_epoll_module.c 中定義:
/* 存储epoll模块配置项结构体 */ typedef struct { ngx_uint_t events; /* 表示epoll_wait函数返回的最大事件数 */ ngx_uint_t aio_requests; /* 并发处理异步IO事件个数 */ } ngx_epoll_conf_t;
ngx_epoll_module 事件驅動模組的定義
ng 的oll_module模組在檔案src/event/modules/ngx_epoll_module.c 中定義如下:
/* epoll模块定义 */ ngx_module_t ngx_epoll_module = { NGX_MODULE_V1, &ngx_epoll_module_ctx, /* module context */ ngx_epoll_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };在ngx_epoll_module 模組的定義中,其中定義了此設定項文件
src/event/modules/ngx_epoll_module.c 中定義:
/* 定义epoll模块感兴趣的配置项结构数组 */ static ngx_command_t ngx_epoll_commands[] = { /* * epoll_events配置项表示epoll_wait函数每次返回的最多事件数(即第3个参数), * 在ngx_epoll_init函数中会预分配epoll_events配置项指定的epoll_event结构体; */ { ngx_string("epoll_events"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, 0, offsetof(ngx_epoll_conf_t, events), NULL }, /* * 该配置项表示创建的异步IO上下文能并发处理异步IO事件的个数, * 即io_setup函数的第一个参数; */ { ngx_string("worker_aio_requests"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, 0, offsetof(ngx_epoll_conf_t, aio_requests), NULL }, ngx_null_command };在ngx_epoll_module 模組的定義中,定義了此模組的上下文結構是基於上下文module_t結構來定義的。這個上下文結構在檔案src/event/modules/ngx_epoll_module.c
中定義:/* 由事件模块通用接口ngx_event_module_t定义的epoll模块上下文结构 */
ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */
{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
NULL, /* process the changes */
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};
在 ngx_epoll_module 模組的上下文事件介面結構中,並專注於定義了執行方法的介面方法。 ngx_epoll_module 事件驅動模組的操作
ngx_epoll_module 模組的操作是由發現事件的上下文此成員實現的方法如下所示: ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
NULL, /* process the changes */
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
模組的初始化
ngx_epoll_module_ 模組的初始化由函數ngx_ollin 實現。函數主要做了兩件事:創建 epoll 物件和建立event_list 陣列(呼叫epoll_wait 函數時用於儲存從內核複製的已就緒的事件);該函數在檔案
src/event/modules/ngx_epoll_module.c3 ngx_epoll_module 模組的事件處理由函數ngx_epoll_process_events 模組的事件處理由函數ngx_epoll_process_events 實作。 ngx_epoll_process_events 函式是實作事件收集、事件傳送的介面。此函數在檔案src/event/modules/ngx_epoll_module.c 中定義:static int ep = -1; /* epoll对象描述符 */
static struct epoll_event *event_list; /* 作为epoll_wait函数的第二个参数,保存从内存复制的事件 */
static ngx_uint_t nevents; /* epoll_wait函数返回的最多事件数 */
/* epoll模块初始化函数 */
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_epoll_conf_t *epcf;
/* 获取ngx_epoll_module模块的配置项结构 */
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
if (ep == -1) {
/* 调用epoll_create函数创建epoll对象描述符 */
ep = epoll_create(cycle->connection_n / 2);
/* 若创建失败,则出错返回 */
if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
#if (NGX_HAVE_FILE_AIO)
/* 若系统支持异步IO,则初始化异步IO */
ngx_epoll_aio_init(cycle, epcf);
#endif
}
/*
* 预分配events个epoll_event结构event_list,event_list是存储产生事件的数组;
* events由epoll_events配置项指定;
*/
if (nevents < epcf->events) {
/*
* 若现有event_list个数小于配置项所指定的值epcf->events,
* 则先释放,再从新分配;
*/
if (event_list) {
ngx_free(event_list);
}
/* 预分配epcf->events个epoll_event结构,并使event_list指向该地址 */
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}
/* 设置正确的epoll_event结构个数 */
nevents = epcf->events;
/* 指定IO的读写方法 */
/*
* 初始化全局变量ngx_io, ngx_os_io定义为:
ngx_os_io_t ngx_os_io = {
ngx_unix_recv,
ngx_readv_chain,
ngx_udp_unix_recv,
ngx_unix_send,
ngx_writev_chain,
0
};(位于src/os/unix/ngx_posix_init.c)
*/
ngx_io = ngx_os_io;
/* 设置ngx_event_actions 接口 */
ngx_event_actions = ngx_epoll_module_ctx.actions;
#if (NGX_HAVE_CLEAR_EVENT)
/* ET模式 */
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
/* LT模式 */
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
|NGX_USE_GREEDY_EVENT
|NGX_USE_EPOLL_EVENT;
return NGX_OK;
}
ngx_epoll_del_event 實作。這兩個函數的實作都是透過調用 epoll_ctl 函數。具體實現在文件 src/event/modules/ngx_epoll_module.c 中定義:
/* 处理已准备就绪的事件 */ static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev, **queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE == INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); /* 调用epoll_wait在规定的timer时间内等待监控的事件准备就绪 */ events = epoll_wait(ep, event_list, (int) nevents, timer); /* 若出错,设置错误编码 */ err = (events == -1) ? ngx_errno : 0; /* * 若没有设置timer_resolution配置项时, * NGX_UPDATE_TIME 标志表示每次调用epoll_wait函数返回后需要更新时间; * 若设置timer_resolution配置项, * 则每隔timer_resolution配置项参数会设置ngx_event_timer_alarm为1,表示需要更新时间; */ if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { /* 更新时间,将时间缓存到一组全局变量中,方便程序高效获取事件 */ ngx_time_update(); } /* 处理epoll_wait的错误 */ if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } /* * 若epoll_wait返回的事件数events为0,则有两种可能: * 1、超时返回,即时间超过timer; * 2、在限定的timer时间内返回,此时表示出错error返回; */ if (events == 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } /* 仅在多线程环境下有效 */ ngx_mutex_lock(ngx_posted_events_mutex); /* 遍历由epoll_wait返回的所有已准备就绪的事件,并处理这些事件 */ for (i = 0; i < events; i++) { /* * 获取与事件关联的连接对象; * 连接对象地址的最低位保存的是添加事件时设置的事件过期标志位; */ c = event_list[i].data.ptr; /* 获取事件过期标志位,即连接对象地址的最低位 */ instance = (uintptr_t) c & 1; /* 屏蔽连接对象的最低位,即获取连接对象的真正地址 */ c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); /* 获取读事件 */ rev = c->read; /* * 同一连接的读写事件的instance标志位是相同的; * 若fd描述符为-1,或连接对象读事件的instance标志位不相同,则判为过期事件; */ if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } /* 获取连接对象中已准备就绪的事件类型 */ revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); /* 记录epoll_wait的错误返回状态 */ /* * EPOLLERR表示连接出错;EPOLLHUP表示收到RST报文; * 检测到上面这两种错误时,TCP连接中可能存在未读取的数据; */ if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); } #if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:%04XD", c->fd, revents); } #endif /* * 若连接发生错误且未设置EPOLLIN、EPOLLOUT, * 则将EPOLLIN、EPOLLOUT添加到revents中; * 即在调用读写事件时能够处理连接的错误; */ if ((revents & (EPOLLERR|EPOLLHUP)) && (revents & (EPOLLIN|EPOLLOUT)) == 0) { /* * if the error events were returned without EPOLLIN or EPOLLOUT, * then add these flags to handle the events at least in one * active handler */ revents |= EPOLLIN|EPOLLOUT; } /* 连接有可读事件,且该读事件是active活跃的 */ if ((revents & EPOLLIN) && rev->active) { #if (NGX_HAVE_EPOLLRDHUP) /* EPOLLRDHUP表示连接对端关闭了读取端 */ if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } #endif if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; } else { /* 读事件已准备就绪 */ /* * 这里要区分active与ready: * active是指事件被添加到epoll对象的监控中, * 而ready表示被监控的事件已经准备就绪,即可以对其进程IO处理; */ rev->ready = 1; } /* * NGX_POST_EVENTS表示已准备就绪的事件需要延迟处理, * 根据accept标志位将事件加入到相应的队列中; */ if (flags & NGX_POST_EVENTS) { queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); ngx_locked_post_event(rev, queue); } else { /* 若不延迟处理,则直接调用事件的处理函数 */ rev->handler(rev); } } /* 获取连接的写事件,写事件的处理逻辑过程与读事件类似 */ wev = c->write; /* 连接有可写事件,且该写事件是active活跃的 */ if ((revents & EPOLLOUT) && wev->active) { /* 检查写事件是否过期 */ if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } if (flags & NGX_POST_THREAD_EVENTS) { wev->posted_ready = 1; } else { /* 写事件已准备就绪 */ wev->ready = 1; } /* * NGX_POST_EVENTS表示已准备就绪的事件需要延迟处理, * 根据accept标志位将事件加入到相应的队列中; */ if (flags & NGX_POST_EVENTS) { ngx_locked_post_event(wev, &ngx_posted_events); } else { /* 若不延迟处理,则直接调用事件的处理函数 */ wev->handler(wev); } } } ngx_mutex_unlock(ngx_posted_events_mutex); return NGX_OK; }ngx_epoll_module 模組的連接新增與刪除
ngx_epoll_del_connection 實作。這兩個函數的實作都是透過調用 epoll_ctl 函數。具體實現在文件 src/event/modules/ngx_epoll_module.c 中定義:
/* 将某个描述符的某个事件添加到epoll对象的监控机制中 */ static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) { int op; uint32_t events, prev; ngx_event_t *e; ngx_connection_t *c; struct epoll_event ee; /* 每个事件的data成员都存放着其对应的ngx_connection_t连接 */ /* 获取事件关联的连接 */ c = ev->data; /* events参数是方便下面确定当前事件是可读还是可写 */ events = (uint32_t) event; /* * 这里在判断事件类型是可读还是可写,必须根据事件的active标志位来判断事件是否活跃; * 因为epoll_ctl函数有添加add和修改mod模式, * 若一个事件所关联的连接已经在epoll对象的监控中,则只需修改事件的类型即可; * 若一个事件所关联的连接没有在epoll对象的监控中,则需要将其相应的事件类型注册到epoll对象中; * 这样做的情况是避免与事件相关联的连接两次注册到epoll对象中; */ if (event == NGX_READ_EVENT) { /* * 若待添加的事件类型event是可读; * 则首先判断该事件所关联的连接是否将写事件添加到epoll对象中, * 即先判断关联的连接的写事件是否为活跃事件; */ e = c->write; prev = EPOLLOUT; #if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP) events = EPOLLIN|EPOLLRDHUP; #endif } else { e = c->read; prev = EPOLLIN|EPOLLRDHUP; #if (NGX_WRITE_EVENT != EPOLLOUT) events = EPOLLOUT; #endif } /* 根据active标志位确定事件是否为活跃事件,以决定到达是修改还是添加事件 */ if (e->active) { /* 若当前事件是活跃事件,则只需修改其事件类型即可 */ op = EPOLL_CTL_MOD; events |= prev; } else { /* 若当前事件不是活跃事件,则将该事件添加到epoll对象中 */ op = EPOLL_CTL_ADD; } /* 将flags参数加入到events标志位中 */ ee.events = events | (uint32_t) flags; /* prt存储事件关联的连接对象ngx_connection_t以及过期事件instance标志位 */ ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "epoll add event: fd:%d op:%d ev:%08XD", c->fd, op, ee.events); /* 调用epoll_ctl方法向epoll对象添加事件或在epoll对象中修改事件 */ if (epoll_ctl(ep, op, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } /* 将该事件的active标志位设置为1,表示当前事件是活跃事件 */ ev->active = 1; #if 0 ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0; #endif return NGX_OK; } /* 将某个连接的某个事件从epoll对象监控中删除 */ static ngx_int_t ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) { int op; uint32_t prev; ngx_event_t *e; ngx_connection_t *c; struct epoll_event ee; /* * when the file descriptor is closed, the epoll automatically deletes * it from its queue, so we do not need to delete explicitly the event * before the closing the file descriptor */ /* 当事件关联的文件描述符关闭后,epoll对象自动将其事件删除 */ if (flags & NGX_CLOSE_EVENT) { ev->active = 0; return NGX_OK; } /* 获取事件关联的连接对象 */ c = ev->data; /* 根据event参数判断当前删除的是读事件还是写事件 */ if (event == NGX_READ_EVENT) { /* 若要删除读事件,则首先判断写事件的active标志位 */ e = c->write; prev = EPOLLOUT; } else { /* 若要删除写事件,则判断读事件的active标志位 */ e = c->read; prev = EPOLLIN|EPOLLRDHUP; } /* * 若要删除读事件,且写事件是活跃事件,则修改事件类型即可; * 若要删除写事件,且读事件是活跃事件,则修改事件类型即可; */ if (e->active) { op = EPOLL_CTL_MOD; ee.events = prev | (uint32_t) flags; ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); } else { /* 若读写事件都不是活跃事件,此时表示事件未准备就绪,则将其删除 */ op = EPOLL_CTL_DEL; ee.events = 0; ee.data.ptr = NULL; } ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "epoll del event: fd:%d op:%d ev:%08XD", c->fd, op, ee.events); /* 删除或修改事件 */ if (epoll_ctl(ep, op, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } /* 设置当前事件的active标志位 */ ev->active = 0; return NGX_OK; }ngx_epoll_module 模組的非同步I/Ogin事件完成後的通知是集成到
/* 将指定连接所关联的描述符添加到epoll对象中 */ static ngx_int_t ngx_epoll_add_connection(ngx_connection_t *c) { struct epoll_event ee; /* 设置事件的类型:可读、可写、ET模式 */ ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP; ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, "epoll add connection: fd:%d ev:%08XD", c->fd, ee.events); /* 调用epoll_ctl方法将连接所关联的描述符添加到epoll对象中 */ if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno, "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd); return NGX_ERROR; } /* 设置读写事件的active标志位 */ c->read->active = 1; c->write->active = 1; return NGX_OK; } Nginx /* 将连接所关联的描述符从epoll对象中删除 */ static ngx_int_t ngx_epoll_del_connection(ngx_connection_t *c, ngx_uint_t flags) { int op; struct epoll_event ee; /* * when the file descriptor is closed the epoll automatically deletes * it from its queue so we do not need to delete explicitly the event * before the closing the file descriptor */ if (flags & NGX_CLOSE_EVENT) { c->read->active = 0; c->write->active = 0; return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENTNginx, c->log, 0, "epoll del connection: fd:%d", c->fd); op = EPOLL_CTL_DEL; ee.events = 0; ee.data.ptr = NULL; /* 调用epoll_ctl方法将描述符从epoll对象中删除 */ if (epoll_ctl(ep, op, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } /* 设置描述符读写事件的active标志位 */ c->read->active = 0; c->write->active = 0; return NGX_OK; }參考資料:《深入理解Nginx》《模組ngx_epoll_module詳解》《 nginx epoll詳解》《Nginx原始碼分析-Epoll模組》《關於ngx_epoll_add_event的一些解釋》
以上就介紹了Nginx 的 epoll 事件驅動模組,包括了方面的內容,希望對PHP教程有興趣的朋友有所幫助。