首頁 >後端開發 >php教程 >Nginx 中 upstream 機制的實現

Nginx 中 upstream 機制的實現

WBOY
WBOY原創
2016-08-08 09:30:042155瀏覽

概述

        upstream 機制使得Nginx 成為一個反向代理伺服器,Nginx 接收來自下游客戶端的http 請求,並處理該請求,同時根據該請求向伺服器上游tcp 請求該請求返回相應地回應報文,Nginx 根據上游伺服器的回應封包,決定是否向下游客戶端轉發回應封包。另外 upstream 機制提供了負載平衡的功能,可以將請求負載平衡到叢集伺服器的某個伺服器上面。

啟動upstream

        在Nginx 中呼叫ngx_http_upstream_init 方法啟動upstream 機制,但是在使用upstream 機制下必須調用該結構。體中的upstream 成員是指向NULL,該結構體的具體初始化工作還需由HTTP 模組完成。有關 ngx_http_upstream_t 結構體 和ngx_http_upstream_conf_t 結構體的相關說明可參考文章《Nginx 中 upstream 機制》。

       下面是函數ngx_http_upstream_create 的實作:

檢查Nginx 與下游伺服器之間連接上的讀事件是否在定時器中,即檢查timer_set 標誌位是否為1,若該標誌位為1,則把讀取事件從定時器中移除;

    調用ngx_http_upstream_init_request 方法啟動upstream 機制;呼叫ngx_http_upstream_init_request 方法啟動upstream 機制;呼叫ngx_http_upstream_init_request 方法啟動upstream 機制;
  • ngx_http_upstream_init_request 方法執行流程如下圖所示:
  • 檢查ngx_http_upstream_t 結構體中的store 標誌位是否為0;檢查ngx_http_request_upstream_t 結構體中的store 標誌位是否為0;檢查ngx_http_request_t 結構體中的標誌_cignmd是否為0;若上面的標誌位元都為0,則設定ngx_http_request_t 請求的讀取事件的回呼方法為ngx_http_upstream_rd_check_broken_connection;設定寫入事件的回呼方法為 ngx_http_upstream_wr_check_broken_connection;這兩個方法都會呼叫ngx_http_upstream_check_broken_connection方法檢查Nginx 與下游之間的連接是否正常,若出現錯誤,則終止連接;

不滿足上面的標誌位,即不滿足上面的標誌位,即不滿足上面的標誌請求中ngx_http_upstream_t 結構體中某個HTTP 模組實作的create_request 方法,建構發送到上游伺服器的請求;

呼叫ngx_http_cleanup_add 方法向原始請求的cleanup 鍊錶尾端新增一個回呼,若當前請求結束時會調用該方法做一些清理工作;
  • 調用ngx_http_upstream_connect 方法向上游服務器發起連接請求;
  • /* 创建 ngx_http_upstream_t 结构体 */
    ngx_int_t
    ngx_http_upstream_create(ngx_http_request_t *r)
    {
        ngx_http_upstream_t  *u;
    
        u = r->upstream;
    
        /*
         * 若已经创建过ngx_http_upstream_t 且定义了cleanup成员,
         * 则调用cleanup清理方法将原始结构体清除;
         */
        if (u && u->cleanup) {
            r->main->count++;
            ngx_http_upstream_cleanup(r);
        }
    
        /* 从内存池分配ngx_http_upstream_t 结构体空间 */
        u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
        if (u == NULL) {
            return NGX_ERROR;
        }
    
        /* 给ngx_http_request_t 结构体成员upstream赋值 */
        r->upstream = u;
    
        u->peer.log = r->connection->log;
        u->peer.log_error = NGX_ERROR_ERR;
    #if (NGX_THREADS)
        u->peer.lock = &r->connection->lock;
    #endif
    
    #if (NGX_HTTP_CACHE)
        r->cache = NULL;
    #endif
    
        u->headers_in.content_length_n = -1;
    
        return NGX_OK;
    }
    /* 初始化启动upstream机制 */
    void
    ngx_http_upstream_init(ngx_http_request_t *r)
    {
        ngx_connection_t     *c;
    
        /* 获取当前请求所对应的连接 */
        c = r->connection;
    
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http init upstream, client timer: %d", c->read->timer_set);
    
    #if (NGX_HTTP_SPDY)
        if (r->spdy_stream) {
            ngx_http_upstream_init_request(r);
            return;
        }
    #endif
    
        /*
         * 检查当前连接上读事件的timer_set标志位是否为1,若该标志位为1,
         * 表示读事件在定时器机制中,则需要把它从定时器机制中移除;
         * 因为在启动upstream机制后,就不需要对客户端的读操作进行超时管理;
         */
        if (c->read->timer_set) {
            ngx_del_timer(c->read);
        }
    
        if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
    
            if (!c->write->active) {
                if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
                    == NGX_ERROR)
                {
                    ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                    return;
                }
            }
        }
    
        ngx_http_upstream_init_request(r);
    }
  • 建立連接
  •         upstream 機制與上游服務器建立TCP 連接時,採用的是非阻塞模式的套接字,即發起連線請求之後立即返回,不管連線是否建立成功,若沒有立即建立成功,則需在epoll 事件機制中監聽該套接字。向上游伺服器啟動連線請求由函數
  • ngx_http_upstream_connect 實作。在分析 ngx_http_upstream_connect 在方法之前,先分析下 ngx_event_connect_peer 方法,因為方法會被ngx_http_upstream_connect 方法
  • 呼叫。

