Heim  >  Artikel  >  Betrieb und Instandhaltung  >  Detaillierte Erläuterung des ereignisgesteuerten Nginx-Prozesses basierend auf dem Epoll-Modell

Detaillierte Erläuterung des ereignisgesteuerten Nginx-Prozesses basierend auf dem Epoll-Modell

藏色散人
藏色散人nach vorne
2020-02-01 15:04:475118Durchsuche

In diesem Artikel wird zunächst das Implementierungsprinzip des Epoll-Modells erläutert und anschließend auf Quellcodeebene erläutert, wie Nginx das ereignisgesteuerte Modell basierend auf dem Epoll-Modell implementiert.

Detaillierte Erläuterung des ereignisgesteuerten Nginx-Prozesses basierend auf dem Epoll-Modell

epoll ist ein ereignisgesteuertes Modell, was einer der wichtigen Gründe dafür ist, dass Nginx Kundenanfragen effizient bearbeiten kann. Aus Prozesssicht ist die Verwendung des Epoll-Modells hauptsächlich in drei Schritte unterteilt: die Erstellung des Epoll-Handles, das Hinzufügen von Listening-Dateideskriptoren und das Auslösen von Warteereignissen. In diesem Artikel wird vorgestellt, wie Nginx eine effiziente Verarbeitung von Client-Anfragen implementiert basierend auf diesen drei Schritten.

Verwandte Empfehlungen: „Nginx-Tutorial

1. Einführung in das Epoll-Modell

Vor der Einführung des Implementierungsprinzips von Nginx , Wir müssen zunächst die grundlegende Verwendung des Epoll-Modells vorstellen. Es gibt drei Hauptmethoden für die Verwendung von Epoll:

// 创建epoll句柄
int epoll_create(int size);
// 往epoll句柄中添加需要进行监听的文件描述符
int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event);
// 等待需要监听的文件描述符上对应的事件的发生
int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);

Zuerst rufen wir die Methode epoll_create() auf, um ein Handle für die Epoll-Instanz zu erstellen. Das Handle kann hier als Instanz einer Eventpoll-Struktur verstanden werden Es gibt einen rot-schwarzen Baum und eine Warteschlange. Der rot-schwarze Baum speichert hauptsächlich die Dateideskriptoren, die überwacht werden müssen, und die Warteschlange fügt diese Ereignisse der Warteschlange hinzu, wenn bestimmte Ereignisse in den überwachten Dateideskriptoren auftreten ist ein schematisches Diagramm von Eventpoll:

Detaillierte Erläuterung des ereignisgesteuerten Nginx-Prozesses basierend auf dem Epoll-Modell

Im Allgemeinen gibt es im gesamten Laufzyklus des Programms nur ein Epoll-Handle, z. B. für jeden Worker-Prozess von Nginx Nur eines Das Epoll-Handle bleibt erhalten. Nach dem Erstellen des Handles ist jeder Port, den unser Programm abhört, im Wesentlichen ein Dateideskriptor. Das Accept-Ereignis kann auf diesem Dateideskriptor auftreten, das heißt, eine Client-Anfrage wird empfangen.

Daher fügen wir zunächst den Dateideskriptor, der dem zu überwachenden Port entspricht, über die Methode epoll_ctl() zum Epoll-Handle hinzu. Nach erfolgreicher Hinzufügung entspricht jeder Listening-Dateideskriptor einem Knoten im rot-schwarzen Baum von Eventpoll.

Außerdem wird nach dem Aufruf der Methode epoll_ctl() zum Hinzufügen eines Dateideskriptors dieser dem entsprechenden Gerät (Netzwerkkarte) zugeordnet. Wenn im Gerätetreiber ein Ereignis auftritt, wird der aktuelle Dateideskriptor verwendet zurückgerufen Die Rückrufmethode ep_poll_callback() generiert ein Ereignis und fügt das Ereignis zur Eventpoll-Ereigniswarteschlange hinzu.

Wenn wir schließlich die Methode epoll_wait() aufrufen, erhalten wir das entsprechende Ereignis vom Epoll-Handle. Im Wesentlichen prüfen wir, ob die Ereigniswarteschlange von Eventpoll leer ist, und geben es zurück. andernfalls wird auf das Eintreten des Ereignisses gewartet.

Für die Verwendung von Epoll ist das hier erhaltene Ereignis im Allgemeinen das Accept-Ereignis. Bei der Verarbeitung dieses Ereignisses wird das Verbindungshandle des Clients abgerufen. Dies ist im Wesentlichen ein Dateideskriptor , werden wir es weiterhin über die Methode epoll_ctl() zum aktuellen Epoll-Handle hinzufügen, um weiterhin auf seine Datenlese- und Schreibereignisse über die Methode epoll_wait() zu warten.

Hier können wir erkennen, dass es bei der Verwendung von epoll zwei Arten von Dateideskriptoren gibt, die dem von uns überwachten Port entsprechen Das Ereignis besteht darin, auf die Client-Verbindung zu warten, und der andere Typ ist ein Dateideskriptor, der jeder Client-Verbindung entspricht. Hier hören wir im Allgemeinen auf seine Lese- und Schreibereignisse, um Daten zu empfangen und an den Client zu senden.

2. Wie Epoll in Nginx implementiert wird

Im vorherigen Artikel haben wir erklärt, wie Nginx das ereignisgesteuerte Framework initialisiert, und eines der Ereignis-Frameworks wurde erwähnt Die Definition des Kernmoduls lautet wie folgt:

ngx_module_t ngx_event_core_module = {
    NGX_MODULE_V1,
    &ngx_event_core_module_ctx,            /* module context */
    ngx_event_core_commands,               /* module directives */
    NGX_EVENT_MODULE,                      /* module type */
    NULL,                                  /* init master */
    // 该方法主要是在master进程启动的过程中调用的,用于初始化时间模块
    ngx_event_module_init,                 /* init module */
    // 该方法是在各个worker进程启动之后调用的
    ngx_event_process_init,                /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

Hier müssen wir der Methode ngx_event_process_init() besondere Aufmerksamkeit schenken. Wie bereits erwähnt, wird diese Methode initialisiert und aufgerufen, wenn jeder Worker erstellt wird, an dem zwei beteiligt sind Sehr wichtige Aufrufe: a. Initialisieren Sie das entsprechende Ereignismodell. b. Das Folgende ist der Hauptcode dieser beiden Schritte:

static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) {
  // 省略部分代码....
  
  // 在nginx.conf配置文件的events{}配置块中需要使用use指令指定当前使用的事件模型,
  // 此时就会将所使用的事件模型的索引号存储在ecf->use中,下面的代码就是通过这种方式获取当前
  // 所指定的事件模型所对应的模块的,然后调用该模块的actions.init()方法初始化该事件模型
  for (m = 0; cycle->modules[m]; m++) {
    if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
      continue;
    }
    // ecf->use存储了所选用的事件模型的模块序号,这里是找到该模块
    if (cycle->modules[m]->ctx_index != ecf->use) {
      continue;
    }
    // module即为所选用的事件模型对应的模块
    module = cycle->modules[m]->ctx;
    // 调用指定事件模型的初始化方法
    if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
      exit(2);
    }
    break;
  }
  // 省略部分代码...
  
  ls = cycle->listening.elts;
  for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
    if (ls[i].reuseport && ls[i].worker != ngx_worker) {
      continue;
    }
#endif
    // 这里是为当前所监听的每一个端口都绑定一个ngx_connection_t结构体
    c = ngx_get_connection(ls[i].fd, cycle->log);
    if (c == NULL) {
      return NGX_ERROR;
    }
    rev = c->read;
    // SOCK_STREAM表示TCP,一般都是TCP,也就是说在接收到客户端的accept事件之后,
    // 就会调用ngx_event_accept()方法处理该事件
    rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg;
    if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ccf->worker_processes > 1) {
        if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT) == NGX_ERROR) {
            return NGX_ERROR;
        }
        continue;
    }
  }
  return NGX_OK;
}

Der Code hier vervollständigt hauptsächlich die folgenden Teile:

Suchen Sie zuerst das verwendete Ereignismodellmodul und rufen Sie dann seine init( )-Methode auf Initialisieren Sie das Modell. Zum einen wird über die Methode epoll_create() ein Epoll-Handle erstellt, zum anderen wird der globalen Variablen ngx_event_actions ein Wert zugewiesen. Das heißt:

// 这里将epoll相关的事件操作方法赋值给ngx_event_actions,
// 也就是说后续有相关的事件发生则都会使用epoll相关的方法
ngx_event_actions = ngx_epoll_module_ctx.actions;

Der Aufruf dieser Zuweisung ist sehr wichtig. Nach der Zuweisung sind mehrere von Nginx definierte Methoden alle im Epoll-Modul angegebenen Methoden:

#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events      ngx_event_actions.done
#define ngx_add_event        ngx_event_actions.add
#define ngx_del_event        ngx_event_actions.del
#define ngx_add_conn         ngx_event_actions.add_conn
#define ngx_del_conn         ngx_event_actions.del_conn

Die Definition der Struktur ngx_epoll_module_ctx.actions lautet hier wie folgt:

{
  // 对应于ngx_event_actions_t中的add方法
  ngx_epoll_add_event,             /* add an event */
  // 对应于ngx_event_actions_t中的del方法
  ngx_epoll_del_event,             /* delete an event */
  // 对应于ngx_event_actions_t中的enable方法,与add方法一致
  ngx_epoll_add_event,             /* enable an event */
  // 对应于ngx_event_actions_t中的disable方法,与del方法一致
  ngx_epoll_del_event,             /* disable an event */
  // 对应于ngx_event_actions_t中的add_conn方法
  ngx_epoll_add_connection,        /* add an connection */
  // 对应于ngx_event_actions_t中的del_conn方法
  ngx_epoll_del_connection,        /* delete an connection */
  #if (NGX_HAVE_EVENTFD)
  ngx_epoll_notify,                /* trigger a notify */
  #else
  NULL,                            /* trigger a notify */
  #endif
  // 对应于ngx_event_actions_t中的process_events方法
  ngx_epoll_process_events,        /* process the events */
  // 对应于ngx_event_actions_t中的init方法
  ngx_epoll_init,                  /* init the events */
  // 对应于ngx_event_actions_t中的done方法
  ngx_epoll_done,                  /* done the events */
}

Daraus können wir die hervorragende Entwurfsmethode von Nginx erkennen. Durch das von uns ausgewählte Ereignismodell können Sie das dynamisch angeben implementierte Submodule für Makros wie ngx_add_event().

上面的方法完成的第二个主要的工作就是遍历所有监听的端口,获取其描述符,然后通过ngx_add_event()方法将其添加到epoll句柄中以监听其客户端连接事件。从这里就可以感觉到比较巧妙了,因为上面一步中正好对epoll模块进行了初始化,并且设置了ngx_add_event()宏的实现方法,而这里就使用到了这里设置的方法,该方法本质上就是通过epoll_ctl()方法将当前监听的socket描述符添加到epoll句柄中;

最后就是上面的方法在遍历所有监听的端口的时候,为每个连接的accept事件添加的回调方法是ngx_event_accept(),通过前面我们对epoll模型的使用方式的介绍,我们大概可以理解,这里的ngx_event_accept()方法的主要作用是将当前accept到的客户端连接的句柄通过epoll_ctl()方法添加到当前epoll句柄中,以继续监听其读写事件;

