ホームページ >運用・保守 >Nginx >Nginxのイベントドリブンフレームワークの処理フローはどのようなものですか?

Nginxのイベントドリブンフレームワークの処理フローはどのようなものですか?

WBOY
WBOY転載
2023-05-13 13:34:06978ブラウズ

ngx_event_core_module モジュールの ngx_event_process_init メソッドは、イベント モジュールの初期化を行います。これには、「接続要求」などの読み取りイベントに対応するハンドラーを ngx_event_accept 関数に設定し、このイベントを epoll モジュールに追加することが含まれます。新しい接続イベントが発生すると、ngx_event_accept が呼び出されます。

ワーカー プロセスは、イベントを処理するために ngx_worker_process_cycle メソッドの ngx_process_events_and_timers 関数を継続的に呼び出します。この関数は、イベント処理の一般的なエントリ ポイントです。

ngx_process_events_and_timers は、ngx_event_actions.process_events に相当するマクロである ngx_process_events を呼び出します。ngx_event_actions は、イベント駆動型モジュール (ここでは epoll モジュール) に対応する 10 個の関数インターフェイスを格納するグローバル構造です。したがって、ここでは ngx_epoll_module_ctx.actions.process_events 関数が呼び出されます。これは、イベントを処理するための ngx_epoll_process_events 関数です。

ngx_epoll_process_events は、Linux 関数インターフェイス epoll_wait を呼び出して「新しい接続」イベントを取得し、このイベントのハンドラー処理関数を呼び出してこのイベントを処理します。

上記の通り、ハンドラには ngx_event_accept 関数が設定されているため、実際の処理では ngx_event_accept が呼び出されます。

ngx_event_accept メソッドを分析したフローチャートは次のとおりです: Nginxのイベントドリブンフレームワークの処理フローはどのようなものですか?

簡略化したコードは次のとおりです コメント内のシリアル番号は上図のシリアル番号:

void
ngx_event_accept(ngx_event_t *ev)
{
 socklen_t  socklen;
 ngx_err_t  err;
 ngx_log_t  *log;
 ngx_uint_t  level;
 ngx_socket_t  s;
 ngx_event_t  *rev, *wev;
 ngx_listening_t  *ls;
 ngx_connection_t *c, *lc;
 ngx_event_conf_t *ecf;
 u_char  sa[ngx_sockaddrlen];
 
 if (ev->timedout) {
  if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) {
   return;
  }
 
  ev->timedout = 0;
 }
 
 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
 
 if (ngx_event_flags & ngx_use_rtsig_event) {
  ev->available = 1;
 
 } else if (!(ngx_event_flags & ngx_use_kqueue_event)) {
  ev->available = ecf->multi_accept;
 }
 
 lc = ev->data;
 ls = lc->listening;
 ev->ready = 0;
 
 do {
  socklen = ngx_sockaddrlen;
 
  /* 1、accept方法试图建立连接,非阻塞调用 */
  s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
 
  if (s == (ngx_socket_t) -1)
  {
   err = ngx_socket_errno;
 
   if (err == ngx_eagain)
   {
    /* 没有连接,直接返回 */
    return;
   }
 
   level = ngx_log_alert;
 
   if (err == ngx_econnaborted) {
    level = ngx_log_err;
 
   } else if (err == ngx_emfile || err == ngx_enfile) {
    level = ngx_log_crit;
   }
 
   if (err == ngx_econnaborted) {
    if (ngx_event_flags & ngx_use_kqueue_event) {
     ev->available--;
    }
 
    if (ev->available) {
     continue;
    }
   }
 
   if (err == ngx_emfile || err == ngx_enfile) {
    if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)
     != ngx_ok)
    {
     return;
    }
 
    if (ngx_use_accept_mutex) {
     if (ngx_accept_mutex_held) {
      ngx_shmtx_unlock(&ngx_accept_mutex);
      ngx_accept_mutex_held = 0;
     }
 
     ngx_accept_disabled = 1;
 
    } else {
     ngx_add_timer(ev, ecf->accept_mutex_delay);
    }
   }
 
   return;
  }
 
  /* 2、设置负载均衡阈值 */
  ngx_accept_disabled = ngx_cycle->connection_n / 8
        - ngx_cycle->free_connection_n;
 
  /* 3、从连接池获得一个连接对象 */
  c = ngx_get_connection(s, ev->log);
 
  /* 4、为连接创建内存池 */
  c->pool = ngx_create_pool(ls->pool_size, ev->log);
 
  c->sockaddr = ngx_palloc(c->pool, socklen);
 
  ngx_memcpy(c->sockaddr, sa, socklen);
 
  log = ngx_palloc(c->pool, sizeof(ngx_log_t));
 
  /* set a blocking mode for aio and non-blocking mode for others */
  /* 5、设置套接字属性为阻塞或非阻塞 */
  if (ngx_inherited_nonblocking) {
   if (ngx_event_flags & ngx_use_aio_event) {
    if (ngx_blocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_blocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
 
  } else {
   if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) {
    if (ngx_nonblocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_nonblocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
  }
 
  *log = ls->log;
 
  c->recv = ngx_recv;
  c->send = ngx_send;
  c->recv_chain = ngx_recv_chain;
  c->send_chain = ngx_send_chain;
 
  c->log = log;
  c->pool->log = log;
 
  c->socklen = socklen;
  c->listening = ls;
  c->local_sockaddr = ls->sockaddr;
  c->local_socklen = ls->socklen;
 
  c->unexpected_eof = 1;
 
  rev = c->read;
  wev = c->write;
 
  wev->ready = 1;
 
  if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) {
   /* rtsig, aio, iocp */
   rev->ready = 1;
  }
 
  if (ev->deferred_accept) {
   rev->ready = 1;
 
  }
 
  rev->log = log;
  wev->log = log;
 
  /*
   * todo: mt: - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   *
   * todo: mp: - allocated in a shared memory
   *   - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   */
 
  c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
  if (ls->addr_ntop) {
   c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
   if (c->addr_text.data == null) {
    ngx_close_accepted_connection(c);
    return;
   }
 
   c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
            c->addr_text.data,
            ls->addr_text_max_len, 0);
   if (c->addr_text.len == 0) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  /* 6、将新连接对应的读写事件添加到epoll对象中 */
  if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) {
   if (ngx_add_conn(c) == ngx_error) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  log->data = null;
  log->handler = null;
 
  /* 7、tcp建立成功调用的方法,这个方法在ngx_listening_t结构体中 */
  ls->handler(c);
 
 } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */
}

nginx

nginx の「ショック グループ」問題は通常、複数のワーカー プロセスを実行し、これらのプロセスは同じポートをリッスンします。同時に。新しい接続が到着すると、カーネルはこれらすべてのプロセスを起動しますが、クライアントに正常に接続できるのは 1 つのプロセスだけであり、起動時に他のプロセスが大量のオーバーヘッドを無駄にします。これは「サンダー ハード」現象と呼ばれます。 nginx が「ショック」問題を解決する方法は、プロセスにミューテックス ロック ngx_accept_mutex を取得させ、プロセスが特定のクリティカル セクションに相互に入るようにすることです。クリティカル セクションでは、プロセスは監視する接続に対応する読み取りイベントを epoll モジュールに追加します。これにより、「新しい接続」イベントが発生したときにワーカー プロセスが応答します。イベントをロックして追加するこのプロセスは、関数 ngx_trylock_accept_mutex で完了します。他のプロセスもこの関数に入り、読み取りイベントを追加したい場合、ミューテックスが別のプロセスによって保持されていることがわかるため、戻ることしかできず、リッスンしているイベントを epoll モジュールに追加できないため、応答できません。 「新しいつながり」イベント。しかし、これには疑問が生じます。ミューテックス ロックを保持しているプロセスはいつミューテックス ロックを解放するのでしょうか?ロックを解放する前にすべてのイベントが処理されるのを待つ必要がある場合、長い時間がかかります。この期間中、他のワーカー プロセスは新しい接続を確立できませんが、これは明らかに望ましくないことです。 nginx の解決策は次のとおりです。ngx_trylock_accept_mutex を通じてミューテックス ロックを取得したプロセスは、ready read/write イベントを取得して epoll_wait から戻った後、これらのイベントをキューに入れます。

新しい接続イベントは ngx_posted_accept_events に入れられます。 queue

既存の接続イベントを ngx_posted_events キューに入れます

コードは次のとおりです:

if (flags & ngx_post_events)
{
 /* 延后处理这批事件 */
 queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);
 
 /* 将事件添加到延后执行队列中 */
 ngx_locked_post_event(rev, queue);
}
else
{
 rev->handler(rev); /* 不需要延后,则立即处理事件 */
}

同様の処理のイベントを書き込みます。次に、プロセスは ngx_posted_accept_events キュー内のイベントを処理し、プロセスがロックを占有する時間を最小限に抑えるために、処理後にすぐにミューテックス ロックを解放します。

nginx の負荷分散の問題

nginx の各プロセスは、負荷分散を処理するためにしきい値 ngx_accept_disabled を使用します。これは、上図の手順 2 で初期化されます。

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

初期値は負の数であり、負の数の絶対値は合計の 7/8 に等しくなります。接続数。しきい値が 0 より小さい場合、新しい接続イベントに通常どおり応答します。しきい値が 0 より大きい場合、新しい接続イベントに応答しなくなり、ngx_accept_disabled が 1 ずつ減ります。コードは次のとおりです。

if (ngx_accept_disabled > 0)
{
  ngx_accept_disabled--;
}
else
{
 if (ngx_trylock_accept_mutex(cycle) == ngx_error)
 {
  return;
 }
 ....
}

これは、プロセスの現在の接続数が処理可能な総接続数の 7/8 に達すると、負荷分散メカニズムがトリガーされ、プロセスが新しい接続への応答を停止することを示しています。 ###

以上がNginxのイベントドリブンフレームワークの処理フローはどのようなものですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。