ngx_event_connect_peer 方法的執行流程如下所示:

呼叫ngx_socket 方法建立一個TCP 套接字;

設定套接字連接接收和發送網路字元流的方法;

設定套接字連接上讀、寫事件方法;
  • 將TCP 套接字以期待EPOLLIN | EPOLLOUT 事件的方式添加到epoll 事件機制中;
  • 呼叫connect 方法向伺服器啟動TCP 連線請求;
  • ngx_http_upstream_connect 方法表示向上游啟動連線ngx_http_upstream_connect 方法表示向上游啟動連接要求,其執行流程所示:
    • 调用 ngx_event_connect_peer 方法主动向上游服务器发起连接请求,需要注意的是该方法已经将相应的套接字注册到epoll事件机制来监听读、写事件,该方法返回值为 rc;
      • 若 rc = NGX_ERROR,表示发起连接失败,则调用ngx_http_upstream_finalize_request 方法关闭连接请求,并 return 从当前函数返回;
      • 若 rc = NGX_BUSY,表示当前上游服务器处于不活跃状态,则调用 ngx_http_upstream_next 方法根据传入的参数尝试重新发起连接请求,并 return 从当前函数返回;
      • 若 rc = NGX_DECLINED,表示当前上游服务器负载过重,则调用 ngx_http_upstream_next 方法尝试与其他上游服务器建立连接,并 return 从当前函数返回;
      • 设置上游连接 ngx_connection_t 结构体的读事件、写事件的回调方法 handler 都为 ngx_http_upstream_handler,设置 ngx_http_upstream_t 结构体的写事件 write_event_handler 的回调为 ngx_http_upstream_send_request_handler,读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_header;
      • 若 rc = NGX_AGAIN,表示当前已经发起连接,但是没有收到上游服务器的确认应答报文,即上游连接的写事件不可写,则需调用 ngx_add_timer 方法将上游连接的写事件添加到定时器中,管理超时确认应答;
      • 若 rc = NGX_OK,表示成功建立连接,则调用 ngx_http_upsream_send_request 方法向上游服务器发送请求;
/* 向上游服务器建立连接 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_time_t        *tp;
    ngx_connection_t  *c;

    r->connection->log->action = "connecting to upstream";

    if (u->state && u->state->response_sec) {
        tp = ngx_timeofday();
        u->state->response_sec = tp->sec - u->state->response_sec;
        u->state->response_msec = tp->msec - u->state->response_msec;
    }

    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));

    tp = ngx_timeofday();
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;

    /* 向上游服务器发起连接 */
    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);

    /* 下面根据rc不同返回值进行分析 */

    /* 若建立连接失败,则关闭当前请求,并return从当前函数返回 */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    u->state->peer = u->peer.name;

    /*
     * 若返回rc = NGX_BUSY,表示当前上游服务器不活跃,
     * 则调用ngx_http_upstream_next向上游服务器重新发起连接,
     * 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
     * 并return从当前函数返回;
     */
    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }

    /*
     * 若返回rc = NGX_DECLINED,表示当前上游服务器负载过重,
     * 则调用ngx_http_upstream_next向上游服务器重新发起连接,
     * 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
     * 并return从当前函数返回;
     */
    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */

    c = u->peer.connection;

    c->data = r;

    /* 设置当前连接ngx_connection_t 上读、写事件的回调方法 */
    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;

    /* 设置upstream机制的读、写事件的回调方法 */
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;

    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;

    if (c->pool == NULL) {

        /* we need separate pool here to be able to cache SSL connections */

        c->pool = ngx_create_pool(128, r->connection->log);
        if (c->pool == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    c->log = r->connection->log;
    c->pool->log = c->log;
    c->read->log = c->log;
    c->write->log = c->log;

    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */

    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = c;
    u->writer.limit = 0;

    /*
     * 检查当前ngx_http_upstream_t 结构的request_sent标志位,
     * 若该标志位为1,则表示已经向上游服务器发送请求,即本次发起连接失败;
     * 则调用ngx_http_upstream_reinit方法重新向上游服务器发起连接;
     */
    if (u->request_sent) {
        if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    if (r->request_body
        && r->request_body->buf
        && r->request_body->temp_file
        && r == r->main)
    {
        /*
         * the r->request_body->buf can be reused for one request only,
         * the subrequests should allocate their own temporary bufs
         */

        u->output.free = ngx_alloc_chain_link(r->pool);
        if (u->output.free == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->output.free->buf = r->request_body->buf;
        u->output.free->next = NULL;
        u->output.allocated = 1;

        r->request_body->buf->pos = r->request_body->buf->start;
        r->request_body->buf->last = r->request_body->buf->start;
        r->request_body->buf->tag = u->output.tag;
    }

    u->request_sent = 0;

    /*
     * 若返回rc = NGX_AGAIN,表示没有收到上游服务器允许建立连接的应答;
     * 由于写事件已经添加到epoll事件机制中等待可写事件发生,
     * 所有在这里只需将当前连接的写事件添加到定时器机制中进行超时管理;
     * 并return从当前函数返回;
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    /*
     * 若返回值rc = NGX_OK,表示连接成功建立,
     * 调用此方法向上游服务器发送请求 */
    ngx_http_upstream_send_request(r, u);
}

发送请求

        当 Nginx 与上游服务器成功建立连接之后,会调用 ngx_http_upstream_send_request 方法发送请求,若是该方法不能一次性把请求内容发送完成时,则需等待 epoll 事件机制的写事件发生,若写事件发生,则会调用写事件 write_event_handler 的回调方法 ngx_http_upstream_send_request_handler 继续发送请求,并且有可能会多次调用该写事件的回调方法, 直到把请求发送完成。

下面是 ngx_http_upstream_send_request 方法的执行流程:

  • 检查 ngx_http_upstream_t 结构体中的标志位 request_sent 是否为 0,若为 0 表示未向上游发送请求。 且此时调用 ngx_http_upstream_test_connect 方法测试是否与上游建立连接,若返回非 NGX_OK, 则需调用 ngx_http_upstream_next 方法试图与上游建立连接,并return 从当前函数返回;
  • 调用 ngx_output_chain 方法向上游发送保存在 request_bufs 链表中的请求数据,该方法返回值为 rc,并设置 request_sent 标志位为 1,检查连接上写事件 timer_set 标志位是否为1,若为 1 调用ngx_del_timer 方法将写事件从定时器中移除;
  • 若 rc = NGX_ERROR,表示当前连接上出错,则调用 ngx_http_upstream_next 方法尝试再次与上游建立连接,并 return 从当前函数返回;
  • 若 rc = NGX_AGAIN,并是当前请求数据未完全发送,则需将剩余的请求数据保存在 ngx_http_upstream_t 结构体的 output 成员中,并且调用 ngx_add_timer 方法将当前连接上的写事件添加到定时器中,调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中,等待可写事件发生,并return 从当前函数返回;
  • 若 rc = NGX_OK,表示已经发送全部请求数据,则准备接收来自上游服务器的响应报文;
  • 先调用 ngx_add_timer 方法将当前连接的读事件添加到定时器机制中,检测接收响应是否超时,检查当前连接上的读事件是否准备就绪,即标志位 ready 是否为1,若该标志位为 1,则调用 ngx_http_upstream_process_header 方法开始处理响应头部,并 return 从当前函数返回;
  • 若当前连接上读事件的标志位 ready 为0,表示暂时无可读数据,则需等待读事件再次被触发,由于原始读事件的回调方法为 ngx_http_upstream_process_header,所有无需重新设置。由于请求已经全部发送,防止写事件的回调方法 ngx_http_upstream_send_request_handler 再次被触发,因此需要重新设置写事件的回调方法为 ngx_http_upstream_dummy_handler,该方法实际上不执行任何操作,同时调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中;
/* 向上游服务器发送请求 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    /* 获取当前连接 */
    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");

    /*
     * 若标志位request_sent为0,表示还未发送请求;
     * 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
     * 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
     * 并return从当前函数返回;
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";

    /*
     * 调用ngx_output_chain方法向上游发送保存在request_bufs链表中的请求数据;
     * 值得注意的是该方法的第二个参数可以是NULL也可以是request_bufs,那怎么来区分呢?
     * 若是第一次调用该方法发送request_bufs链表中的请求数据时,request_sent标志位为0,
     * 此时,第二个参数自然就是request_bufs了,那么为什么会有NULL作为参数的情况呢?
     * 当在第一次调用该方法时,并不能一次性把所有request_bufs中的数据发送完毕时,
     * 此时,会把剩余的数据保存在output结构里面,并把标志位request_sent设置为1,
     * 因此,再次发送请求数据时,不用指定request_bufs参数,因为此时剩余数据已经保存在output中;
     */
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

    /* 向上游服务器发送请求之后,把request_sent标志位设置为1 */
    u->request_sent = 1;

    /* 下面根据不同rc的返回值进行判断 */

    /*
     * 若返回值rc=NGX_ERROR,表示当前连接上出错,
     * 将错误信息传递给ngx_http_upstream_next方法,
     * 该方法根据错误信息决定是否重新向上游服务器发起连接;
     * 并return从当前函数返回;
     */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /*
     * 检查当前连接上写事件的标志位timer_set是否为1,
     * 若该标志位为1,则需把写事件从定时器机制中移除;
     */
    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    /*
     * 若返回值rc = NGX_AGAIN,表示请求数据并未完全发送,
     * 即有剩余的请求数据保存在output中,但此时,写事件已经不可写,
     * 则调用ngx_add_timer方法把当前连接上的写事件添加到定时器机制,
     * 并调用ngx_handle_write_event方法将写事件注册到epoll事件机制中;
     * 并return从当前函数返回;
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);

        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    /*
     * 若返回值 rc = NGX_OK,表示已经发送完全部请求数据,
     * 准备接收来自上游服务器的响应报文,则执行以下程序;
     */
    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    /* 将当前连接上读事件添加到定时器机制中 */
    ngx_add_timer(c->read, u->conf->read_timeout);

    /*
     * 若此时,读事件已经准备就绪,
     * 则调用ngx_http_upstream_process_header方法开始接收并处理响应头部;
     * 并return从当前函数返回;
     */
    if (c->read->ready) {
        ngx_http_upstream_process_header(r, u);
        return;
    }

    /*
     * 若当前读事件未准备就绪;
     * 则把写事件的回调方法设置为ngx_http_upstream_dumy_handler方法(不进行任何实际操作);
     * 并把写事件注册到epoll事件机制中;
     */
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
}

当无法一次性将请求内容全部发送完毕,则需等待 epoll 事件机制的写事件发生,一旦发生就会调用回调方法 ngx_http_upstream_send_request_handler。

ngx_http_upstream_send_request_handler 方法的执行流程如下所示:

  • 检查连接上写事件是否超时,即timedout 标志位是否为 1,若为 1 表示已经超时,则调用 ngx_http_upstream_next 方法重新向上游发起连接请求,并 return 从当前函数返回;
  • 若标志位 timedout 为0,即不超时,检查 header_sent 标志位是否为 1,表示已经接收到来自上游服务器的响应头部,则不需要再向上游发送请求,将写事件的回调方法设置为 ngx_http_upstream_dummy_handler,同时将写事件注册到 epoll 事件机制中,并return 从当前函数返回;
  • 若标志位 header_sent 为 0,则调用 ngx_http_upstream_send_request 方法向上游发送请求数据;
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream send request handler");

    /* 检查当前连接上写事件的超时标志位 */
    if (c->write->timedout) {
        /* 执行超时重连机制 */
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    /* 已经接收到上游服务器的响应头部,则不需要再向上游服务器发送请求数据 */
    if (u->header_sent) {
        /* 将写事件的回调方法设置为不进行任何实际操作的方法ngx_http_upstream_dumy_handler */
        u->write_event_handler = ngx_http_upstream_dummy_handler;

        /* 将写事件注册到epoll事件机制中,并return从当前函数返回 */
        (void) ngx_handle_write_event(c->write, 0);

        return;
    }

    /* 若没有接收来自上游服务器的响应头部,则需向上游服务器发送请求数据 */
    ngx_http_upstream_send_request(r, u);
}

接收响应

接收响应头部

当 Nginx 已经向上游发送请求,准备开始接收来自上游的响应头部,由方法 ngx_http_upstream_process_header 实现,该方法接收并解析响应头部。

ngx_http_upstream_process_header 方法的执行流程如下:

  • 检查上游连接上的读事件是否超时,若标志位 timedout 为 1,则表示超时,此时调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
  • 若标志位 timedout 为 0,接着检查 ngx_http_upstream_t 结构体中的标志位 request_sent,若该标志位为 0,表示未向上游发送请求,同时调用 ngx_http_upstream_test_connect 方法测试连接状态,若该方法返回值为非 NGX_OK,表示与上游已经断开连接,则调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
  • 检查 ngx_http_upstream_t 结构体中接收响应头部的 buffer 缓冲区是否有内存空间以便接收响应头部,若 buffer.start 为 NULL,表示该缓冲区为空,则需调用 ngx_palloc 方法分配内存,该内存大小 buffer_size 由 ngx_http_upstream_conf_t 配置结构体的 buffer_size 成员指定;
  • 调用 recv 方法开始接收来自上游服务器的响应头部,并根据该方法的返回值 n 进行判断:
    • 若 n = NGX_AGAIN,表示读事件未准备就绪,需要等待下次读事件被触发时继续接收响应头部,此时,调用 ngx_add_timer 方法将读事件添加到定时器中,同时调用 ngx_handle_read_event 方法将读事件注册到epoll 事件机制中,并 return 从当前函数返回;
    • 若 n = NGX_ERROR 或 n = 0,表示上游连接发生错误 或 上游服务器主动关闭连接,则调用 ngx_http_upstream_next 方法重新发起连接请求,并 return 从当前函数返回;
    • 若 n 大于 0,表示已经接收到响应头部,此时,调用 ngx_http_upstream_t 结构体中由 HTTP 模块实现的 process_header 方法解析响应头部,且返回 rc 值;
  • 若 rc = NGX_AGAIN,表示接收到的响应头部不完整,检查接收缓冲区 buffer 是否还有剩余的内存空间,若缓冲区没有剩余的内存空间,表示接收到的响应头部过大,此时调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;若缓冲区还有剩余的内存空间,则continue 继续接收响应头部;
  • 若 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,表示接收到的响应头部是非法的,则调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;
  • 若 rc = NGX_ERROR,表示连接出错,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
  • 若 rc = NGX_OK,表示已接收到完整的响应头部,则调用 ngx_http_upstream_process_headers 方法处理已解析的响应头部,该方法会将已解析出来的响应头部保存在 ngx_http_request_t 结构体中的 headers_out 成员;
  • 检查 ngx_http_request_t 结构体的 subrequest_in_memory 成员决定是否需要转发响应给下游服务器;
    • 若 subrequest_in_memory 为 0,表示需要转发响应给下游服务器,则调用 ngx_http_upstream_send_response 方法开始转发响应给下游服务器,并 return 从当前函数返回;
    • 若 subrequest_in_memory 为 1,表示不需要将响应转发给下游,此时检查 HTTP 模块是否定义了 ngx_http_upstream_t 结构体中的 input_filter 方法处理响应包体;
  • 若没有定义 input_filter 方法,则使用 upstream 机制默认方法 ngx_http_upstream_non_buffered_filter 代替 input_filter 方法;
  • 若定义了自己的 input_filter 方法,则首先调用 input_filter_init 方法为处理响应包体做初始化工作;
  • 检查接收缓冲区 buffer 在解析完响应头部之后剩余的字符流,若有剩余的字符流,则表示已经预接收了响应包体,此时调用 input_filter 方法处理响应包体;
  • 设置 upstream 机制读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_body_in_memory,并调用该方法开始接收并解析响应包体;
/* 接收并解析响应头部 */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process header");

    c->log->action = "reading response header from upstream";

    /* 检查当前连接上的读事件是否超时 */
    if (c->read->timedout) {
        /*
         * 若标志位timedout为1,表示读事件超时;
         * 则把超时错误传递给ngx_http_upstream_next方法,
         * 该方法根据允许的错误进行重连接策略;
         * 并return从当前函数返回;
         */
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

    /*
     * 若标志位request_sent为0,表示还未发送请求;
     * 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
     * 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
     * 并return从当前函数返回;
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /*
     * 检查ngx_http_upstream_t结构体中接收响应头部的buffer缓冲区;
     * 若接收缓冲区buffer未分配内存,则调用ngx_palloce方法分配内存,
     * 该内存的大小buffer_size由ngx_http_upstream_conf_t配置结构的buffer_size指定;
     */
    if (u->buffer.start == NULL) {
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
        if (u->buffer.start == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        /* 调整接收缓冲区buffer,准备接收响应头部 */
        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        /* 表示该缓冲区内存可被复用、数据可被改变 */
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;

        /* 初始化headers_in的成员headers链表 */
        if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            u->buffer.pos += r->cache->header_start;
            u->buffer.last = u->buffer.pos;
        }
#endif
    }

    for ( ;; ) {

        /* 调用recv方法从当前连接上读取响应头部数据 */
        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        /* 下面根据 recv 方法不同返回值 n 进行判断 */

        /*
         * 若返回值 n = NGX_AGAIN,表示读事件未准备就绪,
         * 需等待下次读事件被触发时继续接收响应头部,
         * 即将读事件注册到epoll事件机制中,等待可读事件发生;
         * 并return从当前函数返回;
         */
        if (n == NGX_AGAIN) {
#if 0
            ngx_add_timer(rev, u->read_timeout);
#endif

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }

        /*
         * 若返回值 n = NGX_ERROR 或 n = 0,则表示上游服务器已经主动关闭连接;
         * 此时,调用ngx_http_upstream_next方法决定是否重新发起连接;
         * 并return从当前函数返回;
         */
        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        /* 若返回值 n 大于 0,表示已经接收到响应头部 */
        u->buffer.last += n;

#if 0
        u->valid_header_in = 0;

        u->peer.cached = 0;
#endif

        /*
         * 调用ngx_http_upstream_t结构体中process_header方法开始解析响应头部;
         * 并根据该方法返回值进行不同的判断;
         */
        rc = u->process_header(r);

        /*
         * 若返回值 rc = NGX_AGAIN,表示接收到的响应头部不完整,
         * 需等待下次读事件被触发时继续接收响应头部;
         * continue继续接收响应;
         */
        if (rc == NGX_AGAIN) {

            if (u->buffer.last == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            continue;
        }

        break;
    }

    /*
     * 若返回值 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,
     * 则表示接收到的响应头部是非法的,
     * 调用ngx_http_upstream_next方法决定是否重新发起连接;
     * 并return从当前函数返回;
     */
    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    /*
     * 若返回值 rc = NGX_ERROR,表示出错,
     * 则调用ngx_http_upstream_finalize_request方法结束该请求;
     * 并return从当前函数返回;
     */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* rc == NGX_OK */

    /*
     * 若返回值 rc = NGX_OK,表示成功解析到完整的响应头部;*/
    if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {

        if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
            return;
        }

        if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
            return;
        }
    }

    /* 调用ngx_http_upstream_process_headers方法处理已解析处理的响应头部 */
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }

    /*
     * 检查ngx_http_request_t 结构体的subrequest_in_memory成员决定是否转发响应给下游服务器;
     * 若该标志位为0,则需调用ngx_http_upstream_send_response方法转发响应给下游服务器;
     * 并return从当前函数返回;
     */
    if (!r->subrequest_in_memory) {
        ngx_http_upstream_send_response(r, u);
        return;
    }

    /* 若不需要转发响应,则调用ngx_http_upstream_t中的input_filter方法处理响应包体 */
    /* subrequest content in memory */

    /*
     * 若HTTP模块没有定义ngx_http_upstream_t中的input_filter处理方法;
     * 则使用upstream机制默认方法ngx_http_upstream_non_buffered_filter;
     *
     * 若HTTP模块实现了input_filter方法,则不使用upstream默认的方法;
     */
    if (u->input_filter == NULL) {
        u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
        u->input_filter = ngx_http_upstream_non_buffered_filter;
        u->input_filter_ctx = r;
    }

    /*
     * 调用input_filter_init方法为处理包体做初始化工作;
     */
    if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    /*
     * 检查接收缓冲区是否有剩余的响应数据;
     * 因为响应头部已经解析完毕,若接收缓冲区还有未被解析的剩余数据,
     * 则该数据就是响应包体;
     */
    n = u->buffer.last - u->buffer.pos;

    /*
     * 若接收缓冲区有剩余的响应包体,调用input_filter方法开始处理已接收到响应包体;
     */
    if (n) {
        u->buffer.last = u->buffer.pos;

        u->state->response_length += n;

        /* 调用input_filter方法处理响应包体 */
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    /* 设置upstream机制的读事件回调方法read_event_handler为ngx_http_upstream_process_body_in_memory */
    u->read_event_handler = ngx_http_upstream_process_body_in_memory;

    /* 调用ngx_http_upstream_process_body_in_memory方法开始处理响应包体 */
    ngx_http_upstream_process_body_in_memory(r, u);
}

接收响应包体

接收并解析响应包体由 ngx_http_upstream_process_body_in_memory 方法实现;

ngx_http_upstream_process_body_in_memory 方法的执行流程如下所示:

  • 检查上游连接上读事件是否超时,若标志位 timedout 为 1,则表示已经超时,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
  • 检查接收缓冲区 buffer 是否还有剩余的内存空间,若没有剩余的内存空间,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;若有剩余的内存空间则调用 recv 方法开始接收响应包体;
    • 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体,调用 ngx_handle_read_event 方法将读事件注册到 epoll 事件机制中,同时将读事件添加到定时器机制中;
    • 若返回值 n = 0 或 n = NGX_ERROR,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
    • 若返回值 n 大于 0,则表示成功接收到响应包体,调用 input_filter 方法开始处理响应包体,检查读事件的 ready 标志位;
  • 若标志位 ready 为 1,表示仍有可读的响应包体数据,因此回到步骤 2 继续调用 recv 方法读取响应包体,直到读取完毕;
  • 若标志位 ready 为 0,则调用 ngx_handle_read_event 方法将读事件注册到epoll事件机制中,同时调用 ngx_add_timer 方法将读事件添加到定时器机制中;
/* 接收并解析响应包体 */
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    size_t             size;
    ssize_t            n;
    ngx_buf_t         *b;
    ngx_event_t       *rev;
    ngx_connection_t  *c;

    c = u->peer.connection;
    rev = c->read;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process body on memory");

    /*
     * 检查读事件标志位timedout是否超时,若该标志位为1,表示响应已经超时;
     * 则调用ngx_http_upstream_finalize_request方法结束请求;
     * 并return从当前函数返回;
     */
    if (rev->timedout) {
        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
        return;
    }

    b = &u->buffer;

    for ( ;; ) {

        /* 检查当前接收缓冲区是否剩余的内存空间 */
        size = b->end - b->last;

        /*
         * 若接收缓冲区不存在空闲的内存空间,
         * 则调用ngx_http_upstream_finalize_request方法结束请求;
         * 并return从当前函数返回;
         */
        if (size == 0) {
            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                          "upstream buffer is too small to read response");
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /*
         * 若接收缓冲区有可用的内存空间,
         * 则调用recv方法开始接收响应包体;
         */
        n = c->recv(c, b->last, size);

        /*
         * 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体;
         */
        if (n == NGX_AGAIN) {
            break;
        }

        /*
         * 若返回值n = 0(表示上游服务器主动关闭连接),或n = NGX_ERROR(表示出错);
         * 则调用ngx_http_upstream_finalize_request方法结束请求;
         * 并return从当前函数返回;
         */
        if (n == 0 || n == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, n);
            return;
        }

        /* 若返回值 n 大于0,表示成功读取到响应包体 */
        u->state->response_length += n;

        /* 调用input_filter方法处理本次接收到的响应包体 */
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /* 检查读事件的ready标志位,若为1,继续读取响应包体 */
        if (!rev->ready) {
            break;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    /*
     * 若读事件的ready标志位为0,表示读事件未准备就绪,
     * 则将读事件注册到epoll事件机制中,添加到定时器机制中;
     * 读事件的回调方法不改变,即依旧为ngx_http_upstream_process_body_in_memory;
     */
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    if (rev->active) {
        ngx_add_timer(rev, u->conf->read_timeout);

    } else if (rev->timer_set) {
        ngx_del_timer(rev);
    }
}

转发响应

下面看下 upstream 处理上游响应包体的三种方式:

  1. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 1 时,upstream 不转发响应包体到下游,并由HTTP 模块实现的input_filter() 方法处理包体;
  2. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 1 时,upstream 将开启更多的内存和磁盘文件用于缓存上游的响应包体(此时,上游网速更快),并转发响应包体;
  3. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 0 时,upstream 将使用固定大小的缓冲区来转发响应包体;

转发响应由函数 ngx_http_upstream_send_response 实现,该函数的执行流程如下:

  • 呼叫ngx_http_send_header 方法轉送回應頭部,並將ngx_http_upstream_t 結構體中的header_sent 標誌位元設定為1,表示已經轉送回應頭部;
  • _run檔還保存著要求包體方法清理臨時檔案;
  • 檢查標誌位buffering,若該標誌位為1,表示需要開啟檔案緩存,若該標誌位元為0,則不需要開啟檔案緩存,只需要以固定的記憶體區塊大小轉發回應包體即可;
  • 若標誌位buffering 為0;
    • 則檢查HTTP 模組是否實作了自己的input_filter 方法,若沒有則使用upstream 機制預設的方法ngx_http_com結構體中讀取事件read_event_handler 的回呼方法為ngx_http_upstream_process_non_buffered_upstream,接收上游回應時,會透過ngx_http_upstream_handler 方法最終呼叫ngx_http_upstream_process_non_buffered_uptream 來接收「 _handler 的回呼方法為ngx_http_upstream_process_non_buffered_downstream,當向下游發送資料時,會透過ngx_http_handler 方法最終呼叫ngx_http_upstream_process_non_buffered_downstream 方法來傳送回應包體;
    • 呼叫input_filter_init 方法為input_filter 回應處理方法回應處理方法。部之後,是否還有剩餘的響應數據,若有表示預接收了響應包體:
    • 若在解析響應頭部區間,預接收了響應包體,則調用input_filter 方法處理該部分預接收的回應包體,並呼叫ngx_http_upstream_process_non_buffered_downstream 方法轉發本次接收到的回應包體給下游伺服器;
    • 若在解析回應頭部區間,沒有接收回應包體,則先清空接收緩衝區來接收回應包體,檢查上游連線上讀取事件是否已準備就緒,若標誌位元ready 為1,表示準備就緒,則呼叫ngx_http_upstream_process_non_buffered_upstream 方法接收上游回應包體;若標誌位元ready 為0,則return 從目前函數返回;
      • 若標誌位buffering 為1;
      • 初始化ngx_http_upstream_t 結構體中的ngx_event_pipe_c處理響應包體做初始化工作;
    • 設定上游連線上的讀取事件read_event_handler 的回呼方法為ngx_http_upstream_process_upstream;
  • 設定上游連線上的寫事件write_event_handler 的回呼方法為ngcstream_up_process_pstream_processcom由上游伺服器發送的回應包體;
    • /* 轉發回應包體*/ static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) { int tcp_nodelay; ssize_t n; ngx_int_t rc; ngx_event_pipe_t *p; ngx_connection_t *c; ngx_http_core_loc_conf_t *clcf; /* 呼叫ngx_http_send_hander方法向下游發送回應頭 */ rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { ngx_http_upstream_finalize_request(r, u, rc); return; } /* 將標誌位header_sent設定為1 */ u->header_sent = 1; if (u->upgrade) { ngx_http_upstream_upgrade(r, u); return; } /* 取得Nginx與下游之間的TCP連線 */ c = r->connection; if (r->header_only) { if (u->cacheable || u->store) { if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) { ngx_connection_error(c, ngx_socket_errno, ngx_shutdown_socket_n " failed"); } r->read_event_handler = ngx_http_request_empty_handler; r->write_event_handler = ngx_http_request_empty_handler; c->error = 1; } else { ngx_http_upstream_finalize_request(r, u, rc); return; } } /* 若暫存檔案儲存請求包體,則呼叫ngx_pool_run_cleanup_file方法清理暫存檔案的請求包體 */ if (r->request_body && r->request_body->temp_file) { ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd); r->request_body->temp_file->file.fd = NGX_INVALID_FILE; }clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); /* * 若標誌位元buffering為0,轉發回應時以下游伺服器網速優先; * 即只需分配固定的記憶體區塊大小來接收來自上游伺服器的回應並轉發, * 當該記憶體區塊已滿,則暫停接收來自上游伺服器的回應數據, * 等待把記憶體區塊的回應資料轉發給下游伺服器後有剩餘記憶體空間再繼續接收回應; */ if (!u->buffering) { /* * 若HTTP模組沒有實作input_filter方法, * 則採用upstream機制預設的方法ngx_http_upstream_non_buffered_filter; */ if (u->input_filter == NULL) { u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } /* * 設定ngx_http_upstream_t結構體中讀取事件的回呼方法為ngx_http_upstream_non_buffered_upstream,(即讀取上游回應的方法); * 設定目前請求ngx_http_request_t結構體中寫事件的回呼方法為ngx_http_upstream_process_non_buffered_downstream,(即轉送回應到下游的方法); */ u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream; r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream; r->limit_rate = 0; /* 呼叫input_filter_init為input_filter方法處理回應包體做初始化工作 */ if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); tcp_nodelay = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nodelay, sizeof(int)) == -1) { ngx_connection_error(c, ngx_socket_errno, "setsockopt(TCP_NODELAY) failed"); ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } c->tcp_nodelay = NGX_TCP_NODELAY_SET; } /* 檢查解析完回應頭部後接收緩衝區buffer是否已接收了回應包體 */ n = u->buffer.last - u->buffer.pos; /* 若接收緩衝區已經接收了回應包體 */ if (n) { u->buffer.last = u->buffer.pos; u->state->response_length += n; /* 呼叫input_filter方法開始處理回應包體 */ if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } /* 呼叫此方法把本次接收到的回應包體轉送給下游伺服器 */ ngx_http_upstream_process_non_buffered_downstream(r); } else { /* 若接收緩衝區中沒有回應包體,則將其清空,即重複使用這個緩衝區 */ u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } /* * 若目前連線上讀事件已準備就緒, * 則呼叫ngx_http_upstream_process_non_buffered_upstream方法接收回應包體並處理; */ if (u->peer.connection->read->ready || u->length == 0) { ngx_http_upstream_process_non_buffered_upstream(r, u); } } return; }/* * 若ngx_http_upstream_t結構體的buffering標誌位元為1,則轉送回應包體時以上游網速優先; * 即分配更多的記憶體和緩存,即一直接收來自上游伺服器的回應,把來自上游伺服器的回應保存的記憶體或快取中; */ /* TODO: preallocate event_pipe bufs, look "Content-Length" */ #if (NGX_HTTP_CACHE) …… …… #endif /* 初始化ngx_event_pipe_t結構體 p */ p = u->pipe; p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; p->output_ctx = r; p->tag = u->output.tag; p->bufs = u->conf->bufs; p->busy_size = u->conf->busy_buffers_size; p->upstream = u->peer.connection; p->downstream = c; p->pool = r->pool; p->log = c->log; p->cacheable = u->cacheable || u->store; p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t)); if (p->temp_file == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } p->temp_file->file.fd = NGX_INVALID_FILE; p->temp_file->file.log = c->log; p->temp_file->path = u->conf->temp_path; p->temp_file->pool = r->pool; if (p->cacheable) { p->temp_file->persistent = 1; } else { p->temp_file->log_level = NGX_LOG_WARN; p->temp_file->warn = "an upstream response is buffered " "to a temporary file"; } p->max_temp_file_size = u->conf->max_temp_file_size; p->temp_file_write_size = u->conf->temp_file_write_size; /* 初始化預讀鍊錶緩衝區preread_bufs */ p->preread_bufs = ngx_alloc_chain_link(r->pool); if (p->preread_bufs == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } p->preread_bufs->buf = &u->buffer; p->preread_bufs->next = NULL; u->buffer.recycled = 1; p->preread_size = u->buffer.last - u->buffer.pos; if (u->cacheable) { p->buf_to_file = ngx_calloc_buf(r->pool); if (p->buf_to_file == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } p->buf_to_file->start = u->buffer.start; p->buf_to_file->pos = u->buffer.start; p->buf_to_file->last = u->buffer.pos; p->buf_to_file->temporary = 1; } if (ngx_event_flags & NGX_USE_AIO_EVENT) { /* the posted aio operation may corrupt a shadow buffer */ p->single_buf = 1; } /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */ p->free_bufs = 1; /* * event_pipe would do u->b
陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn