Home  >  Article  >  Operation and Maintenance  >  What is the processing flow of Nginx event-driven framework?

What is the processing flow of Nginx event-driven framework?

WBOY
WBOYforward
2023-05-13 13:34:06889browse

The ngx_event_process_init method of the ngx_event_core_module module does some initialization of the event module. This includes setting the handler corresponding to a read event such as "request connection" to the ngx_event_accept function, and adding this event to the epoll module. When a new connection event occurs, ngx_event_accept will be called. The general process is as follows:

The worker process continuously calls the ngx_process_events_and_timers function in the ngx_worker_process_cycle method to process events. This function is the general entry point for event processing.

ngx_process_events_and_timers will call ngx_process_events, which is a macro, equivalent to ngx_event_actions.process_events. ngx_event_actions is a global structure that stores 10 function interfaces corresponding to the event-driven module (here is the epoll module). So the ngx_epoll_module_ctx.actions.process_events function is called here, which is the ngx_epoll_process_events function to process events.

ngx_epoll_process_events calls the linux function interface epoll_wait to obtain the "new connection" event, and then calls the handler processing function of this event to process this event.

As mentioned above, the handler has been set to the ngx_event_accept function, so ngx_event_accept is called for actual processing.

The ngx_event_accept method is analyzed below. Its flow chart is as follows:

What is the processing flow of Nginx event-driven framework?

The simplified code is as follows. The serial number in the comment corresponds to the serial number in the above figure:

void
ngx_event_accept(ngx_event_t *ev)
{
 socklen_t  socklen;
 ngx_err_t  err;
 ngx_log_t  *log;
 ngx_uint_t  level;
 ngx_socket_t  s;
 ngx_event_t  *rev, *wev;
 ngx_listening_t  *ls;
 ngx_connection_t *c, *lc;
 ngx_event_conf_t *ecf;
 u_char  sa[ngx_sockaddrlen];
 
 if (ev->timedout) {
  if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) {
   return;
  }
 
  ev->timedout = 0;
 }
 
 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
 
 if (ngx_event_flags & ngx_use_rtsig_event) {
  ev->available = 1;
 
 } else if (!(ngx_event_flags & ngx_use_kqueue_event)) {
  ev->available = ecf->multi_accept;
 }
 
 lc = ev->data;
 ls = lc->listening;
 ev->ready = 0;
 
 do {
  socklen = ngx_sockaddrlen;
 
  /* 1、accept方法试图建立连接,非阻塞调用 */
  s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
 
  if (s == (ngx_socket_t) -1)
  {
   err = ngx_socket_errno;
 
   if (err == ngx_eagain)
   {
    /* 没有连接,直接返回 */
    return;
   }
 
   level = ngx_log_alert;
 
   if (err == ngx_econnaborted) {
    level = ngx_log_err;
 
   } else if (err == ngx_emfile || err == ngx_enfile) {
    level = ngx_log_crit;
   }
 
   if (err == ngx_econnaborted) {
    if (ngx_event_flags & ngx_use_kqueue_event) {
     ev->available--;
    }
 
    if (ev->available) {
     continue;
    }
   }
 
   if (err == ngx_emfile || err == ngx_enfile) {
    if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)
     != ngx_ok)
    {
     return;
    }
 
    if (ngx_use_accept_mutex) {
     if (ngx_accept_mutex_held) {
      ngx_shmtx_unlock(&ngx_accept_mutex);
      ngx_accept_mutex_held = 0;
     }
 
     ngx_accept_disabled = 1;
 
    } else {
     ngx_add_timer(ev, ecf->accept_mutex_delay);
    }
   }
 
   return;
  }
 
  /* 2、设置负载均衡阈值 */
  ngx_accept_disabled = ngx_cycle->connection_n / 8
        - ngx_cycle->free_connection_n;
 
  /* 3、从连接池获得一个连接对象 */
  c = ngx_get_connection(s, ev->log);
 
  /* 4、为连接创建内存池 */
  c->pool = ngx_create_pool(ls->pool_size, ev->log);
 
  c->sockaddr = ngx_palloc(c->pool, socklen);
 
  ngx_memcpy(c->sockaddr, sa, socklen);
 
  log = ngx_palloc(c->pool, sizeof(ngx_log_t));
 
  /* set a blocking mode for aio and non-blocking mode for others */
  /* 5、设置套接字属性为阻塞或非阻塞 */
  if (ngx_inherited_nonblocking) {
   if (ngx_event_flags & ngx_use_aio_event) {
    if (ngx_blocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_blocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
 
  } else {
   if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) {
    if (ngx_nonblocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_nonblocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
  }
 
  *log = ls->log;
 
  c->recv = ngx_recv;
  c->send = ngx_send;
  c->recv_chain = ngx_recv_chain;
  c->send_chain = ngx_send_chain;
 
  c->log = log;
  c->pool->log = log;
 
  c->socklen = socklen;
  c->listening = ls;
  c->local_sockaddr = ls->sockaddr;
  c->local_socklen = ls->socklen;
 
  c->unexpected_eof = 1;
 
  rev = c->read;
  wev = c->write;
 
  wev->ready = 1;
 
  if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) {
   /* rtsig, aio, iocp */
   rev->ready = 1;
  }
 
  if (ev->deferred_accept) {
   rev->ready = 1;
 
  }
 
  rev->log = log;
  wev->log = log;
 
  /*
   * todo: mt: - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   *
   * todo: mp: - allocated in a shared memory
   *   - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   */
 
  c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
  if (ls->addr_ntop) {
   c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
   if (c->addr_text.data == null) {
    ngx_close_accepted_connection(c);
    return;
   }
 
   c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
            c->addr_text.data,
            ls->addr_text_max_len, 0);
   if (c->addr_text.len == 0) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  /* 6、将新连接对应的读写事件添加到epoll对象中 */
  if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) {
   if (ngx_add_conn(c) == ngx_error) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  log->data = null;
  log->handler = null;
 
  /* 7、tcp建立成功调用的方法,这个方法在ngx_listening_t结构体中 */
  ls->handler(c);
 
 } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */
}

The "shock group" problem in nginx

nginx generally runs multiple worker processes, and these processes listen to the same port at the same time. When a new connection arrives, the kernel wakes up all these processes, but only one process can successfully connect to the client, causing other processes to waste a lot of overhead when waking up. This is called the "thundering herd" phenomenon. The way nginx solves the "shock" problem is to let the process obtain the mutex lock ngx_accept_mutex and let the process enter a certain critical section mutually. In the critical section, the process adds the read event corresponding to the connection it wants to monitor to the epoll module, so that when a "new connection" event occurs, the worker process will respond. This process of locking and adding events is completed in the function ngx_trylock_accept_mutex. When other processes also enter this function and want to add read events, they find that the mutex is held by another process, so it can only return, and the events it listens to cannot be added to the epoll module, so it cannot respond to the "new connection" "event. But this will raise a question: when does the process holding the mutex lock release the mutex lock? If you need to wait for it to process all events before releasing the lock, it will take a long time. During this period, other worker processes cannot establish new connections, which is obviously undesirable. The solution for nginx is: the process that has obtained the mutex lock through ngx_trylock_accept_mutex, after getting the ready read/write events and returning from epoll_wait, puts these events into the queue:

New connection events are put in ngx_posted_accept_events queue
Put existing connection events into the ngx_posted_events queue

The code is as follows:

if (flags & ngx_post_events)
{
 /* 延后处理这批事件 */
 queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);
 
 /* 将事件添加到延后执行队列中 */
 ngx_locked_post_event(rev, queue);
}
else
{
 rev->handler(rev); /* 不需要延后,则立即处理事件 */
}

Write events for similar processing. The process next processes the events in the ngx_posted_accept_events queue, and immediately releases the mutex lock after processing, so that the time the process occupies the lock is minimized.

Load balancing issues in nginx

Each process in nginx uses a threshold ngx_accept_disabled to handle load balancing, which is initialized in step 2 of the above figure :

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

Its initial value is a negative number, and the absolute value of the negative number is equal to 7/8 of the total number of connections. .When the threshold is less than 0, it responds to new connection events normally. When the threshold is greater than 0, it no longer responds to new connection events, and decrements ngx_accept_disabled by 1. The code is as follows:

if (ngx_accept_disabled > 0)
{
  ngx_accept_disabled--;
}
else
{
 if (ngx_trylock_accept_mutex(cycle) == ngx_error)
 {
  return;
 }
 ....
}

This shows that when the current connection of a process When the number reaches 7/8 of the total number of connections that can be processed, the load balancing mechanism is triggered and the process stops responding to new connections.

The above is the detailed content of What is the processing flow of Nginx event-driven framework?. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete