ngx_event_core_module模組的ngx_event_process_init方法對事件模組做了一些初始化。其中包括將「請求連接」這樣一個讀取事件對應的處理方法(handler)設為ngx_event_accept函數,並將此事件新增至epoll模組中。當有新連線事件發生時,ngx_event_accept就會被呼叫。大致流程是這樣:
worker進程在ngx_worker_process_cycle方法中不斷循環呼叫ngx_process_events_and_timers函數處理事件,這個函數是事件處理的總入口。
ngx_process_events_and_timers會呼叫ngx_process_events,這是一個宏,相當於ngx_event_actions.process_events,ngx_event_actions是個全域的結構體,儲存了對應事件驅動模組(這裡是epoll模組)的10個函數介面。所以這裡就是呼叫了ngx_epoll_module_ctx.actions.process_events函數,也就是ngx_epoll_process_events函式來處理事件。
ngx_epoll_process_events呼叫linux函式介面epoll_wait取得「有新連線」這個事件,然後呼叫這個事件的handler處理函式來處理這個事件。
在上面已經說過handler已經被設定成了ngx_event_accept函數,所以就呼叫ngx_event_accept進行實際的處理。
下面分析ngx_event_accept方法,它的流程圖如下所示:
#經過精簡的程式碼如下,註解中的序號對應上圖的序號:
void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; u_char sa[ngx_sockaddrlen]; if (ev->timedout) { if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) { return; } ev->timedout = 0; } ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (ngx_event_flags & ngx_use_rtsig_event) { ev->available = 1; } else if (!(ngx_event_flags & ngx_use_kqueue_event)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; do { socklen = ngx_sockaddrlen; /* 1、accept方法试图建立连接,非阻塞调用 */ s = accept(lc->fd, (struct sockaddr *) sa, &socklen); if (s == (ngx_socket_t) -1) { err = ngx_socket_errno; if (err == ngx_eagain) { /* 没有连接,直接返回 */ return; } level = ngx_log_alert; if (err == ngx_econnaborted) { level = ngx_log_err; } else if (err == ngx_emfile || err == ngx_enfile) { level = ngx_log_crit; } if (err == ngx_econnaborted) { if (ngx_event_flags & ngx_use_kqueue_event) { ev->available--; } if (ev->available) { continue; } } if (err == ngx_emfile || err == ngx_enfile) { if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) { return; } if (ngx_use_accept_mutex) { if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); ngx_accept_mutex_held = 0; } ngx_accept_disabled = 1; } else { ngx_add_timer(ev, ecf->accept_mutex_delay); } } return; } /* 2、设置负载均衡阈值 */ ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; /* 3、从连接池获得一个连接对象 */ c = ngx_get_connection(s, ev->log); /* 4、为连接创建内存池 */ c->pool = ngx_create_pool(ls->pool_size, ev->log); c->sockaddr = ngx_palloc(c->pool, socklen); ngx_memcpy(c->sockaddr, sa, socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); /* set a blocking mode for aio and non-blocking mode for others */ /* 5、设置套接字属性为阻塞或非阻塞 */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & ngx_use_aio_event) { if (ngx_blocking(s) == -1) { ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) { if (ngx_nonblocking(s) == -1) { ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; c->unexpected_eof = 1; rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) { /* rtsig, aio, iocp */ rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; } rev->log = log; wev->log = log; /* * todo: mt: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * todo: mp: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == null) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } /* 6、将新连接对应的读写事件添加到epoll对象中 */ if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) { if (ngx_add_conn(c) == ngx_error) { ngx_close_accepted_connection(c); return; } } log->data = null; log->handler = null; /* 7、tcp建立成功调用的方法,这个方法在ngx_listening_t结构体中 */ ls->handler(c); } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */ }
nginx中的「驚群」問題
nginx一般會執行多個worker進程,這些進程會同時監聽相同連接埠。當有新連線到來時,核心將這些進程全部喚醒,但只有一個進程能夠和客戶端連線成功,導致其它進程在喚醒時浪費了大量開銷,這被稱為「驚群」現象。 nginx解決「驚群」的方法是,讓進程獲得互斥鎖ngx_accept_mutex,讓進程互斥地進入某一段臨界區。在這個臨界區中,進程將它所要監聽的連接對應的讀取事件加入到epoll模組中,使得當有「新連線」事件發生時,該worker進程會作出反應。這段加鎖並加入事件的過程是在函數ngx_trylock_accept_mutex中完成的。而當其它進程也進入該函數想要添加讀取事件時,發現互斥鎖被另外一個進程持有,所以它只能返回,它所監聽的事件也無法添加到epoll模組,從而無法響應「新連接”事件。但這會出現一個問題:持有互斥鎖的那個進程在什麼時候釋放互斥鎖呢?如果需要等待它處理完所有的事件才釋放鎖的話,那麼會需要相當長的時間。而在這段時間內,其它worker進程無法建立新連接,這顯然是不可取的。 nginx的解決方法是:透過ngx_trylock_accept_mutex獲得了互斥鎖的進程,在獲得就緒讀取/寫入事件並從epoll_wait返回後,將這些事件歸類放入佇列中:
新連接事件放入ngx_posted_accept_events佇列
已有連線事件放入ngx_posted_events佇列
程式碼如下:
if (flags & ngx_post_events) { /* 延后处理这批事件 */ queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); /* 将事件添加到延后执行队列中 */ ngx_locked_post_event(rev, queue); } else { rev->handler(rev); /* 不需要延后,则立即处理事件 */ }
寫事件做類似處理。進程接下來處理ngx_posted_accept_events佇列中的事件,處理完後立即釋放互斥鎖,使該進程佔用鎖的時間降到了最低。
nginx中的負載平衡問題
nginx中每個進程使用了一個處理負載平衡的閾值ngx_accept_disabled,它在上圖的第2步中被初始化:
ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;
它的初始值為負數,此負數的絕對值等於總連接數的7/8 .當閾值小於0時正常響應新連接事件,當閾值大於0時不再響應新連接事件,並將ngx_accept_disabled減1,代碼如下:
if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { if (ngx_trylock_accept_mutex(cycle) == ngx_error) { return; } .... }
這說明,當某個進程當前的連接當數達到能夠處理的總連接數的7/8時,負載平衡機制被觸發,進程停止回應新連線。
以上是Nginx事件驅動框架處理流程是什麼的詳細內容。更多資訊請關注PHP中文網其他相關文章!