ngx_event_core_module 모듈의 ngx_event_process_init 메서드는 이벤트 모듈의 일부 초기화를 수행합니다. 여기에는 "연결 요청"과 같은 읽기 이벤트에 해당하는 핸들러를 ngx_event_accept 함수에 설정하고 이 이벤트를 epoll 모듈에 추가하는 것이 포함됩니다. 새로운 연결 이벤트가 발생하면 ngx_event_accept가 호출됩니다. 일반적인 프로세스는 다음과 같습니다. 작업자 프로세스는 이벤트를 처리하기 위해 ngx_worker_process_cycle 메서드에서 ngx_process_events_and_timers 함수를 지속적으로 호출합니다. 이 함수는 이벤트 처리를 위한 일반적인 진입점입니다.
ngx_process_events_and_timers는 ngx_event_actions.process_events와 동일한 매크로인 ngx_process_events를 호출합니다. ngx_event_actions는 이벤트 기반 모듈(여기서는 epoll 모듈)에 해당하는 10개의 함수 인터페이스를 저장하는 전역 구조입니다. 그래서 여기서는 ngx_epoll_module_ctx.actions.process_events 함수가 호출되는데, 이는 이벤트를 처리하는 ngx_epoll_process_events 함수입니다.
ngx_epoll_process_events는 Linux 함수 인터페이스 epoll_wait를 호출하여 "새 연결" 이벤트를 얻은 다음 이 이벤트의 핸들러 처리 함수를 호출하여 이 이벤트를 처리합니다.
위에서 언급한 것처럼 ngx_event_accept 함수에 핸들러가 설정되어 있으므로 실제 처리를 위해서는 ngx_event_accept가 호출됩니다.
ngx_event_accept 메소드는 아래에서 분석됩니다.
간단한 코드는 다음과 같습니다. 댓글의 일련번호는 위 그림의 일련번호에 해당합니다.
void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; u_char sa[ngx_sockaddrlen]; if (ev->timedout) { if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) { return; } ev->timedout = 0; } ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (ngx_event_flags & ngx_use_rtsig_event) { ev->available = 1; } else if (!(ngx_event_flags & ngx_use_kqueue_event)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; do { socklen = ngx_sockaddrlen; /* 1、accept方法试图建立连接,非阻塞调用 */ s = accept(lc->fd, (struct sockaddr *) sa, &socklen); if (s == (ngx_socket_t) -1) { err = ngx_socket_errno; if (err == ngx_eagain) { /* 没有连接,直接返回 */ return; } level = ngx_log_alert; if (err == ngx_econnaborted) { level = ngx_log_err; } else if (err == ngx_emfile || err == ngx_enfile) { level = ngx_log_crit; } if (err == ngx_econnaborted) { if (ngx_event_flags & ngx_use_kqueue_event) { ev->available--; } if (ev->available) { continue; } } if (err == ngx_emfile || err == ngx_enfile) { if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) { return; } if (ngx_use_accept_mutex) { if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); ngx_accept_mutex_held = 0; } ngx_accept_disabled = 1; } else { ngx_add_timer(ev, ecf->accept_mutex_delay); } } return; } /* 2、设置负载均衡阈值 */ ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; /* 3、从连接池获得一个连接对象 */ c = ngx_get_connection(s, ev->log); /* 4、为连接创建内存池 */ c->pool = ngx_create_pool(ls->pool_size, ev->log); c->sockaddr = ngx_palloc(c->pool, socklen); ngx_memcpy(c->sockaddr, sa, socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); /* set a blocking mode for aio and non-blocking mode for others */ /* 5、设置套接字属性为阻塞或非阻塞 */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & ngx_use_aio_event) { if (ngx_blocking(s) == -1) { ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) { if (ngx_nonblocking(s) == -1) { ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; c->unexpected_eof = 1; rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) { /* rtsig, aio, iocp */ rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; } rev->log = log; wev->log = log; /* * todo: mt: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * todo: mp: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == null) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } /* 6、将新连接对应的读写事件添加到epoll对象中 */ if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) { if (ngx_add_conn(c) == ngx_error) { ngx_close_accepted_connection(c); return; } } log->data = null; log->handler = null; /* 7、tcp建立成功调用的方法,这个方法在ngx_listening_t结构体中 */ ls->handler(c); } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */ }" nginx
의 Thundering Herd" 문제 nginx는 일반적으로 여러 작업자 프로세스를 실행하며 이러한 프로세스는 동시에 동일한 포트를 수신합니다. 새로운 연결이 도착하면 커널은 이러한 모든 프로세스를 깨우지만 단 하나의 프로세스만 클라이언트에 성공적으로 연결할 수 있어 깨어날 때 다른 프로세스가 많은 오버헤드를 낭비하게 되는 현상을 "천둥 떼" 현상이라고 합니다. nginx가 "충격" 문제를 해결하는 방법은 프로세스가 뮤텍스 잠금 ngx_accept_mutex를 획득하고 프로세스가 특정 임계 섹션에 상호 진입하도록 하는 것입니다. 임계 섹션에서 프로세스는 모니터링하려는 연결에 해당하는 읽기 이벤트를 epoll 모듈에 추가하므로 "새 연결" 이벤트가 발생하면 작업자 프로세스가 응답합니다. 이벤트 잠금 및 추가 프로세스는 ngx_trylock_accept_mutex 함수에서 완료됩니다. 다른 프로세스도 이 함수에 들어가서 읽기 이벤트를 추가하려고 하면 다른 프로세스가 뮤텍스를 보유하고 있어 반환만 가능하고, 수신하는 이벤트는 epoll 모듈에 추가할 수 없으므로 응답할 수 없다는 것을 알게 됩니다. "새 연결" "이벤트. 그러나 이것은 질문을 제기할 것입니다. 뮤텍스 잠금을 보유하고 있는 프로세스가 언제 뮤텍스 잠금을 해제합니까? 잠금을 해제하기 전에 모든 이벤트가 처리될 때까지 기다려야 한다면 시간이 오래 걸립니다. 이 기간 동안 다른 작업자 프로세스는 새로운 연결을 설정할 수 없으며 이는 분명히 바람직하지 않습니다. nginx에 대한 해결책은 다음과 같습니다. ngx_trylock_accept_mutex를 통해 뮤텍스 잠금을 획득한 프로세스는 준비된 읽기/쓰기 이벤트를 획득하고 epoll_wait에서 반환된 후 다음 이벤트를 대기열에 넣습니다.
새 연결 이벤트가 ngx_posted_accept_events 대기열에 저장됩니다
거기에 ngx_posted_events 대기열에 들어가는 연결 이벤트입니다.
코드는 다음과 같습니다.
if (flags & ngx_post_events) { /* 延后处理这批事件 */ queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); /* 将事件添加到延后执行队列中 */ ngx_locked_post_event(rev, queue); } else { rev->handler(rev); /* 不需要延后,则立即处理事件 */ }
유사한 처리를 위해 이벤트를 작성합니다. 다음으로 프로세스는 ngx_posted_accept_events 큐의 이벤트를 처리하고 처리 후 즉시 뮤텍스 잠금을 해제하므로 프로세스가 잠금을 차지하는 시간이 최소화됩니다.
nginx의 로드 밸런싱 문제nginx의 모든 프로세스는 로드 밸런싱을 처리하기 위해 임계값 ngx_accept_disabled를 사용하며, 이는 위 그림의 2단계에서 초기화됩니다.
ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle-> ;free_connection_n;
초기값은 음수이며, 음수의 절대값은 총 연결 수의 7/8과 같습니다. 임계값이 0보다 작을 경우 새로운 연결 이벤트에 정상적으로 응답합니다. , 임계값이 0보다 크면 더 이상 응답하지 않습니다. 새로운 연결 이벤트가 발생하고 ngx_accept_disabled가 1씩 감소하면 코드는 다음과 같습니다.
if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { if (ngx_trylock_accept_mutex(cycle) == ngx_error) { return; } .... }
이는 프로세스의 현재 연결 수가 7/에 도달할 때를 보여줍니다. 처리할 수 있는 총 연결 수 중 8개 이상이면 로드 밸런싱 메커니즘이 트리거되고 프로세스가 새 연결에 대한 응답을 중지합니다.
위 내용은 Nginx 이벤트 중심 프레임워크의 처리 흐름은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!