Nginxでの上流機構の実装

WBOY
WBOYオリジナル
2016-08-08 09:30:042153ブラウズ

概要

アップストリームのメカニズムにより、Nginx はダウンストリーム クライアントから http リクエストを受信し、リクエストを処理し、リクエストに基づいて TCP リクエスト メッセージをアップストリーム サーバーに送信します。リクエストは対応する応答メッセージを返し、Nginx は上流サーバーからの応答メッセージに基づいて応答メッセージを下流クライアントに転送するかどうかを決定します。さらに、アップストリーム メカニズムは、クラスタ サーバー内のサーバーへのリクエストの負荷を分散できる負荷分散機能を提供します。

アップストリームを開始する

Nginx で ngx_http_upstream_init メソッドを呼び出してアップストリーム メカニズムを開始しますが、アップストリーム メカニズムを使用する前に ngx_http_upstream_create メソッドを呼び出して ngx_http_upstream_t 構造を作成する必要があります。これは、デフォルトでは ngx_http_request_t 構造内にあるためです。ストリーム メンバーは NULL を指しています。この構造体の特定の初期化作業は引き続き HTTP によって実行する必要があります。 モジュールが完成しました。 ngx_http_upstream_t 構造と ngx_http_upstream_conf_t 構造の関連説明については、「Nginx のアップストリーム メカニズム」の記事を参照してください。

以下は関数 ngx_http_upstream_create の実装です:

/* 创建 ngx_http_upstream_t 结构体 */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u;

    u = r->upstream;

    /*
     * 若已经创建过ngx_http_upstream_t 且定义了cleanup成员,
     * 则调用cleanup清理方法将原始结构体清除;
     */
    if (u && u->cleanup) {
        r->main->count++;
        ngx_http_upstream_cleanup(r);
    }

    /* 从内存池分配ngx_http_upstream_t 结构体空间 */
    u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (u == NULL) {
        return NGX_ERROR;
    }

    /* 给ngx_http_request_t 结构体成员upstream赋值 */
    r->upstream = u;

    u->peer.log = r->connection->log;
    u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
    u->peer.lock = &r->connection->lock;
#endif

#if (NGX_HTTP_CACHE)
    r->cache = NULL;
#endif

    u->headers_in.content_length_n = -1;

    return NGX_OK;
}

上流メカニズムの起動メソッド ngx_http_upstream_init の実行プロセスは次のとおりです:

  • 接続の読み取りを確認してくださいNginx とダウンストリームサーバーの間でイベントがタイマー内にあるかどうか、つまり timer_set フラグが 1 であるかどうかを確認します。フラグが 1 の場合は、タイマーから読み取りイベントを削除します
  • ngx_http_upstream_init_request メソッドを呼び出して開始します。アップストリームメカニズムngx_http_upstream_conf_t 構造体のignore_client_abort は 0 です。上記のフラグ ビットがすべて 0 の場合、ngx_http_request_t によって要求された読み取りイベントのコールバック メソッドを ngx_http_upstream_rd_check_broken_connection に設定します。 ngx_http_upstream_wr_check_broken_connection; どちらのメソッドも ngx_http_upstream_check_broken_connection メソッドを呼び出して、Nginx とダウンストリーム間の接続が正常であるかどうかを確認します。つまり、上記のフラグが満たされていない場合、接続は終了します。 1 つは 0 ではありません。リクエスト内の ngx_http_upstream_t 構造内の HTTP モジュールによって実装された create_request メソッドは、上流サーバーへのリクエストを構築します。

は、ngx_http_cleanup_add メソッドを呼び出して、クリーンアップ リストの最後にコールバック ハンドラー メソッドを追加します。元のリクエストのコールバック メソッドが ngx_http_upstream_cleanup に設定されている場合、現在のリクエストが終了すると、このメソッドが呼び出されてクリーンアップ作業を実行します 上流サーバーへの接続リクエストを開始するために ngx_http_upstream_connect メソッドを呼び出します。

    /* 初始化启动upstream机制 */
    void
    ngx_http_upstream_init(ngx_http_request_t *r)
    {
        ngx_connection_t     *c;
    
        /* 获取当前请求所对应的连接 */
        c = r->connection;
    
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http init upstream, client timer: %d", c->read->timer_set);
    
    #if (NGX_HTTP_SPDY)
        if (r->spdy_stream) {
            ngx_http_upstream_init_request(r);
            return;
        }
    #endif
    
        /*
         * 检查当前连接上读事件的timer_set标志位是否为1,若该标志位为1,
         * 表示读事件在定时器机制中,则需要把它从定时器机制中移除;
         * 因为在启动upstream机制后,就不需要对客户端的读操作进行超时管理;
         */
        if (c->read->timer_set) {
            ngx_del_timer(c->read);
        }
    
        if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
    
            if (!c->write->active) {
                if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
                    == NGX_ERROR)
                {
                    ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                    return;
                }
            }
        }
    
        ngx_http_upstream_init_request(r);
    }
    static void
    ngx_http_upstream_init_request(ngx_http_request_t *r)
    {
        ngx_str_t                      *host;
        ngx_uint_t                      i;
        ngx_resolver_ctx_t             *ctx, temp;
        ngx_http_cleanup_t             *cln;
        ngx_http_upstream_t            *u;
        ngx_http_core_loc_conf_t       *clcf;
        ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
        ngx_http_upstream_main_conf_t  *umcf;
    
        if (r->aio) {
            return;
        }
    
        u = r->upstream;
    
    #if (NGX_HTTP_CACHE)
        ...
        ...
    #endif
    
        /* 文件缓存标志位 */
        u->store = (u->conf->store || u->conf->store_lengths);
    
        /*
         * 检查ngx_http_upstream_t 结构中标志位 store;
         * 检查ngx_http_request_t 结构中标志位 post_action;
         * 检查ngx_http_upstream_conf_t 结构中标志位 ignore_client_abort;
         * 若上面这些标志位为1,则表示需要检查Nginx与下游(即客户端)之间的TCP连接是否断开;
         */
        if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
            r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
            r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
        }
    
        /* 把当前请求包体结构保存在ngx_http_upstream_t 结构的request_bufs链表缓冲区中 */
        if (r->request_body) {
            u->request_bufs = r->request_body->bufs;
        }
    
        /* 调用create_request方法构造发往上游服务器的请求 */
        if (u->create_request(r) != NGX_OK) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        /* 获取ngx_http_upstream_t结构中主动连接结构peer的local本地地址信息 */
        u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);
    
        /* 获取ngx_http_core_module模块的loc级别的配置项结构 */
        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
    
        /* 初始化ngx_http_upstream_t结构中成员output向下游发送响应的方式 */
        u->output.alignment = clcf->directio_alignment;
        u->output.pool = r->pool;
        u->output.bufs.num = 1;
        u->output.bufs.size = clcf->client_body_buffer_size;
        u->output.output_filter = ngx_chain_writer;
        u->output.filter_ctx = &u->writer;
    
        u->writer.pool = r->pool;
    
        /* 添加用于表示上游响应的状态,例如:错误编码、包体长度等 */
        if (r->upstream_states == NULL) {
    
            r->upstream_states = ngx_array_create(r->pool, 1,
                                                sizeof(ngx_http_upstream_state_t));
            if (r->upstream_states == NULL) {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
        } else {
    
            u->state = ngx_array_push(r->upstream_states);
            if (u->state == NULL) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
        }
    
        /*
         * 调用ngx_http_cleanup_add方法原始请求的cleanup链表尾端添加一个回调handler方法,
         * 该handler回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
         */
        cln = ngx_http_cleanup_add(r, 0);
        if (cln == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        cln->handler = ngx_http_upstream_cleanup;
        cln->data = r;
        u->cleanup = &cln->handler;
    
        if (u->resolved == NULL) {
    
            /* 若没有实现u->resolved标志位,则定义上游服务器的配置 */
            uscf = u->conf->upstream;
    
        } else {
    
            /*
             * 若实现了u->resolved标志位,则解析主机域名,指定上游服务器的地址;
             */
    
    
            /*
             * 若已经指定了上游服务器地址,则不需要解析,
             * 直接调用ngx_http_upstream_connection方法向上游服务器发起连接;
             * 并return从当前函数返回;
             */
            if (u->resolved->sockaddr) {
    
                if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
                    != NGX_OK)
                {
                    ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                    return;
                }
    
                ngx_http_upstream_connect(r, u);
    
                return;
            }
    
            /*
             * 若还没指定上游服务器的地址,则需解析主机域名;
             * 若成功解析出上游服务器的地址和端口号,
             * 则调用ngx_http_upstream_connection方法向上游服务器发起连接;
             */
            host = &u->resolved->host;
    
            umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
    
            uscfp = umcf->upstreams.elts;
    
            for (i = 0; i < umcf->upstreams.nelts; i++) {
    
                uscf = uscfp[i];
    
                if (uscf->host.len == host->len
                    && ((uscf->port == 0 && u->resolved->no_port)
                         || uscf->port == u->resolved->port)
                    && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
                {
                    goto found;
                }
            }
    
            if (u->resolved->port == 0) {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "no port in upstream \"%V\"", host);
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            temp.name = *host;
    
            ctx = ngx_resolve_start(clcf->resolver, &temp);
            if (ctx == NULL) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            if (ctx == NGX_NO_RESOLVER) {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "no resolver defined to resolve %V", host);
    
                ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
                return;
            }
    
            ctx->name = *host;
            ctx->handler = ngx_http_upstream_resolve_handler;
            ctx->data = r;
            ctx->timeout = clcf->resolver_timeout;
    
            u->resolved->ctx = ctx;
    
            if (ngx_resolve_name(ctx) != NGX_OK) {
                u->resolved->ctx = NULL;
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            return;
        }
    
    found:
    
        if (uscf == NULL) {
            ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                          "no upstream configuration");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        if (uscf->peer.init(r, uscf) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        ngx_http_upstream_connect(r, u);
    }
    
    static void
    ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
    {
        ngx_http_upstream_check_broken_connection(r, r->connection->read);
    }
    
    
    static void
    ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
    {
        ngx_http_upstream_check_broken_connection(r, r->connection->write);
    }
  • 接続を確立します
  • 上流メカニズムが上流サーバーとの TCP 接続を確立するとき、非ブロッキング モードのソケットを使用します。つまり、接続が確立されたかどうかに関係なく、接続要求を開始するとすぐに戻ります。正常に確立されましたが、すぐに正常に確立されない場合は、ソケットを epoll イベント メカニズムで監視する必要があります。上流サーバーへの接続リクエストの開始は、関数 ngx_http_upstream_connect
  • によって実装されます。 ngx_http_upstream_connect の分析 このメソッドは
  • ngx_http_upstream_connect メソッド によって呼び出されるため、メソッドの前に、まず ngx_event_connect_peer メソッドを分析します。
  • ngx_event_connect_peer メソッドの実行フローは次のとおりです。
  • ngx_socket メソッドを呼び出して TCP ソケットを作成します。

    ngx_nonblocking メソッドを呼び出して TCP ソケットをノンブロッキング モードに設定します。 ; ソケット接続でのネットワーク文字ストリームの送受信メソッドを設定します

    EPOLLIN を想定した方法で TCP ソケットを追加します。 | EPOLLOUT イベントを epoll イベント メカニズムに送信します。

      connect メソッドを呼び出して、サーバーへの TCP 接続リクエストを開始します。は次のとおりです:
      • 调用 ngx_event_connect_peer 方法主动向上游服务器发起连接请求,需要注意的是该方法已经将相应的套接字注册到epoll事件机制来监听读、写事件,该方法返回值为 rc;
        • 若 rc = NGX_ERROR,表示发起连接失败,则调用ngx_http_upstream_finalize_request 方法关闭连接请求,并 return 从当前函数返回;
        • 若 rc = NGX_BUSY,表示当前上游服务器处于不活跃状态,则调用 ngx_http_upstream_next 方法根据传入的参数尝试重新发起连接请求,并 return 从当前函数返回;
        • 若 rc = NGX_DECLINED,表示当前上游服务器负载过重,则调用 ngx_http_upstream_next 方法尝试与其他上游服务器建立连接,并 return 从当前函数返回;
        • 设置上游连接 ngx_connection_t 结构体的读事件、写事件的回调方法 handler 都为 ngx_http_upstream_handler,设置 ngx_http_upstream_t 结构体的写事件 write_event_handler 的回调为 ngx_http_upstream_send_request_handler,读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_header;
        • 若 rc = NGX_AGAIN,表示当前已经发起连接,但是没有收到上游服务器的确认应答报文,即上游连接的写事件不可写,则需调用 ngx_add_timer 方法将上游连接的写事件添加到定时器中,管理超时确认应答;
        • 若 rc = NGX_OK,表示成功建立连接,则调用 ngx_http_upsream_send_request 方法向上游服务器发送请求;
      /* 向上游服务器建立连接 */
      static void
      ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
      {
          ngx_int_t          rc;
          ngx_time_t        *tp;
          ngx_connection_t  *c;
      
          r->connection->log->action = "connecting to upstream";
      
          if (u->state && u->state->response_sec) {
              tp = ngx_timeofday();
              u->state->response_sec = tp->sec - u->state->response_sec;
              u->state->response_msec = tp->msec - u->state->response_msec;
          }
      
          u->state = ngx_array_push(r->upstream_states);
          if (u->state == NULL) {
              ngx_http_upstream_finalize_request(r, u,
                                                 NGX_HTTP_INTERNAL_SERVER_ERROR);
              return;
          }
      
          ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
      
          tp = ngx_timeofday();
          u->state->response_sec = tp->sec;
          u->state->response_msec = tp->msec;
      
          /* 向上游服务器发起连接 */
          rc = ngx_event_connect_peer(&u->peer);
      
          ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                         "http upstream connect: %i", rc);
      
          /* 下面根据rc不同返回值进行分析 */
      
          /* 若建立连接失败,则关闭当前请求,并return从当前函数返回 */
          if (rc == NGX_ERROR) {
              ngx_http_upstream_finalize_request(r, u,
                                                 NGX_HTTP_INTERNAL_SERVER_ERROR);
              return;
          }
      
          u->state->peer = u->peer.name;
      
          /*
           * 若返回rc = NGX_BUSY,表示当前上游服务器不活跃,
           * 则调用ngx_http_upstream_next向上游服务器重新发起连接,
           * 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
           * 并return从当前函数返回;
           */
          if (rc == NGX_BUSY) {
              ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
              return;
          }
      
          /*
           * 若返回rc = NGX_DECLINED,表示当前上游服务器负载过重,
           * 则调用ngx_http_upstream_next向上游服务器重新发起连接,
           * 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
           * 并return从当前函数返回;
           */
          if (rc == NGX_DECLINED) {
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
              return;
          }
      
          /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
      
          c = u->peer.connection;
      
          c->data = r;
      
          /* 设置当前连接ngx_connection_t 上读、写事件的回调方法 */
          c->write->handler = ngx_http_upstream_handler;
          c->read->handler = ngx_http_upstream_handler;
      
          /* 设置upstream机制的读、写事件的回调方法 */
          u->write_event_handler = ngx_http_upstream_send_request_handler;
          u->read_event_handler = ngx_http_upstream_process_header;
      
          c->sendfile &= r->connection->sendfile;
          u->output.sendfile = c->sendfile;
      
          if (c->pool == NULL) {
      
              /* we need separate pool here to be able to cache SSL connections */
      
              c->pool = ngx_create_pool(128, r->connection->log);
              if (c->pool == NULL) {
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
          }
      
          c->log = r->connection->log;
          c->pool->log = c->log;
          c->read->log = c->log;
          c->write->log = c->log;
      
          /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
      
          u->writer.out = NULL;
          u->writer.last = &u->writer.out;
          u->writer.connection = c;
          u->writer.limit = 0;
      
          /*
           * 检查当前ngx_http_upstream_t 结构的request_sent标志位,
           * 若该标志位为1,则表示已经向上游服务器发送请求,即本次发起连接失败;
           * 则调用ngx_http_upstream_reinit方法重新向上游服务器发起连接;
           */
          if (u->request_sent) {
              if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
          }
      
          if (r->request_body
              && r->request_body->buf
              && r->request_body->temp_file
              && r == r->main)
          {
              /*
               * the r->request_body->buf can be reused for one request only,
               * the subrequests should allocate their own temporary bufs
               */
      
              u->output.free = ngx_alloc_chain_link(r->pool);
              if (u->output.free == NULL) {
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
      
              u->output.free->buf = r->request_body->buf;
              u->output.free->next = NULL;
              u->output.allocated = 1;
      
              r->request_body->buf->pos = r->request_body->buf->start;
              r->request_body->buf->last = r->request_body->buf->start;
              r->request_body->buf->tag = u->output.tag;
          }
      
          u->request_sent = 0;
      
          /*
           * 若返回rc = NGX_AGAIN,表示没有收到上游服务器允许建立连接的应答;
           * 由于写事件已经添加到epoll事件机制中等待可写事件发生,
           * 所有在这里只需将当前连接的写事件添加到定时器机制中进行超时管理;
           * 并return从当前函数返回;
           */
          if (rc == NGX_AGAIN) {
              ngx_add_timer(c->write, u->conf->connect_timeout);
              return;
          }
      
      #if (NGX_HTTP_SSL)
      
          if (u->ssl && c->ssl == NULL) {
              ngx_http_upstream_ssl_init_connection(r, u, c);
              return;
          }
      
      #endif
      
          /*
           * 若返回值rc = NGX_OK,表示连接成功建立,
           * 调用此方法向上游服务器发送请求 */
          ngx_http_upstream_send_request(r, u);
      }

      发送请求

              当 Nginx 与上游服务器成功建立连接之后,会调用 ngx_http_upstream_send_request 方法发送请求,若是该方法不能一次性把请求内容发送完成时,则需等待 epoll 事件机制的写事件发生,若写事件发生,则会调用写事件 write_event_handler 的回调方法 ngx_http_upstream_send_request_handler 继续发送请求,并且有可能会多次调用该写事件的回调方法, 直到把请求发送完成。

      下面是 ngx_http_upstream_send_request 方法的执行流程:

      • 检查 ngx_http_upstream_t 结构体中的标志位 request_sent 是否为 0,若为 0 表示未向上游发送请求。 且此时调用 ngx_http_upstream_test_connect 方法测试是否与上游建立连接,若返回非 NGX_OK, 则需调用 ngx_http_upstream_next 方法试图与上游建立连接,并return 从当前函数返回;
      • 调用 ngx_output_chain 方法向上游发送保存在 request_bufs 链表中的请求数据,该方法返回值为 rc,并设置 request_sent 标志位为 1,检查连接上写事件 timer_set 标志位是否为1,若为 1 调用ngx_del_timer 方法将写事件从定时器中移除;
      • 若 rc = NGX_ERROR,表示当前连接上出错,则调用 ngx_http_upstream_next 方法尝试再次与上游建立连接,并 return 从当前函数返回;
      • 若 rc = NGX_AGAIN,并是当前请求数据未完全发送,则需将剩余的请求数据保存在 ngx_http_upstream_t 结构体的 output 成员中,并且调用 ngx_add_timer 方法将当前连接上的写事件添加到定时器中,调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中,等待可写事件发生,并return 从当前函数返回;
      • 若 rc = NGX_OK,表示已经发送全部请求数据,则准备接收来自上游服务器的响应报文;
      • 先调用 ngx_add_timer 方法将当前连接的读事件添加到定时器机制中,检测接收响应是否超时,检查当前连接上的读事件是否准备就绪,即标志位 ready 是否为1,若该标志位为 1,则调用 ngx_http_upstream_process_header 方法开始处理响应头部,并 return 从当前函数返回;
      • 若当前连接上读事件的标志位 ready 为0,表示暂时无可读数据,则需等待读事件再次被触发,由于原始读事件的回调方法为 ngx_http_upstream_process_header,所有无需重新设置。由于请求已经全部发送,防止写事件的回调方法 ngx_http_upstream_send_request_handler 再次被触发,因此需要重新设置写事件的回调方法为 ngx_http_upstream_dummy_handler,该方法实际上不执行任何操作,同时调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中;
      /* 向上游服务器发送请求 */
      static void
      ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
      {
          ngx_int_t          rc;
          ngx_connection_t  *c;
      
          /* 获取当前连接 */
          c = u->peer.connection;
      
          ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                         "http upstream send request");
      
          /*
           * 若标志位request_sent为0,表示还未发送请求;
           * 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
           * 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
           * 并return从当前函数返回;
           */
          if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
              return;
          }
      
          c->log->action = "sending request to upstream";
      
          /*
           * 调用ngx_output_chain方法向上游发送保存在request_bufs链表中的请求数据;
           * 值得注意的是该方法的第二个参数可以是NULL也可以是request_bufs,那怎么来区分呢?
           * 若是第一次调用该方法发送request_bufs链表中的请求数据时,request_sent标志位为0,
           * 此时,第二个参数自然就是request_bufs了,那么为什么会有NULL作为参数的情况呢?
           * 当在第一次调用该方法时,并不能一次性把所有request_bufs中的数据发送完毕时,
           * 此时,会把剩余的数据保存在output结构里面,并把标志位request_sent设置为1,
           * 因此,再次发送请求数据时,不用指定request_bufs参数,因为此时剩余数据已经保存在output中;
           */
          rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
      
          /* 向上游服务器发送请求之后,把request_sent标志位设置为1 */
          u->request_sent = 1;
      
          /* 下面根据不同rc的返回值进行判断 */
      
          /*
           * 若返回值rc=NGX_ERROR,表示当前连接上出错,
           * 将错误信息传递给ngx_http_upstream_next方法,
           * 该方法根据错误信息决定是否重新向上游服务器发起连接;
           * 并return从当前函数返回;
           */
          if (rc == NGX_ERROR) {
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
              return;
          }
      
          /*
           * 检查当前连接上写事件的标志位timer_set是否为1,
           * 若该标志位为1,则需把写事件从定时器机制中移除;
           */
          if (c->write->timer_set) {
              ngx_del_timer(c->write);
          }
      
          /*
           * 若返回值rc = NGX_AGAIN,表示请求数据并未完全发送,
           * 即有剩余的请求数据保存在output中,但此时,写事件已经不可写,
           * 则调用ngx_add_timer方法把当前连接上的写事件添加到定时器机制,
           * 并调用ngx_handle_write_event方法将写事件注册到epoll事件机制中;
           * 并return从当前函数返回;
           */
          if (rc == NGX_AGAIN) {
              ngx_add_timer(c->write, u->conf->send_timeout);
      
              if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
      
              return;
          }
      
          /* rc == NGX_OK */
      
          /*
           * 若返回值 rc = NGX_OK,表示已经发送完全部请求数据,
           * 准备接收来自上游服务器的响应报文,则执行以下程序;
           */
          if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
              if (ngx_tcp_push(c->fd) == NGX_ERROR) {
                  ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                                ngx_tcp_push_n " failed");
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
      
              c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
          }
      
          /* 将当前连接上读事件添加到定时器机制中 */
          ngx_add_timer(c->read, u->conf->read_timeout);
      
          /*
           * 若此时,读事件已经准备就绪,
           * 则调用ngx_http_upstream_process_header方法开始接收并处理响应头部;
           * 并return从当前函数返回;
           */
          if (c->read->ready) {
              ngx_http_upstream_process_header(r, u);
              return;
          }
      
          /*
           * 若当前读事件未准备就绪;
           * 则把写事件的回调方法设置为ngx_http_upstream_dumy_handler方法(不进行任何实际操作);
           * 并把写事件注册到epoll事件机制中;
           */
          u->write_event_handler = ngx_http_upstream_dummy_handler;
      
          if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
              ngx_http_upstream_finalize_request(r, u,
                                                 NGX_HTTP_INTERNAL_SERVER_ERROR);
              return;
          }
      }

      当无法一次性将请求内容全部发送完毕,则需等待 epoll 事件机制的写事件发生,一旦发生就会调用回调方法 ngx_http_upstream_send_request_handler。

      ngx_http_upstream_send_request_handler 方法的执行流程如下所示:

      • 检查连接上写事件是否超时,即timedout 标志位是否为 1,若为 1 表示已经超时,则调用 ngx_http_upstream_next 方法重新向上游发起连接请求,并 return 从当前函数返回;
      • 若标志位 timedout 为0,即不超时,检查 header_sent 标志位是否为 1,表示已经接收到来自上游服务器的响应头部,则不需要再向上游发送请求,将写事件的回调方法设置为 ngx_http_upstream_dummy_handler,同时将写事件注册到 epoll 事件机制中,并return 从当前函数返回;
      • 若标志位 header_sent 为 0,则调用 ngx_http_upstream_send_request 方法向上游发送请求数据;
      static void
      ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
          ngx_http_upstream_t *u)
      {
          ngx_connection_t  *c;
      
          c = u->peer.connection;
      
          ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                         "http upstream send request handler");
      
          /* 检查当前连接上写事件的超时标志位 */
          if (c->write->timedout) {
              /* 执行超时重连机制 */
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
              return;
          }
      
      #if (NGX_HTTP_SSL)
      
          if (u->ssl && c->ssl == NULL) {
              ngx_http_upstream_ssl_init_connection(r, u, c);
              return;
          }
      
      #endif
      
          /* 已经接收到上游服务器的响应头部,则不需要再向上游服务器发送请求数据 */
          if (u->header_sent) {
              /* 将写事件的回调方法设置为不进行任何实际操作的方法ngx_http_upstream_dumy_handler */
              u->write_event_handler = ngx_http_upstream_dummy_handler;
      
              /* 将写事件注册到epoll事件机制中,并return从当前函数返回 */
              (void) ngx_handle_write_event(c->write, 0);
      
              return;
          }
      
          /* 若没有接收来自上游服务器的响应头部,则需向上游服务器发送请求数据 */
          ngx_http_upstream_send_request(r, u);
      }

      接收响应

      接收响应头部

      当 Nginx 已经向上游发送请求,准备开始接收来自上游的响应头部,由方法 ngx_http_upstream_process_header 实现,该方法接收并解析响应头部。

      ngx_http_upstream_process_header 方法的执行流程如下:

      • 检查上游连接上的读事件是否超时,若标志位 timedout 为 1,则表示超时,此时调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
      • 若标志位 timedout 为 0,接着检查 ngx_http_upstream_t 结构体中的标志位 request_sent,若该标志位为 0,表示未向上游发送请求,同时调用 ngx_http_upstream_test_connect 方法测试连接状态,若该方法返回值为非 NGX_OK,表示与上游已经断开连接,则调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
      • 检查 ngx_http_upstream_t 结构体中接收响应头部的 buffer 缓冲区是否有内存空间以便接收响应头部,若 buffer.start 为 NULL,表示该缓冲区为空,则需调用 ngx_palloc 方法分配内存,该内存大小 buffer_size 由 ngx_http_upstream_conf_t 配置结构体的 buffer_size 成员指定;
      • 调用 recv 方法开始接收来自上游服务器的响应头部,并根据该方法的返回值 n 进行判断:
        • 若 n = NGX_AGAIN,表示读事件未准备就绪,需要等待下次读事件被触发时继续接收响应头部,此时,调用 ngx_add_timer 方法将读事件添加到定时器中,同时调用 ngx_handle_read_event 方法将读事件注册到epoll 事件机制中,并 return 从当前函数返回;
        • 若 n = NGX_ERROR 或 n = 0,表示上游连接发生错误 或 上游服务器主动关闭连接,则调用 ngx_http_upstream_next 方法重新发起连接请求,并 return 从当前函数返回;
        • 若 n 大于 0,表示已经接收到响应头部,此时,调用 ngx_http_upstream_t 结构体中由 HTTP 模块实现的 process_header 方法解析响应头部,且返回 rc 值;
      • 若 rc = NGX_AGAIN,表示接收到的响应头部不完整,检查接收缓冲区 buffer 是否还有剩余的内存空间,若缓冲区没有剩余的内存空间,表示接收到的响应头部过大,此时调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;若缓冲区还有剩余的内存空间,则continue 继续接收响应头部;
      • 若 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,表示接收到的响应头部是非法的,则调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;
      • 若 rc = NGX_ERROR,表示连接出错,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
      • 若 rc = NGX_OK,表示已接收到完整的响应头部,则调用 ngx_http_upstream_process_headers 方法处理已解析的响应头部,该方法会将已解析出来的响应头部保存在 ngx_http_request_t 结构体中的 headers_out 成员;
      • 检查 ngx_http_request_t 结构体的 subrequest_in_memory 成员决定是否需要转发响应给下游服务器;
        • 若 subrequest_in_memory 为 0,表示需要转发响应给下游服务器,则调用 ngx_http_upstream_send_response 方法开始转发响应给下游服务器,并 return 从当前函数返回;
        • 若 subrequest_in_memory 为 1,表示不需要将响应转发给下游,此时检查 HTTP 模块是否定义了 ngx_http_upstream_t 结构体中的 input_filter 方法处理响应包体;
      • 若没有定义 input_filter 方法,则使用 upstream 机制默认方法 ngx_http_upstream_non_buffered_filter 代替 input_filter 方法;
      • 若定义了自己的 input_filter 方法,则首先调用 input_filter_init 方法为处理响应包体做初始化工作;
      • 检查接收缓冲区 buffer 在解析完响应头部之后剩余的字符流,若有剩余的字符流,则表示已经预接收了响应包体,此时调用 input_filter 方法处理响应包体;
      • 设置 upstream 机制读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_body_in_memory,并调用该方法开始接收并解析响应包体;
      /* 接收并解析响应头部 */
      static void
      ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
      {
          ssize_t            n;
          ngx_int_t          rc;
          ngx_connection_t  *c;
      
          c = u->peer.connection;
      
          ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                         "http upstream process header");
      
          c->log->action = "reading response header from upstream";
      
          /* 检查当前连接上的读事件是否超时 */
          if (c->read->timedout) {
              /*
               * 若标志位timedout为1,表示读事件超时;
               * 则把超时错误传递给ngx_http_upstream_next方法,
               * 该方法根据允许的错误进行重连接策略;
               * 并return从当前函数返回;
               */
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
              return;
          }
      
          /*
           * 若标志位request_sent为0,表示还未发送请求;
           * 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
           * 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
           * 并return从当前函数返回;
           */
          if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
              return;
          }
      
          /*
           * 检查ngx_http_upstream_t结构体中接收响应头部的buffer缓冲区;
           * 若接收缓冲区buffer未分配内存,则调用ngx_palloce方法分配内存,
           * 该内存的大小buffer_size由ngx_http_upstream_conf_t配置结构的buffer_size指定;
           */
          if (u->buffer.start == NULL) {
              u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
              if (u->buffer.start == NULL) {
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
      
              /* 调整接收缓冲区buffer,准备接收响应头部 */
              u->buffer.pos = u->buffer.start;
              u->buffer.last = u->buffer.start;
              u->buffer.end = u->buffer.start + u->conf->buffer_size;
              /* 表示该缓冲区内存可被复用、数据可被改变 */
              u->buffer.temporary = 1;
      
              u->buffer.tag = u->output.tag;
      
              /* 初始化headers_in的成员headers链表 */
              if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                                sizeof(ngx_table_elt_t))
                  != NGX_OK)
              {
                  ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                  return;
              }
      
      #if (NGX_HTTP_CACHE)
      
              if (r->cache) {
                  u->buffer.pos += r->cache->header_start;
                  u->buffer.last = u->buffer.pos;
              }
      #endif
          }
      
          for ( ;; ) {
      
              /* 调用recv方法从当前连接上读取响应头部数据 */
              n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
      
              /* 下面根据 recv 方法不同返回值 n 进行判断 */
      
              /*
               * 若返回值 n = NGX_AGAIN,表示读事件未准备就绪,
               * 需等待下次读事件被触发时继续接收响应头部,
               * 即将读事件注册到epoll事件机制中,等待可读事件发生;
               * 并return从当前函数返回;
               */
              if (n == NGX_AGAIN) {
      #if 0
                  ngx_add_timer(rev, u->read_timeout);
      #endif
      
                  if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                      ngx_http_upstream_finalize_request(r, u,
                                                     NGX_HTTP_INTERNAL_SERVER_ERROR);
                      return;
                  }
      
                  return;
              }
      
              if (n == 0) {
                  ngx_log_error(NGX_LOG_ERR, c->log, 0,
                                "upstream prematurely closed connection");
              }
      
              /*
               * 若返回值 n = NGX_ERROR 或 n = 0,则表示上游服务器已经主动关闭连接;
               * 此时,调用ngx_http_upstream_next方法决定是否重新发起连接;
               * 并return从当前函数返回;
               */
              if (n == NGX_ERROR || n == 0) {
                  ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
                  return;
              }
      
              /* 若返回值 n 大于 0,表示已经接收到响应头部 */
              u->buffer.last += n;
      
      #if 0
              u->valid_header_in = 0;
      
              u->peer.cached = 0;
      #endif
      
              /*
               * 调用ngx_http_upstream_t结构体中process_header方法开始解析响应头部;
               * 并根据该方法返回值进行不同的判断;
               */
              rc = u->process_header(r);
      
              /*
               * 若返回值 rc = NGX_AGAIN,表示接收到的响应头部不完整,
               * 需等待下次读事件被触发时继续接收响应头部;
               * continue继续接收响应;
               */
              if (rc == NGX_AGAIN) {
      
                  if (u->buffer.last == u->buffer.end) {
                      ngx_log_error(NGX_LOG_ERR, c->log, 0,
                                    "upstream sent too big header");
      
                      ngx_http_upstream_next(r, u,
                                             NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                      return;
                  }
      
                  continue;
              }
      
              break;
          }
      
          /*
           * 若返回值 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,
           * 则表示接收到的响应头部是非法的,
           * 调用ngx_http_upstream_next方法决定是否重新发起连接;
           * 并return从当前函数返回;
           */
          if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
              ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
              return;
          }
      
          /*
           * 若返回值 rc = NGX_ERROR,表示出错,
           * 则调用ngx_http_upstream_finalize_request方法结束该请求;
           * 并return从当前函数返回;
           */
          if (rc == NGX_ERROR) {
              ngx_http_upstream_finalize_request(r, u,
                                                 NGX_HTTP_INTERNAL_SERVER_ERROR);
              return;
          }
      
          /* rc == NGX_OK */
      
          /*
           * 若返回值 rc = NGX_OK,表示成功解析到完整的响应头部;*/
          if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {
      
              if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
                  return;
              }
      
              if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
                  return;
              }
          }
      
          /* 调用ngx_http_upstream_process_headers方法处理已解析处理的响应头部 */
          if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
              return;
          }
      
          /*
           * 检查ngx_http_request_t 结构体的subrequest_in_memory成员决定是否转发响应给下游服务器;
           * 若该标志位为0,则需调用ngx_http_upstream_send_response方法转发响应给下游服务器;
           * 并return从当前函数返回;
           */
          if (!r->subrequest_in_memory) {
              ngx_http_upstream_send_response(r, u);
              return;
          }
      
          /* 若不需要转发响应,则调用ngx_http_upstream_t中的input_filter方法处理响应包体 */
          /* subrequest content in memory */
      
          /*
           * 若HTTP模块没有定义ngx_http_upstream_t中的input_filter处理方法;
           * 则使用upstream机制默认方法ngx_http_upstream_non_buffered_filter;
           *
           * 若HTTP模块实现了input_filter方法,则不使用upstream默认的方法;
           */
          if (u->input_filter == NULL) {
              u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
              u->input_filter = ngx_http_upstream_non_buffered_filter;
              u->input_filter_ctx = r;
          }
      
          /*
           * 调用input_filter_init方法为处理包体做初始化工作;
           */
          if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
              ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
              return;
          }
      
          /*
           * 检查接收缓冲区是否有剩余的响应数据;
           * 因为响应头部已经解析完毕,若接收缓冲区还有未被解析的剩余数据,
           * 则该数据就是响应包体;
           */
          n = u->buffer.last - u->buffer.pos;
      
          /*
           * 若接收缓冲区有剩余的响应包体,调用input_filter方法开始处理已接收到响应包体;
           */
          if (n) {
              u->buffer.last = u->buffer.pos;
      
              u->state->response_length += n;
      
              /* 调用input_filter方法处理响应包体 */
              if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                  ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                  return;
              }
          }
      
          if (u->length == 0) {
              ngx_http_upstream_finalize_request(r, u, 0);
              return;
          }
      
          /* 设置upstream机制的读事件回调方法read_event_handler为ngx_http_upstream_process_body_in_memory */
          u->read_event_handler = ngx_http_upstream_process_body_in_memory;
      
          /* 调用ngx_http_upstream_process_body_in_memory方法开始处理响应包体 */
          ngx_http_upstream_process_body_in_memory(r, u);
      }

      接收响应包体

      接收并解析响应包体由 ngx_http_upstream_process_body_in_memory 方法实现;

      ngx_http_upstream_process_body_in_memory 方法的执行流程如下所示:

      • 检查上游连接上读事件是否超时,若标志位 timedout 为 1,则表示已经超时,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
      • 检查接收缓冲区 buffer 是否还有剩余的内存空间,若没有剩余的内存空间,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;若有剩余的内存空间则调用 recv 方法开始接收响应包体;
        • 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体,调用 ngx_handle_read_event 方法将读事件注册到 epoll 事件机制中,同时将读事件添加到定时器机制中;
        • 若返回值 n = 0 或 n = NGX_ERROR,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
        • 若返回值 n 大于 0,则表示成功接收到响应包体,调用 input_filter 方法开始处理响应包体,检查读事件的 ready 标志位;
      • 若标志位 ready 为 1,表示仍有可读的响应包体数据,因此回到步骤 2 继续调用 recv 方法读取响应包体,直到读取完毕;
      • 若标志位 ready 为 0,则调用 ngx_handle_read_event 方法将读事件注册到epoll事件机制中,同时调用 ngx_add_timer 方法将读事件添加到定时器机制中;
      /* 接收并解析响应包体 */
      static void
      ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
          ngx_http_upstream_t *u)
      {
          size_t             size;
          ssize_t            n;
          ngx_buf_t         *b;
          ngx_event_t       *rev;
          ngx_connection_t  *c;
      
          c = u->peer.connection;
          rev = c->read;
      
          ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                         "http upstream process body on memory");
      
          /*
           * 检查读事件标志位timedout是否超时,若该标志位为1,表示响应已经超时;
           * 则调用ngx_http_upstream_finalize_request方法结束请求;
           * 并return从当前函数返回;
           */
          if (rev->timedout) {
              ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
              ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
              return;
          }
      
          b = &u->buffer;
      
          for ( ;; ) {
      
              /* 检查当前接收缓冲区是否剩余的内存空间 */
              size = b->end - b->last;
      
              /*
               * 若接收缓冲区不存在空闲的内存空间,
               * 则调用ngx_http_upstream_finalize_request方法结束请求;
               * 并return从当前函数返回;
               */
              if (size == 0) {
                  ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                                "upstream buffer is too small to read response");
                  ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                  return;
              }
      
              /*
               * 若接收缓冲区有可用的内存空间,
               * 则调用recv方法开始接收响应包体;
               */
              n = c->recv(c, b->last, size);
      
              /*
               * 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体;
               */
              if (n == NGX_AGAIN) {
                  break;
              }
      
              /*
               * 若返回值n = 0(表示上游服务器主动关闭连接),或n = NGX_ERROR(表示出错);
               * 则调用ngx_http_upstream_finalize_request方法结束请求;
               * 并return从当前函数返回;
               */
              if (n == 0 || n == NGX_ERROR) {
                  ngx_http_upstream_finalize_request(r, u, n);
                  return;
              }
      
              /* 若返回值 n 大于0,表示成功读取到响应包体 */
              u->state->response_length += n;
      
              /* 调用input_filter方法处理本次接收到的响应包体 */
              if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                  ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                  return;
              }
      
              /* 检查读事件的ready标志位,若为1,继续读取响应包体 */
              if (!rev->ready) {
                  break;
              }
          }
      
          if (u->length == 0) {
              ngx_http_upstream_finalize_request(r, u, 0);
              return;
          }
      
          /*
           * 若读事件的ready标志位为0,表示读事件未准备就绪,
           * 则将读事件注册到epoll事件机制中,添加到定时器机制中;
           * 读事件的回调方法不改变,即依旧为ngx_http_upstream_process_body_in_memory;
           */
          if (ngx_handle_read_event(rev, 0) != NGX_OK) {
              ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
              return;
          }
      
          if (rev->active) {
              ngx_add_timer(rev, u->conf->read_timeout);
      
          } else if (rev->timer_set) {
              ngx_del_timer(rev);
          }
      }

      转发响应

      下面看下 upstream 处理上游响应包体的三种方式:

      1. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 1 时,upstream 不转发响应包体到下游,并由HTTP 模块实现的input_filter() 方法处理包体;
      2. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 1 时,upstream 将开启更多的内存和磁盘文件用于缓存上游的响应包体(此时,上游网速更快),并转发响应包体;
      3. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 0 时,upstream 将使用固定大小的缓冲区来转发响应包体;

      转发响应由函数 ngx_http_upstream_send_response 实现,该函数的执行流程如下:

      • ngx_http_send_header メソッドを呼び出して応答ヘッダーを転送し、ngx_http_upstream_t 構造体の header_sent フラグを 1 に設定して、応答ヘッダーが転送されたことを示します
      • 一時ファイルにリクエスト パケットの本文がまだ保存されている場合は、 ngx_pool_run_cleanup_filter メソッドを呼び出して一時ファイルをクリーンアップする必要があります。
      • フラグが 1 の場合、ファイル キャッシュをオンにする必要があることを意味します。フラグが 0 の場合、ファイル キャッシュは必要ありません。オンにする必要があるのは、パケット本体に応答するだけです。
      • HTTP モジュールが独自の input_filter を実装しているかどうかを確認します。そうでない場合は、アップストリーム メカニズムのデフォルト メソッド ngx_http_upstream_non_buffered_filter を使用します。
        • Set ngx_http_upstream_t 構造内の読み取りイベント read_event_handler のコールバック メソッドは、アップストリーム レスポンスを受信するときの ngx_http_upstream_process_non_buffered です。最後に _upstream メソッドが呼び出されて応答が受信されます。
        • 書き込みイベント write_event_ を ngx_http_upstream_t 構造体に設定します。 ハンドラーのコールバック メソッドは ngx_http_upstream_process_non_buffered_downstream です。 データを送信するとき、最後に ngx_http_upstream_process_non_buffered_downstream メソッドが呼び出され、応答パケット本体が送信されます。
        • input_filter_init メソッドは、応答パケット本体を処理するために input_filter メソッドを初期化するために呼び出されます。
        • 応答ヘッダーの解析後に受信バッファを確認します。ヘッダー セクションの後に、応答データが残っているかどうかを示します。応答パケットの本文が事前に受信されました:
        • 応答ヘッダー間隔の解析中に応答パケットの本文が事前に受信された場合、事前に受信されたデータのこの部分を処理するために input_filter メソッドが呼び出されます。 、そして ngx_http_upstream_process_non_buffered_downstream メソッドを呼び出して、今回受信した応答パケット本体をダウンストリーム サーバーに転送します
          • 応答ヘッダー セクションの解析中に応答パケット本体が受信されなかった場合、受信バッファーは最初にクリアされます。応答パケット本体を受信し、アップストリーム接続読み取りイベントの準備ができているかどうかを確認します。フラグが 1 であれば、準備ができていることを意味します。フラグが 0 であれば、ngx_http_upstream_process_non_buffered_upstream メソッドを呼び出します。 、現在の関数から戻ります。
          • バッファリングフラグが 1 の場合、
        ngx_http_upstream_t 構造体の ngx_event_pipe_t パイプメンバーを初期化します。 init メソッドを input_filter として処理します。応答パッケージの本体を作成し、初期化作業を行います。
      • アップストリーム接続の読み取りイベント read_event_handler のコールバック メソッドを ngx_http_upstream_process_upstream に設定します。
        • アップストリーム接続の書き込みイベント write_event_handler のコールバック メソッドを ngx_http_upstream_process_downstream に設定します。
        • ngx_http_upstream_proess_upstream メソッドを呼び出して、上流サーバーによって送信された応答パケット本文を処理します ;
        • /* 応答パケット本文の転送*/ 静的ボイド ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) { int tcp_nolay; ssize_t n; ngx_int_t rc; ngx_event_pipe_t *p; ngx_connection_t *c; ngx_http_core_loc_conf_t *clcf; /* ngx_http_send_hander メソッドを呼び出して、応答ヘッダーをダウンストリームに送信します */ rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { ngx_http_upstream_finalize_request(r, u, rc); 戻る; } /* フラグ header_sent を 1 に設定します */ u->header_sent = 1; if (u->アップグレード) { ngx_http_upstream_upgrade(r, u); 戻る; } /* Nginx とダウンストリーム間の TCP 接続を取得します */ c = r->接続; if (r->header_only) { if (u->キャッシュ可能 || u->ストア) { if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) { ngx_connection_error(c, ngx_socket_errno, ngx_shutdown_socket_n "失敗"); } r->read_event_handler = ngx_http_request_empty_handler; r->write_event_handler = ngx_http_request_empty_handler; c->エラー = 1; } それ以外 { ngx_http_upstream_finalize_request(r, u, rc); 戻る; } } /* 一時ファイルにリクエスト パッケージ本体が保存されている場合は、ngx_pool_run_cleanup_file メソッドを呼び出して、一時ファイルのリクエスト パッケージ本体をクリーンアップします */ if (r->request_body && r->request_body->temp_file) { ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd); r->request_body->temp_file->file.fd = NGX_INVALID_FILE; }clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); /* * バッファリング フラグが 0 の場合、応答を転送するときにダウンストリーム サーバーのネットワーク速度が優先されます。 * つまり、上流サーバーからの応答を受信して​​転送するために、固定のメモリ ブロック サイズを割り当てるだけです。 ※メモリブロックが一杯になると、上流サーバからの応答データの受信が停止されます。 * メモリ ブロックの応答データを下流サーバーに転送した後、メモリ領域が残るまで待ってから、応答の受信を続行します。 */ if (!u->バッファリング) { /* ※HTTPモジュールがinput_filterメソッドを実装していない場合、 * アップストリームメカニズムのデフォルトメソッド ngx_http_upstream_non_buffered_filter が使用されます。 */ if (u->input_filter == NULL) { u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } /* * ngx_http_upstream_t 構造体の読み取りイベントのコールバック メソッドを ngx_http_upstream_non_buffered_upstream (つまり、上流の応答を読み取るメソッド) に設定します。 * 現在のリクエスト ngx_http_request_t 構造にイベントを書き込むコールバック メソッドを ngx_http_upstream_process_non_buffered_downstream (つまり、応答をダウンストリームに転送するメソッド) に設定します。 */ u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream; r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream; r->limit_rate = 0; /* input_filter_init を呼び出して、input_filter メソッドの応答パッケージ本体を初期化します */ if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } if (clcf->tcp_nolay && c->tcp_nolay == NGX_TCP_NODELAY_UNSET) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nolay"); tcp_nolay = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nolay, sizeof(int)) == -1) { ngx_connection_error(c, ngx_socket_errno, "setsockopt(TCP_NODELAY) が失敗しました"); ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } c->tcp_nolay = NGX_TCP_NODELAY_SET; } /* 応答ヘッダーを解析した後、受信バッファが応答ボディを受信したかどうかを確認します */ n = u->buffer.last - u->buffer.pos; /* 受信バッファが応答パケット本体を受信した場合 */ if (n) { u->buffer.last = u->buffer.pos; u->state->response_length += n; /* input_filter メソッドを呼び出して応答本文の処理を開始します */ if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } /* このメソッドを呼び出して、今回受信した応答パケット本体を下流サーバーに転送します */ ngx_http_upstream_process_non_buffered_downstream(r); } それ以外 { /* 受信バッファーに応答パケット本体がない場合は、受信バッファーをクリアします。つまり、このバッファーを再利用します */ u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } /* * 現在の接続で読み取りイベントの準備ができている場合、 * 次に、ngx_http_upstream_process_non_buffered_upstream メソッドを呼び出して、応答パケット本体を受信して​​処理します。 */ if (u->peer.connection->read->ready || u->length == 0) { ngx_http_upstream_process_non_buffered_upstream(r, u); } } 戻る; }/* * ngx_http_upstream_t 構造体のバッファリング フラグが 1 の場合、応答パケット本体の転送時に上流のネットワーク速度が優先されます。 * つまり、より多くのメモリとキャッシュを割り当てます。つまり、常に上流サーバーから応答を受信し、上流サーバーからの応答をメモリまたはキャッシュに保存します。 */ /* TODO:event_pipe bufs を事前に割り当て、「Content-Length」を確認します */ #if (NGX_HTTP_CACHE) ... ... #endif /* ngx_event_pipe_t 構造体 p を初期化します */ p = u->パイプ; p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; p->output_ctx = r; p->tag = u->output.tag; p->bufs = u->conf->bufs; p->busy_size = u->conf->busy_buffers_size; p->upstream = u->peer.connection; p->下流 = c; p->プール = r->プール; p->log = c->log; p->キャッシュ可能 = u->キャッシュ可能 || u->ストア; p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t)); if (p->temp_file == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } p->temp_file->file.fd = NGX_INVALID_FILE; p->temp_file->file.log = c->log; p->temp_file->path = u->conf->temp_path; p->temp_file->プール = r->プール; if (p->キャッシュ可能) { p->temp_file->persistent = 1; } それ以外 { p->temp_file->log_level = NGX_LOG_WARN; p->temp_file->warn = "アップストリームの応答はバッファリングされます" "一時ファイルに"; } p->max_temp_file_size = u->conf->max_temp_file_size; p->temp_file_write_size = u->conf->temp_file_write_size; /* 先読みリンクリストバッファpreread_bufsを初期化します */ p->preread_bufs = ngx_alloc_chain_link(r->pool); if (p->preread_bufs == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } p->preread_bufs->buf = &u->buffer; p->preread_bufs->next = NULL; u->buffer.recycled = 1; p->preread_size = u->buffer.last - u->buffer.pos; if (u->キャッシュ可能) { p->buf_to_file = ngx_calloc_buf(r->pool); if (p->buf_to_file == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 戻る; } p->buf_to_file->start = u->buffer.start; p->buf_to_file->pos = u->buffer.start; p->buf_to_file->last = u->buffer.pos; p->buf_to_file->一時 = 1; } if (ngx_event_flags & NGX_USE_AIO_EVENT) { /* ポストされた aio 操作によりシャドウ バッファが破損する可能性があります */ p->single_buf = 1; } /* TODO: ngx_create_chain_of_bufs() を使用する場合は p->free_bufs = 0 */ p->free_bufs = 1; /* *event_pipe は u->b を実行します
    声明:
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。