
Implementation of upstream mechanism in Nginx

WBOY · Original · 2016-08-08 09:30:04

Overview

The upstream mechanism is what turns Nginx into a reverse proxy server. Nginx accepts an HTTP request from a downstream client, processes it, and, based on that request, sends a request over TCP to an upstream server. The upstream server returns a response, and Nginx decides, according to that response, whether and how to forward it to the downstream client. In addition, the upstream mechanism provides load balancing, distributing requests across the servers of a cluster.

Start upstream

The upstream mechanism is started by calling the ngx_http_upstream_init method. Before it can be used, however, the ngx_http_upstream_create method must be called to create the ngx_http_upstream_t structure, because by default the upstream member of the ngx_http_request_t structure is NULL; the concrete initialization of this structure is left to the HTTP module (a sketch of how a module typically drives these calls follows the listing below). For descriptions of the ngx_http_upstream_t and ngx_http_upstream_conf_t structures, please refer to the article "Upstream Mechanism in Nginx".

The following is the implementation of the function ngx_http_upstream_create:

/* Create the ngx_http_upstream_t structure */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u;

    u = r->upstream;

    /*
     * If an ngx_http_upstream_t has already been created and its cleanup
     * member is set, call the cleanup handler to destroy the old structure;
     */
    if (u && u->cleanup) {
        r->main->count++;
        ngx_http_upstream_cleanup(r);
    }

    /* Allocate the ngx_http_upstream_t structure from the request's memory pool */
    u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (u == NULL) {
        return NGX_ERROR;
    }

    /* Attach it to the upstream member of the ngx_http_request_t structure */
    r->upstream = u;

    u->peer.log = r->connection->log;
    u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
    u->peer.lock = &r->connection->lock;
#endif

#if (NGX_HTTP_CACHE)
    r->cache = NULL;
#endif

    u->headers_in.content_length_n = -1;

    return NGX_OK;
}
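
For orientation, here is a rough sketch of how an HTTP module's content handler typically drives these two entry points. Everything prefixed with ngx_http_example_ is hypothetical and used only for illustration (it is not part of the nginx source); the create_request and process_header callbacks referenced here are sketched later in this article.

static ngx_int_t
ngx_http_example_handler(ngx_http_request_t *r)
{
    ngx_int_t             rc;
    ngx_http_upstream_t  *u;

    /* Create r->upstream before anything else touches it */
    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u = r->upstream;

    /* The module supplies its own ngx_http_upstream_conf_t and callbacks;
     * real modules also set reinit_request, abort_request and finalize_request. */
    u->conf = &example_upstream_conf;                     /* hypothetical conf */
    u->create_request = ngx_http_example_create_request;  /* sketched below    */
    u->process_header = ngx_http_example_process_header;  /* sketched below    */

    /* Start the upstream machinery once the client request body has been read */
    rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);

    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        return rc;
    }

    return NGX_DONE;
}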

The execution process of the upstream mechanism startup method ngx_http_upstream_init is as follows:

  • Check whether the read event on the connection between Nginx and the downstream client is in the timer, i.e. whether its timer_set flag is 1; if it is, remove the read event from the timer;
  • Call the ngx_http_upstream_init_request method to actually start the upstream mechanism;

The execution flow of the ngx_http_upstream_init_request method is as follows:

  • Check whether the store flag in the ngx_http_upstream_t structure, the post_action flag in the ngx_http_request_t structure, and the ignore_client_abort flag in the ngx_http_upstream_conf_t structure are all 0. If they are, set the request's read event handler to ngx_http_upstream_rd_check_broken_connection and its write event handler to ngx_http_upstream_wr_check_broken_connection; both wrappers call ngx_http_upstream_check_broken_connection to check whether the connection between Nginx and the downstream client is still intact, and terminate the request if an error is found;
  • Call the create_request method of ngx_http_upstream_t, implemented by the HTTP module, to build the request that will be sent to the upstream server (a hypothetical create_request is sketched at the end of this section);
  • Call the ngx_http_cleanup_add method to append a callback handler to the end of the original request's cleanup list; the handler is set to ngx_http_upstream_cleanup, which is invoked when the request ends to perform cleanup work;
  • Call the ngx_http_upstream_connect method to initiate a connection to the upstream server;
/* Initialize and start the upstream mechanism */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
    ngx_connection_t     *c;

    /* Get the connection of the current (downstream) request */
    c = r->connection;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http init upstream, client timer: %d", c->read->timer_set);

#if (NGX_HTTP_SPDY)
    if (r->spdy_stream) {
        ngx_http_upstream_init_request(r);
        return;
    }
#endif

    /*
     * If the timer_set flag of the read event on this connection is 1,
     * the read event is in the timer and must be removed from it:
     * once the upstream mechanism starts, client reads no longer need timeout management;
     */
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        if (!c->write->active) {
            if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    }

    ngx_http_upstream_init_request(r);
}
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
    ngx_str_t                      *host;
    ngx_uint_t                      i;
    ngx_resolver_ctx_t             *ctx, temp;
    ngx_http_cleanup_t             *cln;
    ngx_http_upstream_t            *u;
    ngx_http_core_loc_conf_t       *clcf;
    ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_http_upstream_main_conf_t  *umcf;

    if (r->aio) {
        return;
    }

    u = r->upstream;

#if (NGX_HTTP_CACHE)
    ...
    ...
#endif

    /* Flag: whether the response is to be stored to a file */
    u->store = (u->conf->store || u->conf->store_lengths);

    /*
     * Check the store flag in ngx_http_upstream_t, the post_action flag in
     * ngx_http_request_t and the ignore_client_abort flag in ngx_http_upstream_conf_t;
     * if all of them are 0, the TCP connection between Nginx and the downstream
     * client must be monitored for disconnection;
     */
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }

    /* Save the request body buffers in the request_bufs chain of ngx_http_upstream_t */
    if (r->request_body) {
        u->request_bufs = r->request_body->bufs;
    }

    /* Call create_request to build the request to be sent to the upstream server */
    if (u->create_request(r) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* Get the local address for the active connection peer of ngx_http_upstream_t */
    u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);

    /* Get the loc-level configuration of the ngx_http_core_module module */
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /* Initialize the output chain-writer context of ngx_http_upstream_t */
    u->output.alignment = clcf->directio_alignment;
    u->output.pool = r->pool;
    u->output.bufs.num = 1;
    u->output.bufs.size = clcf->client_body_buffer_size;
    u->output.output_filter = ngx_chain_writer;
    u->output.filter_ctx = &u->writer;

    u->writer.pool = r->pool;

    /* Add an entry recording the state of the upstream response, e.g. error code, body length */
    if (r->upstream_states == NULL) {

        r->upstream_states = ngx_array_create(r->pool, 1,
                                            sizeof(ngx_http_upstream_state_t));
        if (r->upstream_states == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

    } else {

        u->state = ngx_array_push(r->upstream_states);
        if (u->state == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
    }

    /*
     * Call ngx_http_cleanup_add to append a callback handler to the end of the original
     * request's cleanup list; the handler is ngx_http_upstream_cleanup, invoked when the request ends;
     */
    cln = ngx_http_cleanup_add(r, 0);
    if (cln == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    cln->handler = ngx_http_upstream_cleanup;
    cln->data = r;
    u->cleanup = &cln->handler;

    if (u->resolved == NULL) {

        /* If u->resolved is not set, take the upstream server configuration from u->conf */
        uscf = u->conf->upstream;

    } else {

        /*
         * If u->resolved is set, resolve the host name to determine the upstream server address;
         */


        /*
         * If the upstream server address has already been specified, no resolution is needed:
         * call ngx_http_upstream_connect directly to initiate the connection to the upstream server
         * and return from this function;
         */
        if (u->resolved->sockaddr) {

            if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
                != NGX_OK)
            {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            ngx_http_upstream_connect(r, u);

            return;
        }

        /*
         * If the upstream server address has not been specified yet, the host name must be resolved;
         * once the address and port have been resolved successfully,
         * ngx_http_upstream_connect is called to initiate the connection to the upstream server;
         */
        host = &u->resolved->host;

        umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

        uscfp = umcf->upstreams.elts;

        for (i = 0; i < umcf->upstreams.nelts; i++) {

            uscf = uscfp[i];

            if (uscf->host.len == host->len
                && ((uscf->port == 0 && u->resolved->no_port)
                     || uscf->port == u->resolved->port)
                && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
            {
                goto found;
            }
        }

        if (u->resolved->port == 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no port in upstream \"%V\"", host);
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        temp.name = *host;

        ctx = ngx_resolve_start(clcf->resolver, &temp);
        if (ctx == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ctx == NGX_NO_RESOLVER) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no resolver defined to resolve %V", host);

            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
            return;
        }

        ctx->name = *host;
        ctx->handler = ngx_http_upstream_resolve_handler;
        ctx->data = r;
        ctx->timeout = clcf->resolver_timeout;

        u->resolved->ctx = ctx;

        if (ngx_resolve_name(ctx) != NGX_OK) {
            u->resolved->ctx = NULL;
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

found:

    if (uscf == NULL) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "no upstream configuration");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    if (uscf->peer.init(r, uscf) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_http_upstream_connect(r, u);
}

static void
ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->read);
}


static void
ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->write);
}
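
To make the create_request step above concrete, here is a minimal, hypothetical create_request callback (the ngx_http_example_ name is illustrative, not from the nginx source): it builds a fixed request and hands it to the upstream machinery through r->upstream->request_bufs.

static ngx_int_t
ngx_http_example_create_request(ngx_http_request_t *r)
{
    ngx_buf_t    *b;
    ngx_chain_t  *cl;

    /* A fixed request line and headers; a real module would build this from r */
    static ngx_str_t request = ngx_string("GET / HTTP/1.0" CRLF
                                          "Host: backend" CRLF CRLF);

    b = ngx_create_temp_buf(r->pool, request.len);
    if (b == NULL) {
        return NGX_ERROR;
    }

    b->last = ngx_cpymem(b->last, request.data, request.len);

    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf = b;
    cl->next = NULL;

    /* ngx_http_upstream_send_request() will later send this chain upstream */
    r->upstream->request_bufs = cl;

    return NGX_OK;
}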

Establish a connection

When the upstream mechanism establishes a TCP connection with the upstream server, it uses a non-blocking socket: the connect call returns immediately, whether or not the connection has been established. If it is not established right away, the socket has to be monitored in the epoll event mechanism. Initiating the connection to the upstream server is implemented by the function ngx_http_upstream_connect. Before analyzing ngx_http_upstream_connect, first look at ngx_event_connect_peer, because it is called by ngx_http_upstream_connect.

The execution flow of the ngx_event_connect_peer method is as follows (a plain-socket sketch of this non-blocking connect pattern follows the list):

  • Call the ngx_socket method to create a TCP socket;
  • Call the ngx_nonblocking method to set the TCP socket to non-blocking mode;
  • Set the methods used to receive and send the byte stream on this connection;
  • Set the read and write event methods on the socket connection;
  • Add the socket to the epoll event mechanism, expecting both EPOLLIN and EPOLLOUT events;
  • Call the connect method to initiate a TCP connection request to the server;
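
As promised above, here is a small, self-contained sketch (plain POSIX sockets, not nginx code) of the non-blocking connect pattern that ngx_event_connect_peer implements: EINPROGRESS corresponds to the NGX_AGAIN case, in which the caller must wait for the socket to become writable.

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

static int
nonblocking_connect(const struct sockaddr_in *addr)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }

    /* Switch the socket to non-blocking mode before connecting */
    if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) == -1) {
        close(fd);
        return -1;
    }

    if (connect(fd, (const struct sockaddr *) addr, sizeof(*addr)) == -1) {
        if (errno != EINPROGRESS) {      /* real failure: the NGX_ERROR case  */
            close(fd);
            return -1;
        }
        /* EINPROGRESS: the NGX_AGAIN case - register fd for EPOLLOUT and
         * wait; writability (or SO_ERROR) tells us whether it succeeded. */
    }

    return fd;                           /* immediate success: the NGX_OK case */
}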

The ngx_http_upstream_connect method initiates the connection to the upstream server; its execution flow is as follows:

  • Call the ngx_event_connect_peer method to actively initiate a connection to the upstream server; note that this method has already registered the socket with the epoll event mechanism to monitor read and write events. Its return value is rc;
    • If rc = NGX_ERROR, initiating the connection failed: call ngx_http_upstream_finalize_request to close the request and return from this function;
    • If rc = NGX_BUSY, the current upstream server is not live: call ngx_http_upstream_next, which, based on the reason passed in, tries to initiate the connection again, and return from this function;
    • If rc = NGX_DECLINED, the current upstream server is overloaded: call ngx_http_upstream_next to try to connect to another upstream server, and return from this function;
    • Set both the read and write event handlers of the upstream ngx_connection_t to ngx_http_upstream_handler; set the write_event_handler of ngx_http_upstream_t to ngx_http_upstream_send_request_handler and the read_event_handler to ngx_http_upstream_process_header;
    • If rc = NGX_AGAIN, the connection has been initiated but no acknowledgement has been received from the upstream server yet, i.e. the upstream write event is not yet writable: call ngx_add_timer to add the upstream connection's write event to the timer to manage the connect timeout;
    • If rc = NGX_OK, the connection was established immediately: call ngx_http_upstream_send_request to send the request to the upstream server;
/* Establish a connection to the upstream server */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_time_t        *tp;
    ngx_connection_t  *c;

    r->connection->log->action = "connecting to upstream";

    if (u->state && u->state->response_sec) {
        tp = ngx_timeofday();
        u->state->response_sec = tp->sec - u->state->response_sec;
        u->state->response_msec = tp->msec - u->state->response_msec;
    }

    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));

    tp = ngx_timeofday();
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;

    /* Initiate the connection to the upstream server */
    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);

    /* The following handles the different return values of rc */

    /* If establishing the connection failed, close the current request and return */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    u->state->peer = u->peer.name;

    /*
     * rc = NGX_BUSY means the current upstream server is not live;
     * call ngx_http_upstream_next to initiate the connection again
     * (that method eventually calls ngx_http_upstream_connect itself)
     * and return from this function;
     */
    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }

    /*
     * rc = NGX_DECLINED means the current upstream server is overloaded;
     * call ngx_http_upstream_next to initiate a connection to another server
     * (that method eventually calls ngx_http_upstream_connect itself)
     * and return from this function;
     */
    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */

    c = u->peer.connection;

    c->data = r;

    /* Set the read and write event handlers of the upstream connection ngx_connection_t */
    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;

    /* Set the read and write event handlers of the upstream mechanism */
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;

    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;

    if (c->pool == NULL) {

        /* we need separate pool here to be able to cache SSL connections */

        c->pool = ngx_create_pool(128, r->connection->log);
        if (c->pool == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    c->log = r->connection->log;
    c->pool->log = c->log;
    c->read->log = c->log;
    c->write->log = c->log;

    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */

    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = c;
    u->writer.limit = 0;

    /*
     * Check the request_sent flag of the ngx_http_upstream_t structure;
     * if it is 1, a request has already been sent on a previous, now failed, connection,
     * so ngx_http_upstream_reinit is called to reinitialize before trying the upstream again;
     */
    if (u->request_sent) {
        if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    if (r->request_body
        && r->request_body->buf
        && r->request_body->temp_file
        && r == r->main)
    {
        /*
         * the r->request_body->buf can be reused for one request only,
         * the subrequests should allocate their own temporary bufs
         */

        u->output.free = ngx_alloc_chain_link(r->pool);
        if (u->output.free == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->output.free->buf = r->request_body->buf;
        u->output.free->next = NULL;
        u->output.allocated = 1;

        r->request_body->buf->pos = r->request_body->buf->start;
        r->request_body->buf->last = r->request_body->buf->start;
        r->request_body->buf->tag = u->output.tag;
    }

    u->request_sent = 0;

    /*
     * rc = NGX_AGAIN means the upstream server has not yet acknowledged the connection;
     * the write event has already been added to the epoll event mechanism and is waiting
     * to become writable, so here it only needs to be added to the timer for timeout
     * management before returning from this function;
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    /*
     * rc = NGX_OK means the connection was established successfully,
     * so call this method to send the request to the upstream server */
    ngx_http_upstream_send_request(r, u);
}

Send the request

Once Nginx has successfully established the connection with the upstream server, it calls the ngx_http_upstream_send_request method to send the request. If that method cannot send the whole request in one go, it has to wait for the write event in the epoll event mechanism; when the write event fires, the write_event_handler callback ngx_http_upstream_send_request_handler is invoked to continue sending, possibly multiple times, until the request has been sent completely.

The execution flow of the ngx_http_upstream_send_request method is as follows:

  • Check whether the request_sent flag in ngx_http_upstream_t is 0; 0 means no request has been sent to the upstream yet. In that case call ngx_http_upstream_test_connect to test whether the connection to the upstream has been established; if it returns something other than NGX_OK, call ngx_http_upstream_next to try to establish a connection again and return from this function;
  • Call ngx_output_chain to send the request data stored in the request_bufs chain to the upstream; its return value is rc. Set the request_sent flag to 1, then check the timer_set flag of the connection's write event; if it is 1, call ngx_del_timer to remove the write event from the timer;
  • If rc = NGX_ERROR, an error occurred on the connection: call ngx_http_upstream_next to try to connect to the upstream again and return from this function;
  • If rc = NGX_AGAIN, the request data has not been sent completely: the remaining data is kept in the output member of ngx_http_upstream_t. Call ngx_add_timer to add the connection's write event to the timer and ngx_handle_write_event to register the write event with the epoll event mechanism, then return from this function and wait for the next writable event;
  • If rc = NGX_OK, all of the request data has been sent, so prepare to receive the response from the upstream server;
  • First call ngx_add_timer to add the connection's read event to the timer so that a receive timeout can be detected, then check whether the read event on this connection is ready, i.e. whether its ready flag is 1; if it is, call ngx_http_upstream_process_header to start processing the response header and return from this function;
  • If the ready flag of the read event is 0, there is no readable data for now and the read event must fire again; since the read event handler is already ngx_http_upstream_process_header, it does not need to be reset. Because the whole request has been sent, the write event handler is reset to ngx_http_upstream_dummy_handler (which performs no real work) to prevent ngx_http_upstream_send_request_handler from being triggered again, and ngx_handle_write_event is called to register the write event with the epoll event mechanism;
/* Send the request to the upstream server */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    /* Get the upstream connection */
    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");

    /*
     * If the request_sent flag is 0, no request has been sent yet;
     * if in addition ngx_http_upstream_test_connect returns something other than NGX_OK,
     * the connection to the upstream server has not been established successfully:
     * call ngx_http_upstream_next to try the next upstream server and return;
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";

    /*
     * Call ngx_output_chain to send the request data held in the request_bufs chain.
     * Note that the second argument is either request_bufs or NULL:
     * on the first call request_sent is 0, so request_bufs is passed in;
     * if that call cannot send all of the data at once, the remainder is kept
     * in the output member and request_sent is set to 1, so on subsequent calls
     * NULL is passed, because the leftover data is already stored in output;
     */
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

    /* After sending the request to the upstream, set the request_sent flag to 1 */
    u->request_sent = 1;

    /* The following handles the different return values of rc */

    /*
     * rc = NGX_ERROR means an error occurred on this connection;
     * pass the error to ngx_http_upstream_next, which decides whether
     * to initiate a new connection to an upstream server,
     * and return from this function;
     */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /*
     * If the timer_set flag of the write event on this connection is 1,
     * remove the write event from the timer;
     */
    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    /*
     * rc = NGX_AGAIN means the request data has not been sent completely,
     * i.e. the remaining data is stored in output but the write event is not writable;
     * add the write event on this connection to the timer with ngx_add_timer,
     * register it with the epoll event mechanism with ngx_handle_write_event,
     * and return from this function;
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);

        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    /*
     * rc = NGX_OK means all of the request data has been sent;
     * prepare to receive the response from the upstream server;
     */
    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    /* Add the read event on this connection to the timer */
    ngx_add_timer(c->read, u->conf->read_timeout);

    /*
     * If the read event is already ready,
     * call ngx_http_upstream_process_header to start receiving and processing
     * the response header, and return from this function;
     */
    if (c->read->ready) {
        ngx_http_upstream_process_header(r, u);
        return;
    }

    /*
     * If the read event is not ready yet,
     * set the write event handler to ngx_http_upstream_dummy_handler (which does nothing)
     * and register the write event with the epoll event mechanism;
     */
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
}

When the request cannot be sent completely in one go, Nginx must wait for the write event in the epoll event mechanism; once it fires, the callback ngx_http_upstream_send_request_handler is invoked.

The execution flow of the ngx_http_upstream_send_request_handler method is as follows:

  • Check whether the write event on the connection has timed out, i.e. whether its timedout flag is 1; if it has, call ngx_http_upstream_next to initiate the connection to the upstream again and return from this function;
  • If the timedout flag is 0 (no timeout), check whether the header_sent flag is 1; if it is, the response header from the upstream server has already been received and forwarded, so no more request data needs to be sent: set the write event handler to ngx_http_upstream_dummy_handler, register the write event with the epoll event mechanism, and return from this function;
  • If the header_sent flag is 0, call ngx_http_upstream_send_request to send request data to the upstream;
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream send request handler");

    /* Check the timeout flag of the write event on this connection */
    if (c->write->timedout) {
        /* Timed out: apply the reconnect policy */
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    /* The response header has already been received, so no more request data needs to be sent */
    if (u->header_sent) {
        /* Set the write event handler to ngx_http_upstream_dummy_handler, which does nothing */
        u->write_event_handler = ngx_http_upstream_dummy_handler;

        /* Register the write event with the epoll event mechanism and return from this function */
        (void) ngx_handle_write_event(c->write, 0);

        return;
    }

    /* The response header has not been received yet, so keep sending request data to the upstream */
    ngx_http_upstream_send_request(r, u);
}

Receive the response

Receive the response header

Once Nginx has sent the request to the upstream, it prepares to receive the response header from the upstream; this is implemented by the ngx_http_upstream_process_header method, which receives and parses the response header.

The execution flow of the ngx_http_upstream_process_header method is as follows:

  • Check whether the read event on the upstream connection has timed out; if the timedout flag is 1, it has, so call ngx_http_upstream_next to establish the connection to the upstream again and return from this function;
  • If the timedout flag is 0, check the request_sent flag of ngx_http_upstream_t; if it is 0, no request has been sent yet; in that case also call ngx_http_upstream_test_connect to test the connection, and if it returns something other than NGX_OK, the connection to the upstream has been lost: call ngx_http_upstream_next to establish the connection again and return from this function;
  • Check whether the buffer of ngx_http_upstream_t that receives the response header has memory allocated; if buffer.start is NULL, the buffer is empty and ngx_palloc must be called to allocate memory of size buffer_size, which is specified by the buffer_size member of the ngx_http_upstream_conf_t configuration structure;
  • Call recv to receive the response header from the upstream server and check its return value n:
    • If n = NGX_AGAIN, the read event is not ready and the response header will be received when the read event fires again: call ngx_handle_read_event to register the read event with the epoll event mechanism and return from this function;
    • If n = NGX_ERROR or n = 0, an error occurred on the upstream connection or the upstream server closed it: call ngx_http_upstream_next to initiate the connection again and return from this function;
    • If n is greater than 0, response header data has been received: call the process_header method of ngx_http_upstream_t, implemented by the HTTP module, to parse it, and examine its return value rc (a hypothetical module-side process_header is sketched after the source listing below);
  • If rc = NGX_AGAIN, the response header received so far is incomplete: check whether the receive buffer still has free space. If it does not, the response header is too large, so call ngx_http_upstream_next to establish the connection again and return from this function; if it does, continue receiving the response header;
  • If rc = NGX_HTTP_UPSTREAM_INVALID_HEADER, the received response header is invalid: call ngx_http_upstream_next to establish the connection again and return from this function;
  • If rc = NGX_ERROR, an error occurred on the connection: call ngx_http_upstream_finalize_request to end the request and return from this function;
  • If rc = NGX_OK, the complete response header has been received: call ngx_http_upstream_process_headers to process the parsed header, which stores the parsed header fields in the headers_out member of ngx_http_request_t;
  • Check the subrequest_in_memory member of ngx_http_request_t to decide whether the response must be forwarded to the downstream client;
    • If subrequest_in_memory is 0, the response must be forwarded downstream: call ngx_http_upstream_send_response to start forwarding the response and return from this function;
    • If subrequest_in_memory is 1, the response is not forwarded downstream; check whether the HTTP module has defined the input_filter method of ngx_http_upstream_t to process the response body;
  • If no input_filter method is defined, the upstream mechanism's default method ngx_http_upstream_non_buffered_filter is used in its place;
  • Call input_filter_init to do the initialization needed for processing the response body;
  • Check whether the receive buffer contains data left over after parsing the response header; if it does, part of the response body has already been received, so call input_filter to process it;
  • Set the read_event_handler of the upstream mechanism to ngx_http_upstream_process_body_in_memory and call that method to start receiving and parsing the response body;
/* Receive and parse the response header */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process header");

    c->log->action = "reading response header from upstream";

    /* Check whether the read event on this connection has timed out */
    if (c->read->timedout) {
        /*
         * If the timedout flag is 1, the read event has timed out;
         * pass the timeout error to ngx_http_upstream_next,
         * which applies the reconnect policy for the allowed error types,
         * and return from this function;
         */
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

    /*
     * If the request_sent flag is 0, no request has been sent yet;
     * if in addition ngx_http_upstream_test_connect returns something other than NGX_OK,
     * the connection to the upstream server has not been established successfully:
     * call ngx_http_upstream_next to try the next upstream server and return;
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /*
     * Check the buffer of ngx_http_upstream_t used to receive the response header;
     * if it has no memory allocated, allocate it with ngx_palloc;
     * its size is the buffer_size member of the ngx_http_upstream_conf_t configuration;
     */
    if (u->buffer.start == NULL) {
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
        if (u->buffer.start == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        /* Prepare the receive buffer for the response header */
        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        /* The buffer memory may be reused and its contents modified */
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;

        /* Initialize the headers list of the headers_in member */
        if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            u->buffer.pos += r->cache->header_start;
            u->buffer.last = u->buffer.pos;
        }
#endif
    }

    for ( ;; ) {

        /* Call recv to read response header data from this connection */
        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        /* The following handles the different return values n of recv */

        /*
         * n = NGX_AGAIN means the read event is not ready;
         * wait for the next read event to continue receiving the response header,
         * i.e. register the read event with the epoll event mechanism
         * and return from this function;
         */
        if (n == NGX_AGAIN) {
#if 0
            ngx_add_timer(rev, u->read_timeout);
#endif

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }

        /*
         * n = NGX_ERROR or n = 0 means an error occurred or the upstream server closed the connection;
         * call ngx_http_upstream_next to decide whether to initiate the connection again,
         * and return from this function;
         */
        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        /* n greater than 0: response header data has been received */
        u->buffer.last += n;

#if 0
        u->valid_header_in = 0;

        u->peer.cached = 0;
#endif

        /*
         * Call the process_header method of ngx_http_upstream_t to parse the response header
         * and branch on its return value;
         */
        rc = u->process_header(r);

        /*
         * rc = NGX_AGAIN means the response header received so far is incomplete;
         * wait for the next read event to continue receiving it;
         * continue the loop;
         */
        if (rc == NGX_AGAIN) {

            if (u->buffer.last == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            continue;
        }

        break;
    }

    /*
     * rc = NGX_HTTP_UPSTREAM_INVALID_HEADER means
     * the received response header is invalid;
     * call ngx_http_upstream_next to decide whether to initiate the connection again,
     * and return from this function;
     */
    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    /*
     * rc = NGX_ERROR means an error occurred;
     * call ngx_http_upstream_finalize_request to end the request,
     * and return from this function;
     */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* rc == NGX_OK */

    /*
     * rc = NGX_OK means the complete response header was parsed successfully; */
    if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {

        if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
            return;
        }

        if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
            return;
        }
    }

    /* Call ngx_http_upstream_process_headers to process the parsed response header */
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }

    /*
     * Check the subrequest_in_memory member of ngx_http_request_t to decide whether
     * the response must be forwarded to the downstream client; if it is 0, call
     * ngx_http_upstream_send_response to forward the response and return from this function;
     */
    if (!r->subrequest_in_memory) {
        ngx_http_upstream_send_response(r, u);
        return;
    }

    /* If the response is not forwarded, process the response body with the input_filter method of ngx_http_upstream_t */
    /* subrequest content in memory */

    /*
     * If the HTTP module did not define the input_filter method of ngx_http_upstream_t,
     * the upstream mechanism's default ngx_http_upstream_non_buffered_filter is used;
     *
     * if the module implemented its own input_filter, the default is not used;
     */
    if (u->input_filter == NULL) {
        u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
        u->input_filter = ngx_http_upstream_non_buffered_filter;
        u->input_filter_ctx = r;
    }

    /*
     * Call input_filter_init to do the initialization needed for processing the body;
     */
    if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    /*
     * Check whether the receive buffer contains leftover response data;
     * since the response header has been fully parsed, any remaining
     * unparsed data is part of the response body;
     */
    n = u->buffer.last - u->buffer.pos;

    /*
     * If the buffer holds part of the response body, call input_filter to process it;
     */
    if (n) {
        u->buffer.last = u->buffer.pos;

        u->state->response_length += n;

        /* Call input_filter to process the response body */
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    /* Set the upstream read_event_handler to ngx_http_upstream_process_body_in_memory */
    u->read_event_handler = ngx_http_upstream_process_body_in_memory;

    /* Call ngx_http_upstream_process_body_in_memory to start processing the response body */
    ngx_http_upstream_process_body_in_memory(r, u);
}
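
As referenced in the flow above, the actual header parsing is delegated to the module's process_header callback. The following hypothetical sketch (not from nginx; a real module such as the proxy module parses the status line and every header field) only illustrates the contract: consume data between u->buffer.pos and u->buffer.last, return NGX_AGAIN while the header is incomplete, and NGX_OK once it has been fully parsed.

static ngx_int_t
ngx_http_example_process_header(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u = r->upstream;
    u_char               *p;

    /* Look for the blank line that terminates the header block */
    for (p = u->buffer.pos; p + 3 < u->buffer.last; p++) {

        if (p[0] == CR && p[1] == LF && p[2] == CR && p[3] == LF) {

            u->buffer.pos = p + 4;                 /* body starts here       */
            u->headers_in.status_n = NGX_HTTP_OK;  /* pretend "200 OK"       */
            u->headers_in.content_length_n = -1;   /* body length unknown    */
            u->state->status = NGX_HTTP_OK;

            return NGX_OK;                         /* header fully parsed    */
        }
    }

    return NGX_AGAIN;                              /* need more header bytes */
}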

Receive the response body

Receiving and parsing the response body is implemented by the ngx_http_upstream_process_body_in_memory method.

The execution flow of the ngx_http_upstream_process_body_in_memory method is as follows:

  • Check whether the read event on the upstream connection has timed out; if the timedout flag is 1, it has, so call ngx_http_upstream_finalize_request to end the request and return from this function;
  • Check whether the receive buffer still has free space; if it does not, call ngx_http_upstream_finalize_request to end the request and return from this function; if it does, call recv to receive the response body and examine the return value n;
    • If n = NGX_AGAIN, wait for the next read event to continue receiving the body: call ngx_handle_read_event to register the read event with the epoll event mechanism and add it to the timer;
    • If n = 0 or n = NGX_ERROR, call ngx_http_upstream_finalize_request to end the request and return from this function;
    • If n is greater than 0, response body data has been received: call input_filter to process it (a hypothetical input_filter is sketched after the source listing below), then check the ready flag of the read event;
  • If the ready flag is 1, more body data is available, so go back to step 2 and keep calling recv until everything has been read;
  • If the ready flag is 0, call ngx_handle_read_event to register the read event with the epoll event mechanism and ngx_add_timer to add it to the timer;
/* Receive and parse the response body */
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    size_t             size;
    ssize_t            n;
    ngx_buf_t         *b;
    ngx_event_t       *rev;
    ngx_connection_t  *c;

    c = u->peer.connection;
    rev = c->read;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process body on memory");

    /*
     * If the timedout flag of the read event is 1, receiving the response has timed out;
     * call ngx_http_upstream_finalize_request to end the request
     * and return from this function;
     */
    if (rev->timedout) {
        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
        return;
    }

    b = &u->buffer;

    for ( ;; ) {

        /* Check how much free space remains in the receive buffer */
        size = b->end - b->last;

        /*
         * If the receive buffer has no free space left,
         * call ngx_http_upstream_finalize_request to end the request
         * and return from this function;
         */
        if (size == 0) {
            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                          "upstream buffer is too small to read response");
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /*
         * If the receive buffer has free space,
         * call recv to receive the response body;
         */
        n = c->recv(c, b->last, size);

        /*
         * n = NGX_AGAIN means we must wait for the next read event to continue receiving the body;
         */
        if (n == NGX_AGAIN) {
            break;
        }

        /*
         * If n = 0 (the upstream server closed the connection) or n = NGX_ERROR (an error occurred),
         * call ngx_http_upstream_finalize_request to end the request
         * and return from this function;
         */
        if (n == 0 || n == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, n);
            return;
        }

        /* n greater than 0: response body data was read successfully */
        u->state->response_length += n;

        /* Call input_filter to process the response body received this time */
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /* Check the ready flag of the read event; if it is 1, keep reading the response body */
        if (!rev->ready) {
            break;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    /*
     * The ready flag of the read event is 0, i.e. the read event is not ready;
     * register it with the epoll event mechanism and add it to the timer;
     * the read event handler stays unchanged, i.e. ngx_http_upstream_process_body_in_memory;
     */
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    if (rev->active) {
        ngx_add_timer(rev, u->conf->read_timeout);

    } else if (rev->timer_set) {
        ngx_del_timer(rev);
    }
}
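
The input_filter callback referenced above has the signature ngx_int_t (*input_filter)(void *data, ssize_t bytes) and is handed the number of bytes that recv just placed after u->buffer.last. A hypothetical minimal filter (illustrative only; the default ngx_http_upstream_non_buffered_filter additionally queues the data for the client and maintains u->length) might do no more than account for the bytes:

static ngx_int_t
ngx_http_example_input_filter(void *data, ssize_t bytes)
{
    ngx_http_request_t   *r = data;     /* set via u->input_filter_ctx = r */
    ngx_http_upstream_t  *u = r->upstream;
    ngx_buf_t            *b = &u->buffer;

    /* The new bytes sit after b->last; claim them as part of the body.
     * A real filter would also append them to out_bufs (non-buffered mode)
     * or decrement u->length when the body length is known. */
    b->last += bytes;

    return NGX_OK;
}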

Forward the response

The upstream mechanism handles the upstream response body in one of three ways:

  1. If the subrequest_in_memory flag of the request structure ngx_http_request_t is 1, upstream does not forward the response body downstream; the body is processed by the input_filter() method implemented by the HTTP module;
  2. If subrequest_in_memory is 0 and the buffering flag of the ngx_http_upstream_conf_t configuration structure is 1, upstream uses more memory, and disk files if necessary, to buffer the upstream response body (useful when the upstream side is faster) while forwarding it;
  3. If subrequest_in_memory is 0 and buffering is 0, upstream forwards the response body using a single fixed-size buffer;

Forwarding the response is implemented by the function ngx_http_upstream_send_response, whose execution flow is as follows:

  • Call the ngx_http_send_header method to forward the response header, and set the header_sent flag of the ngx_http_upstream_t structure to 1 to record that the header has been forwarded;
  • If a temporary file still holds the request body, call ngx_pool_run_cleanup_file to clean up that temporary file;
  • Check the buffering flag: if it is 1, the response body is buffered in memory and on disk before being forwarded; if it is 0, the body is forwarded using a single fixed-size buffer;
  • If the buffering flag is 0:
    • Check whether the HTTP module implemented its own input_filter method; if not, use the upstream mechanism's default ngx_http_upstream_non_buffered_filter;
    • Set the read_event_handler of ngx_http_upstream_t to ngx_http_upstream_process_non_buffered_upstream, so that when an upstream response arrives this method is eventually called to receive it;
    • Set the request's write_event_handler to ngx_http_upstream_process_non_buffered_downstream, so that when data can be sent downstream this method is eventually called to forward the response body;
    • Call input_filter_init to initialize the processing of the response body by input_filter;
    • Check whether the receive buffer contains response data left over after parsing the response header; if it does, part of the response body has already been received:
      • If part of the body was received while parsing the header, call input_filter to process that part and then ngx_http_upstream_process_non_buffered_downstream to forward it to the downstream client;
      • If no body data was received while parsing the header, reset the receive buffer so it can be reused for the body, then check whether the read event on the upstream connection is ready; if the ready flag is 1, call ngx_http_upstream_process_non_buffered_upstream to receive the upstream response body; if it is 0, return from this function;
  • If the buffering flag is 1:
    • Initialize the ngx_event_pipe_t pipe member of the ngx_http_upstream_t structure;
    • Call input_filter_init to do the initialization needed for input_filter to process the response body;
    • Set the read_event_handler of the upstream mechanism to ngx_http_upstream_process_upstream;
    • Set the request's write_event_handler to ngx_http_upstream_process_downstream;
    • Call ngx_http_upstream_process_upstream to handle the response body sent by the upstream server;
/* Forward response packet body*/
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    int tcp_nodelay;
    ssize_t n;
    ngx_int_t rc;
    ngx_event_pipe_t *p;
    ngx_connection_t *c;
    ngx_http_core_loc_conf_t *clcf;

    /* Call the ngx_http_send_header method to send the response header to the downstream client */
    rc = ngx_http_send_header(r);

    if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
        ngx_http_upstream_finalize_request(r, u, rc);
        return;
    }

    /* Set the flag header_sent to 1 */
    u->header_sent = 1;

    if (u->upgrade) {
        ngx_http_upstream_upgrade(r, u);
        return;
    }

    /* Get the TCP connection between Nginx and the downstream */
    c = r->connection;

    if (r->header_only) {

        if (u->cacheable || u->store) {

            if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
                ngx_connection_error(c, ngx_socket_errno,
                                     ngx_shutdown_socket_n "failed");
            }

            r->read_event_handler = ngx_http_request_empty_handler;
            r->write_event_handler = ngx_http_request_empty_handler;
            c->error = 1;

        } else {
            ngx_http_upstream_finalize_request(r, u, rc);
            return;
        }
    }

    /* If the temporary file saves the request package body, call the ngx_pool_run_cleanup_file method to clean the request package body of the temporary file */
    if (r->request_body && r->request_body->temp_file) {
        ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
        r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
    }

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /*
     * If the buffering flag is 0, the network speed of the downstream server will be given priority when forwarding the response;
     * That is, just allocate a fixed memory block size to receive the response from the upstream server and forward it,
     * When the memory block is full, it will suspend receiving response data from the upstream server.
     * Wait for the remaining memory space after forwarding the response data of the memory block to the downstream server before continuing to receive the response;
     */
    if (!u->buffering) {

        /*
         * If the HTTP module does not implement the input_filter method,
         * The default method of upstream mechanism ngx_http_upstream_non_buffered_filter is used;
         */
        if (u->input_filter == NULL) {
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
            u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }

        /*
         * Set the read event handler of ngx_http_upstream_t to ngx_http_upstream_process_non_buffered_upstream (the method that reads the upstream response);
         * set the write event handler of the current ngx_http_request_t to ngx_http_upstream_process_non_buffered_downstream (the method that forwards the response downstream);
         */
        u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
        r->write_event_handler =
                             ngx_http_upstream_process_non_buffered_downstream;

        r->limit_rate = 0;

        /* Call input_filter_init to initialize the response package body for the input_filter method */
        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");

            tcp_nodelay = 1;

            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
                               (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                ngx_connection_error(c, ngx_socket_errno,
                                     "setsockopt(TCP_NODELAY) failed");
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            c->tcp_nodelay = NGX_TCP_NODELAY_SET;
        }

        /* Check whether the receiving buffer buffer has received the response body after parsing the response header */
        n = u->buffer.last - u->buffer.pos;

        /* If the receiving buffer has received the response packet body */
        if (n) {
            u->buffer.last = u->buffer.pos;

            u->state->response_length += n;

            /* Call the input_filter method to start processing the response body */
            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            /* Call this method to forward the response packet body received this time to the downstream server */
            ngx_http_upstream_process_non_buffered_downstream(r);

        } else {
            /* If there is no response packet body in the receive buffer, clear it, that is, reuse this buffer */
            u->buffer.pos = u->buffer.start;
            u->buffer.last = u->buffer.start;

            if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            /*
             * If the read event on the current connection is ready,
             * Then call the ngx_http_upstream_process_non_buffered_upstream method to receive the response packet body and process it;
             */
            if (u->peer.connection->read->ready || u->length == 0) {
                ngx_http_upstream_process_non_buffered_upstream(r, u);
            }
        }

        return;
    }

    /*
     * If the buffering flag of the ngx_http_upstream_t structure is 1, the upstream network speed will be given priority when forwarding the response packet body;
     * That is, allocate more memory and cache, that is, always receive responses from the upstream server, and save the responses from the upstream server in memory or cache;
     */
    /* TODO: preallocate event_pipe bufs, look "Content-Length" */

#if (NGX_HTTP_CACHE)
    ...
    ...
#endif

    /* Initialize ngx_event_pipe_t structure p */
    p = u->pipe;

    p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
    p->output_ctx = r;
    p->tag = u->output.tag;
    p->bufs = u->conf->bufs;
    p->busy_size = u->conf->busy_buffers_size;
    p->upstream = u->peer.connection;
    p->downstream = c;
    p->pool = r->pool;
    p->log = c->log;

    p->cacheable = u->cacheable || u->store;

    p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
    if (p->temp_file == NULL) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    p->temp_file->file.fd = NGX_INVALID_FILE;
    p->temp_file->file.log = c->log;
    p->temp_file->path = u->conf->temp_path;
    p->temp_file->pool = r->pool;

    if (p->cacheable) {
        p->temp_file->persistent = 1;

    } else {
        p->temp_file->log_level = NGX_LOG_WARN;
        p->temp_file->warn = "an upstream response is buffered "
                             "to a temporary file";
    }

    p->max_temp_file_size = u->conf->max_temp_file_size;
    p->temp_file_write_size = u->conf->temp_file_write_size;

    /* Initialize preread linked list buffer preread_bufs */
    p->preread_bufs = ngx_alloc_chain_link(r->pool);
    if (p->preread_bufs == NULL) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    p->preread_bufs->buf = &u->buffer;
    p->preread_bufs->next = NULL;
    u->buffer.recycled = 1;

    p->preread_size = u->buffer.last - u->buffer.pos;

    if (u->cacheable) {

        p->buf_to_file = ngx_calloc_buf(r->pool);
        if (p->buf_to_file == NULL) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        p->buf_to_file->start = u->buffer.start;
        p->buf_to_file->pos = u->buffer.start;
        p->buf_to_file->last = u->buffer.pos;
        p->buf_to_file->temporary = 1;
    }

    if (ngx_event_flags & NGX_USE_AIO_EVENT) {
        /* the posted aio operation may corrupt a shadow buffer */
        p->single_buf = 1;
    }

    /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
    p->free_bufs = 1;

    /* event_pipe would do u->buffer.last = u->buffer.pos */

    u->read_event_handler = ngx_http_upstream_process_upstream;
    r->write_event_handler = ngx_http_upstream_process_downstream;

    ngx_http_upstream_process_upstream(r, u);
}