这里我们首先看一下上面第一点中介绍的module->actions.init(cycle, ngx_timer_resolution)方法调用时是如何初始化epoll模块的。由于是epoll模块,这里的init()方法指向的就是ngx_epoll_init()方法,如下是该方法的源码:

static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) {
  ngx_epoll_conf_t *epcf;
  // 获取解析得到的ngx_epoll_conf_t结构体
  epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
  if (ep == -1) {
    // 创建eventpoll结构体,将创建得到的文件描述符返回
    ep = epoll_create(cycle->connection_n / 2);
    // ep==-1表示创建失败
    if (ep == -1) {
      ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                    "epoll_create() failed");
      return NGX_ERROR;
    }
  }
  // 如果nevents小于epcf->events,说明event_list数组的长度不够,因而需要重新申请内存空间
  if (nevents < epcf->events) {
    if (event_list) {
      ngx_free(event_list);
    }
    // 为event_list重新申请内存空间
    event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log);
    if (event_list == NULL) {
      return NGX_ERROR;
    }
  }
  // 将nevents更新为配置文件中指定的大小
  nevents = epcf->events;
  ngx_io = ngx_os_io;
  // 这里将epoll相关的事件操作方法赋值给ngx_event_actions,也就是说后续有相关的事件发生则
  // 都会使用epoll相关的方法
  ngx_event_actions = ngx_epoll_module_ctx.actions;
  // 这里NGX_USE_CLEAR_EVENT指的是使用ET模式来使用epoll,默认使用ET模式,
  // 而NGX_USE_LEVEL_EVENT表示使用LE模式来使用epoll
#if (NGX_HAVE_CLEAR_EVENT)
  ngx_event_flags = NGX_USE_CLEAR_EVENT
                    #else
                    ngx_event_flags = NGX_USE_LEVEL_EVENT
                    #endif
                        // NGX_USE_GREEDY_EVENT表示每次拉取事件是都尝试拉取最多的事件
                    | NGX_USE_GREEDY_EVENT
                    | NGX_USE_EPOLL_EVENT;
  return NGX_OK;
}

可以看到,这里的ngx_epoll_init()方法主要的作用有两个:a. 通过epoll_create()方法创建一个epoll句柄;b. 设置ngx_event_actions属性所指向的方法的实现,从而确定ngx_add_event()等宏的实现方法。下面我们来看一下ngx_add_event()是如何将需要监听的文件描述符添加到epoll句柄中的:

static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) {
  int op;
  uint32_t events, prev;
  ngx_event_t *e;
  ngx_connection_t *c;
  struct epoll_event ee;
  // ev->data在使用的过程中存储的是当前对应的ngx_connection_t,如果是free_connection,
  // 则存储的是下一个节点的指针
  c = ev->data;
  // 事件类型
  events = (uint32_t) event;
  // 如果是读事件
  if (event == NGX_READ_EVENT) {
    e = c->write;
    prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN | EPOLLRDHUP)
    events = EPOLLIN | EPOLLRDHUP;  // 设置读事件类型
#endif
  } else {
    e = c->read;
    prev = EPOLLIN | EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
    events = EPOLLOUT;  // 设置写事件类型
#endif
  }
  // 根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件
  if (e->active) {
    op = EPOLL_CTL_MOD; // 类型为修改事件
    events |= prev;
  } else {
    op = EPOLL_CTL_ADD; // 类型为添加事件
  }
#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
  if (flags & NGX_EXCLUSIVE_EVENT) {
      events &= ~EPOLLRDHUP;
  }
#endif
  // 将flags参数指定的事件添加到监听列表中
  ee.events = events | (uint32_t) flags;
  // 这里是将connection指针的最后一位赋值为ev->instance,然后将其赋值给事件的ptr属性,通过这种方式检测事件是否过期
  ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
  ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                 "epoll add event: fd:%d op:%d ev:%08XD",
                 c->fd, op, ee.events);
  // 将事件添加到epoll句柄中
  if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                  "epoll_ctl(%d, %d) failed", op, c->fd);
    return NGX_ERROR;
  }
  // 将事件标记为活跃状态
  ev->active = 1;
#if 0
  ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif
  return NGX_OK;
}

这里的ngx_add_event()方法本质上是比较简单的,就是将当前的ngx_event_t转换为一个epoll_event结构体,并且会设置该结构体中需要监听的事件类型,然后通过epoll_ctl()方法将当前epoll_event添加到epoll句柄中。

在前面的ngx_event_process_init()方法中,nginx通过ngx_add_event()方法将各个监听的端口的描述符添加到epoll句柄中之后,就会开始监听这些描述符上的accept连接事件,如果有客户端连接请求,此时就会回调ngx_event_accept()方法处理该请求,我们来看一下该方法是如何处理客户端建立连接的请求的:

/**
 * 当客户端有accept事件到达时,将调用此方法处理该事件
 */
void ngx_event_accept(ngx_event_t *ev) {
  socklen_t socklen;
  ngx_err_t err;
  ngx_log_t *log;
  ngx_uint_t level;
  ngx_socket_t s;
  ngx_event_t *rev, *wev;
  ngx_sockaddr_t sa;
  ngx_listening_t *ls;
  ngx_connection_t *c, *lc;
  ngx_event_conf_t *ecf;
#if (NGX_HAVE_ACCEPT4)
  static ngx_uint_t  use_accept4 = 1;
#endif
  if (ev->timedout) {
    // 如果当前事件超时了,则继续将其添加到epoll句柄中以监听accept事件
    if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
      return;
    }
    ev->timedout = 0;
  }
  // 获取解析event核心配置结构体
  ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
  if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
    ev->available = ecf->multi_accept;
  }
  lc = ev->data;
  ls = lc->listening;
  ev->ready = 0;
  do {
    socklen = sizeof(ngx_sockaddr_t);
#if (NGX_HAVE_ACCEPT4)
    if (use_accept4) {
        s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
    } else {
        s = accept(lc->fd, &sa.sockaddr, &socklen);
    }
#else
    // 这里lc->fd指向的是监听的文件句柄,调用accept()获取客户端的连接,并且将其存储到sa.sockaddr中
    s = accept(lc->fd, &sa.sockaddr, &socklen);
#endif
    // 检查当前进程获取的连接个数是否超过了最大可用连接数的7/8,是则不再继续接收连接
    ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;
    // 获取新的连接
    c = ngx_get_connection(s, ev->log);
    // 获取连接失败则直接返回
    if (c == NULL) {
      if (ngx_close_socket(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                      ngx_close_socket_n
                          " failed");
      }
      return;
    }
    // 标记当前为TCP连接
    c->type = SOCK_STREAM;
    // 为当前连接创建连接池
    c->pool = ngx_create_pool(ls->pool_size, ev->log);
    if (c->pool == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }
    // 更新socklen的长度
    if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
      socklen = sizeof(ngx_sockaddr_t);
    }
    // 为sockaddr申请内存空间,并且将客户端连接地址复制到c->sockaddr中
    c->sockaddr = ngx_palloc(c->pool, socklen);
    if (c->sockaddr == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }
    ngx_memcpy(c->sockaddr, &sa, socklen);
    // 申请ngx_log_t结构体的内存空间
    log = ngx_palloc(c->pool, sizeof(ngx_log_t));
    if (log == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }
    /* set a blocking mode for iocp and non-blocking mode for others */
    if (ngx_inherited_nonblocking) {
      if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
        // 将连接设置为阻塞模式
        if (ngx_blocking(s) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                        ngx_blocking_n
                            " failed");
          ngx_close_accepted_connection(c);
          return;
        }
      }
    } else {
      if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
        // 将连接设置为非阻塞模式
        if (ngx_nonblocking(s) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                        ngx_nonblocking_n
                            " failed");
          ngx_close_accepted_connection(c);
          return;
        }
      }
    }
    *log = ls->log;
    // 设置连接的基本属性
    c->recv = ngx_recv;
    c->send = ngx_send;
    c->recv_chain = ngx_recv_chain;
    c->send_chain = ngx_send_chain;
    c->log = log;
    c->pool->log = log;
    c->socklen = socklen;
    c->listening = ls;
    c->local_sockaddr = ls->sockaddr;
    c->local_socklen = ls->socklen;
#if (NGX_HAVE_UNIX_DOMAIN)
    if (c->sockaddr->sa_family == AF_UNIX) {
      c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
      c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
      /* Solaris&#39;s sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
      c->sendfile = 0;
#endif
    }
#endif
    rev = c->read;
    wev = c->write;
    wev->ready = 1;
    if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
      rev->ready = 1;
    }
    if (ev->deferred_accept) {
      rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
      rev->available = 1;
#endif
    }
    rev->log = log;
    wev->log = log;
    // 更新连接使用次数
    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
    // 将网络地址更新为字符串形式的地址
    if (ls->addr_ntop) {
      c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
      if (c->addr_text.data == NULL) {
        ngx_close_accepted_connection(c);
        return;
      }
      c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                       c->addr_text.data,
                                       ls->addr_text_max_len, 0);
      if (c->addr_text.len == 0) {
        ngx_close_accepted_connection(c);
        return;
      }
    }
#if (NGX_DEBUG)
    {
    ngx_str_t  addr;
    u_char     text[NGX_SOCKADDR_STRLEN];
    ngx_debug_accepted_connection(ecf, c);
    if (log->log_level & NGX_LOG_DEBUG_EVENT) {
        addr.data = text;
        addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
                                 NGX_SOCKADDR_STRLEN, 1);
        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
                       "*%uA accept: %V fd:%d", c->number, &addr, s);
    }
    }
#endif
    // 将当前连接添加到epoll句柄中进行监控
    if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
      if (ngx_add_conn(c) == NGX_ERROR) {
        ngx_close_accepted_connection(c);
        return;
      }
    }
    log->data = NULL;
    log->handler = NULL;
    // 建立新连接之后的回调方法
    ls->handler(c);
    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
      ev->available--;
    }
  } while (ev->available);
}

这里客户端连接的建立过程主要可以分为如下几个步骤:

首先调用accept()方法获取当前客户端建立的连接,并且将其地址信息保存到结构体sa中;

接着通过调用ngx_get_connection()方法获取一个ngx_connection_t结构体以对应当前获取到的客户端连接,并且会初始化该结构体的各个属性;

调用ngx_add_conn()方法将当前方法添加到epoll句柄中,这里的添加过程本质上就是通过epoll_ctl()方法将当前客户端的连接的文件描述符添加到epoll句柄中,以监听其读写事件;

如此我们就讲解了从epoll句柄的创建,到指定的端口的监听,接着处理客户端连接,并且将客户端连接对应的文件描述符继续添加到epoll句柄中以监听读写事件的流程。下面我们继续来看一下nginx是如何等待所监听的这些句柄上的事件的发生的,也即整个事件框架的驱动程序。worker进程对于事件的处理,主要在ngx_process_events_and_timers()方法中,如下是该方法的源码:

void ngx_process_events_and_timers(ngx_cycle_t *cycle) {
// 尝试获取共享锁
  if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
    return;
  }
  // 这里开始处理事件,对于kqueue模型,其指向的是ngx_kqueue_process_events()方法,
  // 而对于epoll模型,其指向的是ngx_epoll_process_events()方法
  // 这个方法的主要作用是,在对应的事件模型中获取事件列表,然后将事件添加到ngx_posted_accept_events
  // 队列或者ngx_posted_events队列中
  (void) ngx_process_events(cycle, timer, flags);
  // 这里开始处理accept事件,将其交由ngx_event_accept.c的ngx_event_accept()方法处理;
  ngx_event_process_posted(cycle, &ngx_posted_accept_events);
  // 开始释放锁
  if (ngx_accept_mutex_held) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
  }
  // 如果不需要在事件队列中进行处理,则直接处理该事件
  // 对于事件的处理,如果是accept事件,则将其交由ngx_event_accept.c的ngx_event_accept()方法处理;
  // 如果是读事件,则将其交由ngx_http_request.c的ngx_http_wait_request_handler()方法处理;
  // 对于处理完成的事件,最后会交由ngx_http_request.c的ngx_http_keepalive_handler()方法处理。
  // 这里开始处理除accept事件外的其他事件
  ngx_event_process_posted(cycle, &ngx_posted_events);
}

这里的ngx_process_events_and_timers()方法我们省略了大部分代码,只留下了主要的流程。简而言之,其主要实现了如下几个步骤的工作:

获取共享锁,以得到获取客户端连接的权限;

调用ngx_process_events()方法监听epoll句柄中各个文件描述符的事件,并且处理这些事件。在前面我们讲到,nginx在调用epoll模块的init()方法时,初始化了ngx_event_actions属性的值,将其指向了epoll模块所实现的方法,这里就包括ngx_process_events方法宏所对应的方法,也即ngx_epoll_process_events()方法,因而这里其实就可以理解,ngx_epoll_process_events()方法本质上就是调用epoll_wait()方法等待epoll句柄上所监听的事件的发生;

处理ngx_posted_accept_events队列中的事件,这些事件其实就是前面讲到的客户端建立连接的事件,在ngx_epoll_process_events()方法中获取到事件之后,会判断其是accept事件还是读写事件,如果是accept事件,就会将其添加到ngx_posted_accept_events队列中,如果是读写事件,就会将其添加到ngx_posted_events队列中;

释放共享锁,以让其他的worker进程可以获取锁,从而接收客户端连接;

处理ngx_posted_events队列中的事件,也即客户端连接的读写事件。从这里就可以看出nginx高性能的一个原因,其将accept事件和读写事件放到了两个不同的队列中,accept事件是必须在锁内部处理的,而读写事件则可以异步于accept事件,这提高了nginx处理客户端请求的能力。

下面我们来看一下ngx_epoll_process_events()方法是如何处理epoll句柄中的事件的:

static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) {
  int events;
  uint32_t revents;
  ngx_int_t instance, i;
  ngx_uint_t level;
  ngx_err_t err;
  ngx_event_t *rev, *wev;
  ngx_queue_t *queue;
  ngx_connection_t *c;
  /* NGX_TIMER_INFINITE == INFTIM */
  ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                 "epoll timer: %M", timer);
  // 通过epoll_wait()方法进行事件的获取,获取到的事件将存放在event_list中,并且会将获取的事件个数返回
  events = epoll_wait(ep, event_list, (int) nevents, timer);
  err = (events == -1) ? ngx_errno : 0;
  // 这里的ngx_event_timer_alarm是通过一个定时器任务来触发的,在定时器中会将其置为1,
  // 从而实现定期更新nginx缓存的时间的目的
  if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
    ngx_time_update();
  }
  if (err) {
    if (err == NGX_EINTR) {
      if (ngx_event_timer_alarm) {
        ngx_event_timer_alarm = 0;
        return NGX_OK;
      }
      level = NGX_LOG_INFO;
    } else {
      level = NGX_LOG_ALERT;
    }
    ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
    return NGX_ERROR;
  }
  // 获取的事件个数为0
  if (events == 0) {
    // 如果当前时间类型不为NGX_TIMER_INFINITE,说明获取事件超时了,则直接返回
    if (timer != NGX_TIMER_INFINITE) {
      return NGX_OK;
    }
    // 这里说明时间类型为NGX_TIMER_INFINITE,但是却返回了0个事件,说明epoll_wait()调用出现了问题
    ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                  "epoll_wait() returned no events without timeout");
    return NGX_ERROR;
  }
  // 遍历各个事件
  for (i = 0; i < events; i++) {
    // 每个事件的data.ptr中存储了当前事件对应的connection对象
    c = event_list[i].data.ptr;
    // 获取事件中存储的instance的值
    instance = (uintptr_t) c & 1;
    // 获取connection指针地址值
    c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
    // 获取读事件结构体
    rev = c->read;
    // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance,
    // 说明该连接已经过期了,则不对该事件进行处理
    if (c->fd == -1 || rev->instance != instance) {
      /*
       * the stale event from a file descriptor
       * that was just closed in this iteration
       */
      ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                     "epoll: stale event %p", c);
      continue;
    }
    // 获取当前事件监听的类型
    revents = event_list[i].events;
    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll: fd:%d ev:%04XD d:%p",
                   c->fd, revents, event_list[i].data.ptr);
    // 如果事件发生错误,则打印相应的日志
    if (revents & (EPOLLERR | EPOLLHUP)) {
      ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                     "epoll_wait() error on fd:%d ev:%04XD",
                     c->fd, revents);
      /*
       * if the error events were returned, add EPOLLIN and EPOLLOUT
       * to handle the events at least in one active handler
       */
      revents |= EPOLLIN | EPOLLOUT;
    }
#if 0
    if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "strange epoll_wait() events fd:%d ev:%04XD",
                      c->fd, revents);
    }
#endif
    // 如果当前是读事件,并且事件是活跃的
    if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
      if (revents & EPOLLRDHUP) {
          rev->pending_eof = 1;
      }
      rev->available = 1;
#endif
      // 将事件标记为就绪状态
      rev->ready = 1;
      // 默认是开启了NGX_POST_EVENTS开关的
      if (flags & NGX_POST_EVENTS) {
        // 如果当前是accept事件,则将其添加到ngx_posted_accept_events队列中,
        // 如果是读写事件,则将其添加到ngx_posted_events队列中
        queue = rev->accept ? &ngx_posted_accept_events
                            : &ngx_posted_events;
        ngx_post_event(rev, queue);
      } else {
        // 如果不需要分离accept和读写事件,则直接处理该事件
        rev->handler(rev);
      }
    }
    // 获取写事件结构体
    wev = c->write;
    if ((revents & EPOLLOUT) && wev->active) {
      // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance,
      // 说明该连接已经过期了,则不对该事件进行处理
      if (c->fd == -1 || wev->instance != instance) {
        /*
         * the stale event from a file descriptor
         * that was just closed in this iteration
         */
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: stale event %p", c);
        continue;
      }
      // 将当前事件标记为就绪状态
      wev->ready = 1;
#if (NGX_THREADS)
      wev->complete = 1;
#endif
      // 由于是写事件,并且需要标记为了NGX_POST_EVENTS状态,
      // 因而将其直接添加到ngx_posted_events队列中,否则直接处理该事件
      if (flags & NGX_POST_EVENTS) {
        ngx_post_event(wev, &ngx_posted_events);
      } else {
        wev->handler(wev);
      }
    }
  }
  return NGX_OK;
}

这里ngx_epoll_process_events()方法首先就是调用epoll_wait()方法获取所监听的句柄的事件,然后遍历获取的事件,根据事件的类型,如果是accept事件,则添加到ngx_posted_accept_events队列中,如果是读写事件,则添加到ngx_posted_events队列中,而队列中事件的处理,则在上面介绍的ngx_process_events_and_timers()方法中进行。

        

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung des ereignisgesteuerten Nginx-Prozesses basierend auf dem Epoll-Modell. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:oschina.net. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen