Rumah  >  Artikel  >  Operasi dan penyelenggaraan  >  Apakah aliran pemprosesan rangka kerja dipacu peristiwa Nginx?

Apakah aliran pemprosesan rangka kerja dipacu peristiwa Nginx?

WBOY
WBOYke hadapan
2023-05-13 13:34:06838semak imbas

Kaedah ngx_event_process_init modul ngx_event_core_module melakukan beberapa permulaan modul acara. Ini termasuk menetapkan pengendali yang sepadan dengan acara baca seperti "permintaan sambungan" ke fungsi ngx_event_accept dan menambah acara ini pada modul epoll. Apabila peristiwa sambungan baharu berlaku, ngx_event_accept akan dipanggil. Proses umum adalah seperti berikut:

Proses pekerja secara berterusan memanggil fungsi ngx_process_events_and_timers dalam kaedah ngx_worker_process_cycle untuk memproses peristiwa.

ngx_process_events_and_timers akan memanggil ngx_process_events, iaitu makro, bersamaan dengan ngx_event_actions.process_events ngx_event_actions ialah struktur global yang menyimpan 10 antara muka fungsi yang sepadan dengan modul epoll (di sini adalah modul dipacu acara). Jadi fungsi ngx_epoll_module_ctx.actions.process_events dipanggil di sini, iaitu fungsi ngx_epoll_process_events untuk memproses acara.

ngx_epoll_process_events memanggil antara muka fungsi linux epoll_wait untuk mendapatkan acara "sambungan baharu", dan kemudian memanggil fungsi pemprosesan pengendali acara ini untuk memproses acara ini.

Seperti yang dinyatakan di atas, pengendali telah ditetapkan kepada fungsi ngx_event_accept, jadi ngx_event_accept dipanggil untuk pemprosesan sebenar.

Kaedah ngx_event_accept dianalisis di bawah carta alirannya adalah seperti berikut:

Apakah aliran pemprosesan rangka kerja dipacu peristiwa Nginx?

Kod yang dipermudahkan adalah seperti berikut nombor siri dalam rajah di atas:

void
ngx_event_accept(ngx_event_t *ev)
{
 socklen_t  socklen;
 ngx_err_t  err;
 ngx_log_t  *log;
 ngx_uint_t  level;
 ngx_socket_t  s;
 ngx_event_t  *rev, *wev;
 ngx_listening_t  *ls;
 ngx_connection_t *c, *lc;
 ngx_event_conf_t *ecf;
 u_char  sa[ngx_sockaddrlen];
 
 if (ev->timedout) {
  if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) {
   return;
  }
 
  ev->timedout = 0;
 }
 
 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
 
 if (ngx_event_flags & ngx_use_rtsig_event) {
  ev->available = 1;
 
 } else if (!(ngx_event_flags & ngx_use_kqueue_event)) {
  ev->available = ecf->multi_accept;
 }
 
 lc = ev->data;
 ls = lc->listening;
 ev->ready = 0;
 
 do {
  socklen = ngx_sockaddrlen;
 
  /* 1、accept方法试图建立连接,非阻塞调用 */
  s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
 
  if (s == (ngx_socket_t) -1)
  {
   err = ngx_socket_errno;
 
   if (err == ngx_eagain)
   {
    /* 没有连接,直接返回 */
    return;
   }
 
   level = ngx_log_alert;
 
   if (err == ngx_econnaborted) {
    level = ngx_log_err;
 
   } else if (err == ngx_emfile || err == ngx_enfile) {
    level = ngx_log_crit;
   }
 
   if (err == ngx_econnaborted) {
    if (ngx_event_flags & ngx_use_kqueue_event) {
     ev->available--;
    }
 
    if (ev->available) {
     continue;
    }
   }
 
   if (err == ngx_emfile || err == ngx_enfile) {
    if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)
     != ngx_ok)
    {
     return;
    }
 
    if (ngx_use_accept_mutex) {
     if (ngx_accept_mutex_held) {
      ngx_shmtx_unlock(&ngx_accept_mutex);
      ngx_accept_mutex_held = 0;
     }
 
     ngx_accept_disabled = 1;
 
    } else {
     ngx_add_timer(ev, ecf->accept_mutex_delay);
    }
   }
 
   return;
  }
 
  /* 2、设置负载均衡阈值 */
  ngx_accept_disabled = ngx_cycle->connection_n / 8
        - ngx_cycle->free_connection_n;
 
  /* 3、从连接池获得一个连接对象 */
  c = ngx_get_connection(s, ev->log);
 
  /* 4、为连接创建内存池 */
  c->pool = ngx_create_pool(ls->pool_size, ev->log);
 
  c->sockaddr = ngx_palloc(c->pool, socklen);
 
  ngx_memcpy(c->sockaddr, sa, socklen);
 
  log = ngx_palloc(c->pool, sizeof(ngx_log_t));
 
  /* set a blocking mode for aio and non-blocking mode for others */
  /* 5、设置套接字属性为阻塞或非阻塞 */
  if (ngx_inherited_nonblocking) {
   if (ngx_event_flags & ngx_use_aio_event) {
    if (ngx_blocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_blocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
 
  } else {
   if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) {
    if (ngx_nonblocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_nonblocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
  }
 
  *log = ls->log;
 
  c->recv = ngx_recv;
  c->send = ngx_send;
  c->recv_chain = ngx_recv_chain;
  c->send_chain = ngx_send_chain;
 
  c->log = log;
  c->pool->log = log;
 
  c->socklen = socklen;
  c->listening = ls;
  c->local_sockaddr = ls->sockaddr;
  c->local_socklen = ls->socklen;
 
  c->unexpected_eof = 1;
 
  rev = c->read;
  wev = c->write;
 
  wev->ready = 1;
 
  if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) {
   /* rtsig, aio, iocp */
   rev->ready = 1;
  }
 
  if (ev->deferred_accept) {
   rev->ready = 1;
 
  }
 
  rev->log = log;
  wev->log = log;
 
  /*
   * todo: mt: - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   *
   * todo: mp: - allocated in a shared memory
   *   - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   */
 
  c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
  if (ls->addr_ntop) {
   c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
   if (c->addr_text.data == null) {
    ngx_close_accepted_connection(c);
    return;
   }
 
   c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
            c->addr_text.data,
            ls->addr_text_max_len, 0);
   if (c->addr_text.len == 0) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  /* 6、将新连接对应的读写事件添加到epoll对象中 */
  if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) {
   if (ngx_add_conn(c) == ngx_error) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  log->data = null;
  log->handler = null;
 
  /* 7、tcp建立成功调用的方法,这个方法在ngx_listening_t结构体中 */
  ls->handler(c);
 
 } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */
}

Masalah "kumpulan guruh" dalam nginx

nginx secara amnya menjalankan berbilang proses pekerja dan proses ini mendengar port yang sama pada masa yang sama. Apabila sambungan baharu tiba, kernel membangunkan semua proses ini, tetapi hanya satu proses yang boleh berjaya menyambung kepada pelanggan, menyebabkan proses lain membazirkan banyak overhed apabila bangun. Ini dipanggil fenomena "gerombolan gemuruh". Cara nginx menyelesaikan masalah "kejutan" adalah dengan membiarkan proses mendapatkan kunci mutex ngx_accept_mutex dan biarkan proses memasuki bahagian kritikal tertentu secara bersama. Dalam bahagian kritikal ini, proses menambah acara baca sepadan dengan sambungan yang ingin dipantau ke modul epoll, supaya apabila peristiwa "sambungan baharu" berlaku, proses pekerja akan bertindak balas. Proses mengunci dan menambah acara ini selesai dalam fungsi ngx_trylock_accept_mutex. Apabila proses lain turut memasuki fungsi ini dan ingin menambah acara baca, mereka mendapati bahawa mutex dipegang oleh proses lain, jadi ia hanya boleh kembali Peristiwa yang didengarinya tidak boleh ditambahkan pada modul epoll, jadi ia tidak boleh bertindak balas kepada "sambungan baharu" "acara. Tetapi ini akan menimbulkan persoalan: bilakah proses yang memegang kunci mutex melepaskan kunci mutex? Jika anda perlu menunggu untuk memproses semua acara sebelum melepaskan kunci, ia akan mengambil masa yang lama. Dalam tempoh ini, proses pekerja lain tidak dapat mewujudkan sambungan baharu, yang jelas tidak diingini. Penyelesaian untuk nginx ialah: proses yang telah memperoleh kunci mutex melalui ngx_trylock_accept_mutex, selepas mendapat acara baca/tulis sedia dan kembali dari epoll_wait, masukkan acara ini ke dalam baris gilir:

Letakkan acara sambungan baharu dalam ngx_posted_accept_events baris gilir
Acara sambungan sedia ada dimasukkan ke dalam baris gilir ngx_posted_events

Kodnya adalah seperti berikut:

if (flags & ngx_post_events)
{
 /* 延后处理这批事件 */
 queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);
 
 /* 将事件添加到延后执行队列中 */
 ngx_locked_post_event(rev, queue);
}
else
{
 rev->handler(rev); /* 不需要延后,则立即处理事件 */
}

Tulis acara untuk pemprosesan yang serupa. Proses seterusnya memproses peristiwa dalam baris gilir ngx_posted_accept_events, dan segera melepaskan kunci mutex selepas diproses, meminimumkan masa proses mengambil kunci.

Isu pengimbangan beban dalam nginx

Setiap proses dalam nginx menggunakan ambang ngx_accept_disabled untuk mengendalikan pengimbangan beban, yang dimulakan dalam langkah 2 rajah di atas :

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

Nilai awalnya ialah nombor negatif, dan nilai mutlak nombor negatif adalah sama dengan 7/8 daripada jumlah bilangan sambungan. Apabila ambang kurang daripada 0, ia bertindak balas kepada peristiwa sambungan baharu secara normal Apabila ambang lebih daripada 0, ia tidak lagi bertindak balas kepada peristiwa sambungan baharu dan mengecilkan ngx_accept_disabled dengan 1. Kodnya adalah seperti berikut:

if (ngx_accept_disabled > 0)
{
  ngx_accept_disabled--;
}
else
{
 if (ngx_trylock_accept_mutex(cycle) == ngx_error)
 {
  return;
 }
 ....
}

Ini menunjukkan bahawa apabila sambungan semasa proses Apabila nombor mencapai 7/8 daripada jumlah bilangan sambungan yang boleh diproses, mekanisme pengimbangan beban dicetuskan dan proses berhenti bertindak balas kepada sambungan baharu.

Atas ialah kandungan terperinci Apakah aliran pemprosesan rangka kerja dipacu peristiwa Nginx?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam