>  기사  >  백엔드 개발  >  Nginx에서 업스트림 메커니즘 구현

Nginx에서 업스트림 메커니즘 구현

WBOY
WBOY원래의
2016-08-08 09:30:042063검색

개요

Nginx는 Nginx를 역방향 프록시 서버로 만들고, 다운스트림 클라이언트로부터 요청을 처리하며, 동시에 다음을 기반으로 업스트림으로 흐릅니다. 서버는 TCP 요청 메시지를 보내고, 업스트림 서버는 요청에 따라 해당 응답 메시지를 반환합니다. Nginx는 업스트림 서버의 응답 메시지를 기반으로 응답 메시지를 다운스트림 클라이언트에 전달할지 여부를 결정합니다. 또한 업스트림 메커니즘은 클러스터 서버의 서버에 대한 요청의 로드 밸런싱을 수행할 수 있는 로드 밸런싱 기능을 제공합니다.

업스트림 시작

업스트림 메커니즘을 시작하려면 Nginx에서 ngx_http_upstream_init 메서드를 호출하세요. 하지만 업스트림 메커니즘을 사용하기 전에 ngx_http_upstream_create를 호출해야 합니다. ngx_http_upstream_t 구조를 생성하는 메서드는 기본적으로 ngx_http_request_t 구조의 업스트림 멤버가 NULL을 가리키기 때문에 구조의 특정 초기화 작업은 여전히 ​​HTTP에서 수행되어야 합니다. 모듈이 완료되었습니다. ngx_http_upstream_t 구조 및 ngx_http_upstream_conf_t 구조에 대한 관련 설명은 "Nginx의 업스트림 메커니즘" 문서를 참조하세요.

 다음은 ngx_http_upstream_create 함수의 구현입니다.

/* 创建 ngx_http_upstream_t 结构体 */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u;

    u = r->upstream;

    /*
     * 若已经创建过ngx_http_upstream_t 且定义了cleanup成员,
     * 则调用cleanup清理方法将原始结构体清除;
     */
    if (u && u->cleanup) {
        r->main->count++;
        ngx_http_upstream_cleanup(r);
    }

    /* 从内存池分配ngx_http_upstream_t 结构体空间 */
    u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (u == NULL) {
        return NGX_ERROR;
    }

    /* 给ngx_http_request_t 结构体成员upstream赋值 */
    r->upstream = u;

    u->peer.log = r->connection->log;
    u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
    u->peer.lock = &r->connection->lock;
#endif

#if (NGX_HTTP_CACHE)
    r->cache = NULL;
#endif

    u->headers_in.content_length_n = -1;

    return NGX_OK;
}

업스트림 메커니즘 시작 방법 ngx_http_upstream_init의 실행 흐름 정보는 다음과 같습니다.

  • Nginx와 다운스트림 서버 간의 연결에서 읽기 이벤트가 즉, 타이머에서, 즉, 타이머_세트를 확인하십시오. 플래그 비트가 1인지 여부, 플래그 비트가 1이면 타이머에서 읽기 이벤트를 제거하십시오.
  • ngx_http_upstream_init_request 메소드를 호출하여 시작하십시오. 업스트림 메커니즘

ngx_http_upstream_init_request 메소드의 실행 흐름은 다음과 같습니다.

  • ngx_http_upstream_t 구조의 store 플래그가 0인지 확인하세요. ngx_http_request_t 구조의 post_action 플래그 비트가 0인지 확인하세요. ngx_http_upstream_conf_t 구조의ignore_client_abort가 0인지 확인하세요. 위 플래그 비트가 모두 0이면 콜백 메서드를 설정하세요. ngx_http_request_t에서 ngx_http_upstream_rd_check_broken_connection으로 요청한 읽기 이벤트 쓰기 이벤트를 설정합니다. 콜백 방법은 다음과 같습니다. ngx_http_upstream_wr_check_broken_connection; 두 메소드 모두 ngx_http_upstream_check_broken_connection 메소드를 호출하여 Nginx와 다운스트림 간의 연결이 정상인지 확인합니다. 오류가 발생하면 연결이 종료됩니다.
  • 충족되지 않습니다. 즉, 하나라도 0이 아닌 경우 요청의 ngx_http_upstream_t 구조에서 HTTP 모듈로 구현된 create_request 메서드를 호출하여 업스트림 서버에 대한 요청을 구성합니다. >원래 요청에 ngx_http_cleanup_add 메소드를 호출합니다. 콜백 핸들러 메소드가 정리 연결 목록 끝에 추가됩니다. 콜백 메소드는 현재 요청이 종료되면 일부 정리 작업을 수행하기 위해 호출됩니다. 🎜>
  • ngx_http_upstream_connect 메소드를 호출하여 업스트림 서버에 대한 요청을 시작합니다.
  • 연결 설정
/* 初始化启动upstream机制 */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
    ngx_connection_t     *c;

    /* 获取当前请求所对应的连接 */
    c = r->connection;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http init upstream, client timer: %d", c->read->timer_set);

#if (NGX_HTTP_SPDY)
    if (r->spdy_stream) {
        ngx_http_upstream_init_request(r);
        return;
    }
#endif

    /*
     * 检查当前连接上读事件的timer_set标志位是否为1,若该标志位为1,
     * 表示读事件在定时器机制中,则需要把它从定时器机制中移除;
     * 因为在启动upstream机制后,就不需要对客户端的读操作进行超时管理;
     */
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        if (!c->write->active) {
            if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    }

    ngx_http_upstream_init_request(r);
}
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
    ngx_str_t                      *host;
    ngx_uint_t                      i;
    ngx_resolver_ctx_t             *ctx, temp;
    ngx_http_cleanup_t             *cln;
    ngx_http_upstream_t            *u;
    ngx_http_core_loc_conf_t       *clcf;
    ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_http_upstream_main_conf_t  *umcf;

    if (r->aio) {
        return;
    }

    u = r->upstream;

#if (NGX_HTTP_CACHE)
    ...
    ...
#endif

    /* 文件缓存标志位 */
    u->store = (u->conf->store || u->conf->store_lengths);

    /*
     * 检查ngx_http_upstream_t 结构中标志位 store;
     * 检查ngx_http_request_t 结构中标志位 post_action;
     * 检查ngx_http_upstream_conf_t 结构中标志位 ignore_client_abort;
     * 若上面这些标志位为1,则表示需要检查Nginx与下游(即客户端)之间的TCP连接是否断开;
     */
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }

    /* 把当前请求包体结构保存在ngx_http_upstream_t 结构的request_bufs链表缓冲区中 */
    if (r->request_body) {
        u->request_bufs = r->request_body->bufs;
    }

    /* 调用create_request方法构造发往上游服务器的请求 */
    if (u->create_request(r) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* 获取ngx_http_upstream_t结构中主动连接结构peer的local本地地址信息 */
    u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);

    /* 获取ngx_http_core_module模块的loc级别的配置项结构 */
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /* 初始化ngx_http_upstream_t结构中成员output向下游发送响应的方式 */
    u->output.alignment = clcf->directio_alignment;
    u->output.pool = r->pool;
    u->output.bufs.num = 1;
    u->output.bufs.size = clcf->client_body_buffer_size;
    u->output.output_filter = ngx_chain_writer;
    u->output.filter_ctx = &u->writer;

    u->writer.pool = r->pool;

    /* 添加用于表示上游响应的状态,例如:错误编码、包体长度等 */
    if (r->upstream_states == NULL) {

        r->upstream_states = ngx_array_create(r->pool, 1,
                                            sizeof(ngx_http_upstream_state_t));
        if (r->upstream_states == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

    } else {

        u->state = ngx_array_push(r->upstream_states);
        if (u->state == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
    }

    /*
     * 调用ngx_http_cleanup_add方法原始请求的cleanup链表尾端添加一个回调handler方法,
     * 该handler回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
     */
    cln = ngx_http_cleanup_add(r, 0);
    if (cln == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    cln->handler = ngx_http_upstream_cleanup;
    cln->data = r;
    u->cleanup = &cln->handler;

    if (u->resolved == NULL) {

        /* 若没有实现u->resolved标志位,则定义上游服务器的配置 */
        uscf = u->conf->upstream;

    } else {

        /*
         * 若实现了u->resolved标志位,则解析主机域名,指定上游服务器的地址;
         */


        /*
         * 若已经指定了上游服务器地址,则不需要解析,
         * 直接调用ngx_http_upstream_connection方法向上游服务器发起连接;
         * 并return从当前函数返回;
         */
        if (u->resolved->sockaddr) {

            if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
                != NGX_OK)
            {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            ngx_http_upstream_connect(r, u);

            return;
        }

        /*
         * 若还没指定上游服务器的地址,则需解析主机域名;
         * 若成功解析出上游服务器的地址和端口号,
         * 则调用ngx_http_upstream_connection方法向上游服务器发起连接;
         */
        host = &u->resolved->host;

        umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

        uscfp = umcf->upstreams.elts;

        for (i = 0; i < umcf->upstreams.nelts; i++) {

            uscf = uscfp[i];

            if (uscf->host.len == host->len
                && ((uscf->port == 0 && u->resolved->no_port)
                     || uscf->port == u->resolved->port)
                && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
            {
                goto found;
            }
        }

        if (u->resolved->port == 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no port in upstream \"%V\"", host);
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        temp.name = *host;

        ctx = ngx_resolve_start(clcf->resolver, &temp);
        if (ctx == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ctx == NGX_NO_RESOLVER) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no resolver defined to resolve %V", host);

            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
            return;
        }

        ctx->name = *host;
        ctx->handler = ngx_http_upstream_resolve_handler;
        ctx->data = r;
        ctx->timeout = clcf->resolver_timeout;

        u->resolved->ctx = ctx;

        if (ngx_resolve_name(ctx) != NGX_OK) {
            u->resolved->ctx = NULL;
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

found:

    if (uscf == NULL) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "no upstream configuration");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    if (uscf->peer.init(r, uscf) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_http_upstream_connect(r, u);
}

static void
ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->read);
}


static void
ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->write);
}

업스트림 메커니즘은 업스트림 서버와 TCP 연결을 설정할 때 사용됩니다. 즉, 연결 요청을 시작한 후 즉시 반환됩니다. 연결이 성공적으로 설정되었는지 여부를 확인하고 즉시 성공적으로 설정되지 않은 경우 epoll 이벤트 메커니즘에서 소켓을 모니터링해야 합니다. 업스트림 서버에 대한 연결 요청 시작은 ngx_http_upstream_connect 함수로 구현됩니다. ngx_http_upstream_connect 분석 중 메소드 이전에 ngx_event_connect_peer 메소드를 먼저 분석하세요. 이 메소드는

ngx_http_upstream_connect 메소드

에 의해 호출되기 때문입니다. ngx_event_connect_peer 메소드의 실행 흐름은 다음과 같습니다.

ngx_socket 메소드를 호출하여 생성합니다. TCP 소켓 문자

  • TCP 소켓을 비차단 모드로 설정하려면 ngx_nonblocking 메서드를 호출하세요.
  • 수신할 소켓 연결을 설정하세요. 네트워크 문자 전송
  • 소켓 연결에서 읽기 및 쓰기 이벤트 방법을 설정합니다.
  • EPOLLIN을 예상하도록 TCP 소켓을 설정합니다. | EPOLLOUT 이벤트 메소드가 epoll 이벤트 메커니즘에 추가되었습니다.
  • 서버에 대한 TCP 연결 요청을 시작하기 위해 연결 메소드를 호출합니다. > ngx_http_upstream_connect 메소드는 업스트림 서버에 연결 요청을 시작함을 나타냅니다. 실행 프로세스는 다음과 같습니다.
    • 调用 ngx_event_connect_peer 方法主动向上游服务器发起连接请求,需要注意的是该方法已经将相应的套接字注册到epoll事件机制来监听读、写事件,该方法返回值为 rc;
      • 若 rc = NGX_ERROR,表示发起连接失败,则调用ngx_http_upstream_finalize_request 方法关闭连接请求,并 return 从当前函数返回;
      • 若 rc = NGX_BUSY,表示当前上游服务器处于不活跃状态,则调用 ngx_http_upstream_next 方法根据传入的参数尝试重新发起连接请求,并 return 从当前函数返回;
      • 若 rc = NGX_DECLINED,表示当前上游服务器负载过重,则调用 ngx_http_upstream_next 方法尝试与其他上游服务器建立连接,并 return 从当前函数返回;
      • 设置上游连接 ngx_connection_t 结构体的读事件、写事件的回调方法 handler 都为 ngx_http_upstream_handler,设置 ngx_http_upstream_t 结构体的写事件 write_event_handler 的回调为 ngx_http_upstream_send_request_handler,读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_header;
      • 若 rc = NGX_AGAIN,表示当前已经发起连接,但是没有收到上游服务器的确认应答报文,即上游连接的写事件不可写,则需调用 ngx_add_timer 方法将上游连接的写事件添加到定时器中,管理超时确认应答;
      • 若 rc = NGX_OK,表示成功建立连接,则调用 ngx_http_upsream_send_request 方法向上游服务器发送请求;
    /* 向上游服务器建立连接 */
    static void
    ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
    {
        ngx_int_t          rc;
        ngx_time_t        *tp;
        ngx_connection_t  *c;
    
        r->connection->log->action = "connecting to upstream";
    
        if (u->state && u->state->response_sec) {
            tp = ngx_timeofday();
            u->state->response_sec = tp->sec - u->state->response_sec;
            u->state->response_msec = tp->msec - u->state->response_msec;
        }
    
        u->state = ngx_array_push(r->upstream_states);
        if (u->state == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
    
        tp = ngx_timeofday();
        u->state->response_sec = tp->sec;
        u->state->response_msec = tp->msec;
    
        /* 向上游服务器发起连接 */
        rc = ngx_event_connect_peer(&u->peer);
    
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream connect: %i", rc);
    
        /* 下面根据rc不同返回值进行分析 */
    
        /* 若建立连接失败,则关闭当前请求,并return从当前函数返回 */
        if (rc == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        u->state->peer = u->peer.name;
    
        /*
         * 若返回rc = NGX_BUSY,表示当前上游服务器不活跃,
         * 则调用ngx_http_upstream_next向上游服务器重新发起连接,
         * 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
         * 并return从当前函数返回;
         */
        if (rc == NGX_BUSY) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
            return;
        }
    
        /*
         * 若返回rc = NGX_DECLINED,表示当前上游服务器负载过重,
         * 则调用ngx_http_upstream_next向上游服务器重新发起连接,
         * 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
         * 并return从当前函数返回;
         */
        if (rc == NGX_DECLINED) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }
    
        /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
    
        c = u->peer.connection;
    
        c->data = r;
    
        /* 设置当前连接ngx_connection_t 上读、写事件的回调方法 */
        c->write->handler = ngx_http_upstream_handler;
        c->read->handler = ngx_http_upstream_handler;
    
        /* 设置upstream机制的读、写事件的回调方法 */
        u->write_event_handler = ngx_http_upstream_send_request_handler;
        u->read_event_handler = ngx_http_upstream_process_header;
    
        c->sendfile &= r->connection->sendfile;
        u->output.sendfile = c->sendfile;
    
        if (c->pool == NULL) {
    
            /* we need separate pool here to be able to cache SSL connections */
    
            c->pool = ngx_create_pool(128, r->connection->log);
            if (c->pool == NULL) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    
        c->log = r->connection->log;
        c->pool->log = c->log;
        c->read->log = c->log;
        c->write->log = c->log;
    
        /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
    
        u->writer.out = NULL;
        u->writer.last = &u->writer.out;
        u->writer.connection = c;
        u->writer.limit = 0;
    
        /*
         * 检查当前ngx_http_upstream_t 结构的request_sent标志位,
         * 若该标志位为1,则表示已经向上游服务器发送请求,即本次发起连接失败;
         * 则调用ngx_http_upstream_reinit方法重新向上游服务器发起连接;
         */
        if (u->request_sent) {
            if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    
        if (r->request_body
            && r->request_body->buf
            && r->request_body->temp_file
            && r == r->main)
        {
            /*
             * the r->request_body->buf can be reused for one request only,
             * the subrequests should allocate their own temporary bufs
             */
    
            u->output.free = ngx_alloc_chain_link(r->pool);
            if (u->output.free == NULL) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            u->output.free->buf = r->request_body->buf;
            u->output.free->next = NULL;
            u->output.allocated = 1;
    
            r->request_body->buf->pos = r->request_body->buf->start;
            r->request_body->buf->last = r->request_body->buf->start;
            r->request_body->buf->tag = u->output.tag;
        }
    
        u->request_sent = 0;
    
        /*
         * 若返回rc = NGX_AGAIN,表示没有收到上游服务器允许建立连接的应答;
         * 由于写事件已经添加到epoll事件机制中等待可写事件发生,
         * 所有在这里只需将当前连接的写事件添加到定时器机制中进行超时管理;
         * 并return从当前函数返回;
         */
        if (rc == NGX_AGAIN) {
            ngx_add_timer(c->write, u->conf->connect_timeout);
            return;
        }
    
    #if (NGX_HTTP_SSL)
    
        if (u->ssl && c->ssl == NULL) {
            ngx_http_upstream_ssl_init_connection(r, u, c);
            return;
        }
    
    #endif
    
        /*
         * 若返回值rc = NGX_OK,表示连接成功建立,
         * 调用此方法向上游服务器发送请求 */
        ngx_http_upstream_send_request(r, u);
    }

    发送请求

            当 Nginx 与上游服务器成功建立连接之后,会调用 ngx_http_upstream_send_request 方法发送请求,若是该方法不能一次性把请求内容发送完成时,则需等待 epoll 事件机制的写事件发生,若写事件发生,则会调用写事件 write_event_handler 的回调方法 ngx_http_upstream_send_request_handler 继续发送请求,并且有可能会多次调用该写事件的回调方法, 直到把请求发送完成。

    下面是 ngx_http_upstream_send_request 方法的执行流程:

    • 检查 ngx_http_upstream_t 结构体中的标志位 request_sent 是否为 0,若为 0 表示未向上游发送请求。 且此时调用 ngx_http_upstream_test_connect 方法测试是否与上游建立连接,若返回非 NGX_OK, 则需调用 ngx_http_upstream_next 方法试图与上游建立连接,并return 从当前函数返回;
    • 调用 ngx_output_chain 方法向上游发送保存在 request_bufs 链表中的请求数据,该方法返回值为 rc,并设置 request_sent 标志位为 1,检查连接上写事件 timer_set 标志位是否为1,若为 1 调用ngx_del_timer 方法将写事件从定时器中移除;
    • 若 rc = NGX_ERROR,表示当前连接上出错,则调用 ngx_http_upstream_next 方法尝试再次与上游建立连接,并 return 从当前函数返回;
    • 若 rc = NGX_AGAIN,并是当前请求数据未完全发送,则需将剩余的请求数据保存在 ngx_http_upstream_t 结构体的 output 成员中,并且调用 ngx_add_timer 方法将当前连接上的写事件添加到定时器中,调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中,等待可写事件发生,并return 从当前函数返回;
    • 若 rc = NGX_OK,表示已经发送全部请求数据,则准备接收来自上游服务器的响应报文;
    • 先调用 ngx_add_timer 方法将当前连接的读事件添加到定时器机制中,检测接收响应是否超时,检查当前连接上的读事件是否准备就绪,即标志位 ready 是否为1,若该标志位为 1,则调用 ngx_http_upstream_process_header 方法开始处理响应头部,并 return 从当前函数返回;
    • 若当前连接上读事件的标志位 ready 为0,表示暂时无可读数据,则需等待读事件再次被触发,由于原始读事件的回调方法为 ngx_http_upstream_process_header,所有无需重新设置。由于请求已经全部发送,防止写事件的回调方法 ngx_http_upstream_send_request_handler 再次被触发,因此需要重新设置写事件的回调方法为 ngx_http_upstream_dummy_handler,该方法实际上不执行任何操作,同时调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中;
    /* 向上游服务器发送请求 */
    static void
    ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
    {
        ngx_int_t          rc;
        ngx_connection_t  *c;
    
        /* 获取当前连接 */
        c = u->peer.connection;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream send request");
    
        /*
         * 若标志位request_sent为0,表示还未发送请求;
         * 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
         * 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
         * 并return从当前函数返回;
         */
        if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }
    
        c->log->action = "sending request to upstream";
    
        /*
         * 调用ngx_output_chain方法向上游发送保存在request_bufs链表中的请求数据;
         * 值得注意的是该方法的第二个参数可以是NULL也可以是request_bufs,那怎么来区分呢?
         * 若是第一次调用该方法发送request_bufs链表中的请求数据时,request_sent标志位为0,
         * 此时,第二个参数自然就是request_bufs了,那么为什么会有NULL作为参数的情况呢?
         * 当在第一次调用该方法时,并不能一次性把所有request_bufs中的数据发送完毕时,
         * 此时,会把剩余的数据保存在output结构里面,并把标志位request_sent设置为1,
         * 因此,再次发送请求数据时,不用指定request_bufs参数,因为此时剩余数据已经保存在output中;
         */
        rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
    
        /* 向上游服务器发送请求之后,把request_sent标志位设置为1 */
        u->request_sent = 1;
    
        /* 下面根据不同rc的返回值进行判断 */
    
        /*
         * 若返回值rc=NGX_ERROR,表示当前连接上出错,
         * 将错误信息传递给ngx_http_upstream_next方法,
         * 该方法根据错误信息决定是否重新向上游服务器发起连接;
         * 并return从当前函数返回;
         */
        if (rc == NGX_ERROR) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }
    
        /*
         * 检查当前连接上写事件的标志位timer_set是否为1,
         * 若该标志位为1,则需把写事件从定时器机制中移除;
         */
        if (c->write->timer_set) {
            ngx_del_timer(c->write);
        }
    
        /*
         * 若返回值rc = NGX_AGAIN,表示请求数据并未完全发送,
         * 即有剩余的请求数据保存在output中,但此时,写事件已经不可写,
         * 则调用ngx_add_timer方法把当前连接上的写事件添加到定时器机制,
         * 并调用ngx_handle_write_event方法将写事件注册到epoll事件机制中;
         * 并return从当前函数返回;
         */
        if (rc == NGX_AGAIN) {
            ngx_add_timer(c->write, u->conf->send_timeout);
    
            if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            return;
        }
    
        /* rc == NGX_OK */
    
        /*
         * 若返回值 rc = NGX_OK,表示已经发送完全部请求数据,
         * 准备接收来自上游服务器的响应报文,则执行以下程序;
         */
        if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
            if (ngx_tcp_push(c->fd) == NGX_ERROR) {
                ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                              ngx_tcp_push_n " failed");
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
        }
    
        /* 将当前连接上读事件添加到定时器机制中 */
        ngx_add_timer(c->read, u->conf->read_timeout);
    
        /*
         * 若此时,读事件已经准备就绪,
         * 则调用ngx_http_upstream_process_header方法开始接收并处理响应头部;
         * 并return从当前函数返回;
         */
        if (c->read->ready) {
            ngx_http_upstream_process_header(r, u);
            return;
        }
    
        /*
         * 若当前读事件未准备就绪;
         * 则把写事件的回调方法设置为ngx_http_upstream_dumy_handler方法(不进行任何实际操作);
         * 并把写事件注册到epoll事件机制中;
         */
        u->write_event_handler = ngx_http_upstream_dummy_handler;
    
        if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    当无法一次性将请求内容全部发送完毕,则需等待 epoll 事件机制的写事件发生,一旦发生就会调用回调方法 ngx_http_upstream_send_request_handler。

    ngx_http_upstream_send_request_handler 方法的执行流程如下所示:

    • 检查连接上写事件是否超时,即timedout 标志位是否为 1,若为 1 表示已经超时,则调用 ngx_http_upstream_next 方法重新向上游发起连接请求,并 return 从当前函数返回;
    • 若标志位 timedout 为0,即不超时,检查 header_sent 标志位是否为 1,表示已经接收到来自上游服务器的响应头部,则不需要再向上游发送请求,将写事件的回调方法设置为 ngx_http_upstream_dummy_handler,同时将写事件注册到 epoll 事件机制中,并return 从当前函数返回;
    • 若标志位 header_sent 为 0,则调用 ngx_http_upstream_send_request 方法向上游发送请求数据;
    static void
    ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
        ngx_http_upstream_t *u)
    {
        ngx_connection_t  *c;
    
        c = u->peer.connection;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream send request handler");
    
        /* 检查当前连接上写事件的超时标志位 */
        if (c->write->timedout) {
            /* 执行超时重连机制 */
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
            return;
        }
    
    #if (NGX_HTTP_SSL)
    
        if (u->ssl && c->ssl == NULL) {
            ngx_http_upstream_ssl_init_connection(r, u, c);
            return;
        }
    
    #endif
    
        /* 已经接收到上游服务器的响应头部,则不需要再向上游服务器发送请求数据 */
        if (u->header_sent) {
            /* 将写事件的回调方法设置为不进行任何实际操作的方法ngx_http_upstream_dumy_handler */
            u->write_event_handler = ngx_http_upstream_dummy_handler;
    
            /* 将写事件注册到epoll事件机制中,并return从当前函数返回 */
            (void) ngx_handle_write_event(c->write, 0);
    
            return;
        }
    
        /* 若没有接收来自上游服务器的响应头部,则需向上游服务器发送请求数据 */
        ngx_http_upstream_send_request(r, u);
    }

    接收响应

    接收响应头部

    当 Nginx 已经向上游发送请求,准备开始接收来自上游的响应头部,由方法 ngx_http_upstream_process_header 实现,该方法接收并解析响应头部。

    ngx_http_upstream_process_header 方法的执行流程如下:

    • 检查上游连接上的读事件是否超时,若标志位 timedout 为 1,则表示超时,此时调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
    • 若标志位 timedout 为 0,接着检查 ngx_http_upstream_t 结构体中的标志位 request_sent,若该标志位为 0,表示未向上游发送请求,同时调用 ngx_http_upstream_test_connect 方法测试连接状态,若该方法返回值为非 NGX_OK,表示与上游已经断开连接,则调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
    • 检查 ngx_http_upstream_t 结构体中接收响应头部的 buffer 缓冲区是否有内存空间以便接收响应头部,若 buffer.start 为 NULL,表示该缓冲区为空,则需调用 ngx_palloc 方法分配内存,该内存大小 buffer_size 由 ngx_http_upstream_conf_t 配置结构体的 buffer_size 成员指定;
    • 调用 recv 方法开始接收来自上游服务器的响应头部,并根据该方法的返回值 n 进行判断:
      • 若 n = NGX_AGAIN,表示读事件未准备就绪,需要等待下次读事件被触发时继续接收响应头部,此时,调用 ngx_add_timer 方法将读事件添加到定时器中,同时调用 ngx_handle_read_event 方法将读事件注册到epoll 事件机制中,并 return 从当前函数返回;
      • 若 n = NGX_ERROR 或 n = 0,表示上游连接发生错误 或 上游服务器主动关闭连接,则调用 ngx_http_upstream_next 方法重新发起连接请求,并 return 从当前函数返回;
      • 若 n 大于 0,表示已经接收到响应头部,此时,调用 ngx_http_upstream_t 结构体中由 HTTP 模块实现的 process_header 方法解析响应头部,且返回 rc 值;
    • 若 rc = NGX_AGAIN,表示接收到的响应头部不完整,检查接收缓冲区 buffer 是否还有剩余的内存空间,若缓冲区没有剩余的内存空间,表示接收到的响应头部过大,此时调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;若缓冲区还有剩余的内存空间,则continue 继续接收响应头部;
    • 若 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,表示接收到的响应头部是非法的,则调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;
    • 若 rc = NGX_ERROR,表示连接出错,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
    • 若 rc = NGX_OK,表示已接收到完整的响应头部,则调用 ngx_http_upstream_process_headers 方法处理已解析的响应头部,该方法会将已解析出来的响应头部保存在 ngx_http_request_t 结构体中的 headers_out 成员;
    • 检查 ngx_http_request_t 结构体的 subrequest_in_memory 成员决定是否需要转发响应给下游服务器;
      • 若 subrequest_in_memory 为 0,表示需要转发响应给下游服务器,则调用 ngx_http_upstream_send_response 方法开始转发响应给下游服务器,并 return 从当前函数返回;
      • 若 subrequest_in_memory 为 1,表示不需要将响应转发给下游,此时检查 HTTP 模块是否定义了 ngx_http_upstream_t 结构体中的 input_filter 方法处理响应包体;
    • 若没有定义 input_filter 方法,则使用 upstream 机制默认方法 ngx_http_upstream_non_buffered_filter 代替 input_filter 方法;
    • 若定义了自己的 input_filter 方法,则首先调用 input_filter_init 方法为处理响应包体做初始化工作;
    • 检查接收缓冲区 buffer 在解析完响应头部之后剩余的字符流,若有剩余的字符流,则表示已经预接收了响应包体,此时调用 input_filter 方法处理响应包体;
    • 设置 upstream 机制读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_body_in_memory,并调用该方法开始接收并解析响应包体;
    /* 接收并解析响应头部 */
    static void
    ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
    {
        ssize_t            n;
        ngx_int_t          rc;
        ngx_connection_t  *c;
    
        c = u->peer.connection;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream process header");
    
        c->log->action = "reading response header from upstream";
    
        /* 检查当前连接上的读事件是否超时 */
        if (c->read->timedout) {
            /*
             * 若标志位timedout为1,表示读事件超时;
             * 则把超时错误传递给ngx_http_upstream_next方法,
             * 该方法根据允许的错误进行重连接策略;
             * 并return从当前函数返回;
             */
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
            return;
        }
    
        /*
         * 若标志位request_sent为0,表示还未发送请求;
         * 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
         * 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
         * 并return从当前函数返回;
         */
        if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }
    
        /*
         * 检查ngx_http_upstream_t结构体中接收响应头部的buffer缓冲区;
         * 若接收缓冲区buffer未分配内存,则调用ngx_palloce方法分配内存,
         * 该内存的大小buffer_size由ngx_http_upstream_conf_t配置结构的buffer_size指定;
         */
        if (u->buffer.start == NULL) {
            u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
            if (u->buffer.start == NULL) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
            /* 调整接收缓冲区buffer,准备接收响应头部 */
            u->buffer.pos = u->buffer.start;
            u->buffer.last = u->buffer.start;
            u->buffer.end = u->buffer.start + u->conf->buffer_size;
            /* 表示该缓冲区内存可被复用、数据可被改变 */
            u->buffer.temporary = 1;
    
            u->buffer.tag = u->output.tag;
    
            /* 初始化headers_in的成员headers链表 */
            if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                              sizeof(ngx_table_elt_t))
                != NGX_OK)
            {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
    
    #if (NGX_HTTP_CACHE)
    
            if (r->cache) {
                u->buffer.pos += r->cache->header_start;
                u->buffer.last = u->buffer.pos;
            }
    #endif
        }
    
        for ( ;; ) {
    
            /* 调用recv方法从当前连接上读取响应头部数据 */
            n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
    
            /* 下面根据 recv 方法不同返回值 n 进行判断 */
    
            /*
             * 若返回值 n = NGX_AGAIN,表示读事件未准备就绪,
             * 需等待下次读事件被触发时继续接收响应头部,
             * 即将读事件注册到epoll事件机制中,等待可读事件发生;
             * 并return从当前函数返回;
             */
            if (n == NGX_AGAIN) {
    #if 0
                ngx_add_timer(rev, u->read_timeout);
    #endif
    
                if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                    ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                    return;
                }
    
                return;
            }
    
            if (n == 0) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream prematurely closed connection");
            }
    
            /*
             * 若返回值 n = NGX_ERROR 或 n = 0,则表示上游服务器已经主动关闭连接;
             * 此时,调用ngx_http_upstream_next方法决定是否重新发起连接;
             * 并return从当前函数返回;
             */
            if (n == NGX_ERROR || n == 0) {
                ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
                return;
            }
    
            /* 若返回值 n 大于 0,表示已经接收到响应头部 */
            u->buffer.last += n;
    
    #if 0
            u->valid_header_in = 0;
    
            u->peer.cached = 0;
    #endif
    
            /*
             * 调用ngx_http_upstream_t结构体中process_header方法开始解析响应头部;
             * 并根据该方法返回值进行不同的判断;
             */
            rc = u->process_header(r);
    
            /*
             * 若返回值 rc = NGX_AGAIN,表示接收到的响应头部不完整,
             * 需等待下次读事件被触发时继续接收响应头部;
             * continue继续接收响应;
             */
            if (rc == NGX_AGAIN) {
    
                if (u->buffer.last == u->buffer.end) {
                    ngx_log_error(NGX_LOG_ERR, c->log, 0,
                                  "upstream sent too big header");
    
                    ngx_http_upstream_next(r, u,
                                           NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                    return;
                }
    
                continue;
            }
    
            break;
        }
    
        /*
         * 若返回值 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,
         * 则表示接收到的响应头部是非法的,
         * 调用ngx_http_upstream_next方法决定是否重新发起连接;
         * 并return从当前函数返回;
         */
        if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
            return;
        }
    
        /*
         * 若返回值 rc = NGX_ERROR,表示出错,
         * 则调用ngx_http_upstream_finalize_request方法结束该请求;
         * 并return从当前函数返回;
         */
        if (rc == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    
        /* rc == NGX_OK */
    
        /*
         * 若返回值 rc = NGX_OK,表示成功解析到完整的响应头部;*/
        if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {
    
            if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
                return;
            }
    
            if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
                return;
            }
        }
    
        /* 调用ngx_http_upstream_process_headers方法处理已解析处理的响应头部 */
        if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
            return;
        }
    
        /*
         * 检查ngx_http_request_t 结构体的subrequest_in_memory成员决定是否转发响应给下游服务器;
         * 若该标志位为0,则需调用ngx_http_upstream_send_response方法转发响应给下游服务器;
         * 并return从当前函数返回;
         */
        if (!r->subrequest_in_memory) {
            ngx_http_upstream_send_response(r, u);
            return;
        }
    
        /* 若不需要转发响应,则调用ngx_http_upstream_t中的input_filter方法处理响应包体 */
        /* subrequest content in memory */
    
        /*
         * 若HTTP模块没有定义ngx_http_upstream_t中的input_filter处理方法;
         * 则使用upstream机制默认方法ngx_http_upstream_non_buffered_filter;
         *
         * 若HTTP模块实现了input_filter方法,则不使用upstream默认的方法;
         */
        if (u->input_filter == NULL) {
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
            u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }
    
        /*
         * 调用input_filter_init方法为处理包体做初始化工作;
         */
        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
    
        /*
         * 检查接收缓冲区是否有剩余的响应数据;
         * 因为响应头部已经解析完毕,若接收缓冲区还有未被解析的剩余数据,
         * 则该数据就是响应包体;
         */
        n = u->buffer.last - u->buffer.pos;
    
        /*
         * 若接收缓冲区有剩余的响应包体,调用input_filter方法开始处理已接收到响应包体;
         */
        if (n) {
            u->buffer.last = u->buffer.pos;
    
            u->state->response_length += n;
    
            /* 调用input_filter方法处理响应包体 */
            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }
        }
    
        if (u->length == 0) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
    
        /* 设置upstream机制的读事件回调方法read_event_handler为ngx_http_upstream_process_body_in_memory */
        u->read_event_handler = ngx_http_upstream_process_body_in_memory;
    
        /* 调用ngx_http_upstream_process_body_in_memory方法开始处理响应包体 */
        ngx_http_upstream_process_body_in_memory(r, u);
    }

    接收响应包体

    接收并解析响应包体由 ngx_http_upstream_process_body_in_memory 方法实现;

    ngx_http_upstream_process_body_in_memory 方法的执行流程如下所示:

    • 检查上游连接上读事件是否超时,若标志位 timedout 为 1,则表示已经超时,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
    • 检查接收缓冲区 buffer 是否还有剩余的内存空间,若没有剩余的内存空间,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;若有剩余的内存空间则调用 recv 方法开始接收响应包体;
      • 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体,调用 ngx_handle_read_event 方法将读事件注册到 epoll 事件机制中,同时将读事件添加到定时器机制中;
      • 若返回值 n = 0 或 n = NGX_ERROR,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
      • 若返回值 n 大于 0,则表示成功接收到响应包体,调用 input_filter 方法开始处理响应包体,检查读事件的 ready 标志位;
    • 若标志位 ready 为 1,表示仍有可读的响应包体数据,因此回到步骤 2 继续调用 recv 方法读取响应包体,直到读取完毕;
    • 若标志位 ready 为 0,则调用 ngx_handle_read_event 方法将读事件注册到epoll事件机制中,同时调用 ngx_add_timer 方法将读事件添加到定时器机制中;
    /* 接收并解析响应包体 */
    static void
    ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
        ngx_http_upstream_t *u)
    {
        size_t             size;
        ssize_t            n;
        ngx_buf_t         *b;
        ngx_event_t       *rev;
        ngx_connection_t  *c;
    
        c = u->peer.connection;
        rev = c->read;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream process body on memory");
    
        /*
         * 检查读事件标志位timedout是否超时,若该标志位为1,表示响应已经超时;
         * 则调用ngx_http_upstream_finalize_request方法结束请求;
         * 并return从当前函数返回;
         */
        if (rev->timedout) {
            ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
            return;
        }
    
        b = &u->buffer;
    
        for ( ;; ) {
    
            /* 检查当前接收缓冲区是否剩余的内存空间 */
            size = b->end - b->last;
    
            /*
             * 若接收缓冲区不存在空闲的内存空间,
             * 则调用ngx_http_upstream_finalize_request方法结束请求;
             * 并return从当前函数返回;
             */
            if (size == 0) {
                ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                              "upstream buffer is too small to read response");
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }
    
            /*
             * 若接收缓冲区有可用的内存空间,
             * 则调用recv方法开始接收响应包体;
             */
            n = c->recv(c, b->last, size);
    
            /*
             * 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体;
             */
            if (n == NGX_AGAIN) {
                break;
            }
    
            /*
             * 若返回值n = 0(表示上游服务器主动关闭连接),或n = NGX_ERROR(表示出错);
             * 则调用ngx_http_upstream_finalize_request方法结束请求;
             * 并return从当前函数返回;
             */
            if (n == 0 || n == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, n);
                return;
            }
    
            /* 若返回值 n 大于0,表示成功读取到响应包体 */
            u->state->response_length += n;
    
            /* 调用input_filter方法处理本次接收到的响应包体 */
            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }
    
            /* 检查读事件的ready标志位,若为1,继续读取响应包体 */
            if (!rev->ready) {
                break;
            }
        }
    
        if (u->length == 0) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
    
        /*
         * 若读事件的ready标志位为0,表示读事件未准备就绪,
         * 则将读事件注册到epoll事件机制中,添加到定时器机制中;
         * 读事件的回调方法不改变,即依旧为ngx_http_upstream_process_body_in_memory;
         */
        if (ngx_handle_read_event(rev, 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
    
        if (rev->active) {
            ngx_add_timer(rev, u->conf->read_timeout);
    
        } else if (rev->timer_set) {
            ngx_del_timer(rev);
        }
    }

    转发响应

    下面看下 upstream 处理上游响应包体的三种方式:

    1. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 1 时,upstream 不转发响应包体到下游,并由HTTP 模块实现的input_filter() 方法处理包体;
    2. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 1 时,upstream 将开启更多的内存和磁盘文件用于缓存上游的响应包体(此时,上游网速更快),并转发响应包体;
    3. 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 0 时,upstream 将使用固定大小的缓冲区来转发响应包体;

    转发响应由函数 ngx_http_upstream_send_response 实现,该函数的执行流程如下:

    • ngx_http_send_header 메소드를 호출하여 응답 헤더를 전달하고 ngx_http_upstream_t 구조의 header_sent 플래그를 1로 설정하여 응답 헤더가 전달되었음을 나타냅니다.
    • 임시 파일이 여전히 요청 패키지 본문을 저장하는 경우 ngx_pool_run_cleanup_filter 메소드를 호출하여 임시 파일을 정리해야 합니다.
    • 버퍼링 플래그가 1인 경우. 파일 캐시를 켜야 함을 의미합니다. 플래그 비트가 0이면 파일 캐싱을 활성화할 필요가 없으며 응답 패킷 본문만 고정된 메모리 블록 크기로 전달되어야 합니다. >
    • 버퍼링 플래그 비트가 0인 경우
      • HTTP 모듈이 자체 input_filter 메서드를 구현하는지 확인하세요. 그렇지 않은 경우 업스트림 메커니즘의 기본 메서드 ngx_http_upstream_non_buffered_filter를 사용하세요. 🎜>
      • NGX_HTTP_UPSTREAM_T 구조를 이벤트의 콜백 메소드를 읽으십시오. read_event_handler는 ngx_http_upstream_process_non_buffered_upstream _prostream _proped _propsecess를받을 때.
      • 이벤트를 작성하기 위해 ngx_http_upstream_twrite 구조를 설정합니다. _event_handler의 콜백 메소드는 ngx_http_upstream_process_non_buffered_downstream이며, 데이터가 다운스트림으로 전송되면 ngx_http_upstream_process_non_buffered_downstream 메소드가 최종적으로 ngx_http_handler 메소드를 통해 호출되어 응답 패킷 본문을 보냅니다.
      • input_filter_init 메소드가 호출되어 input_filter 메소드를 초기화하여 응답 패킷 본문을 처리합니다.
      • 수신 버퍼 버퍼를 확인하여 남은 응답이 있는지 확인합니다. 응답 헤더를 파싱한 후 데이터가 있으면 응답 패킷 본문을 미리 수신했다는 의미입니다.
      • 파싱 중에 응답 패킷 본문을 미리 수신한 경우입니다. 응답 헤더 부분에서는 input_filter 메소드를 호출하여 미리 수신한 응답 패킷 본문 중 이 부분을 처리하고, ngx_http_upstream_process_non_buffered_downstream 메소드를 호출하여 이번에 수신한 응답 패킷 본문을 다운스트림 서버로 전달합니다.
        • 응답 헤더 부분을 파싱하는 동안 응답 패킷 본문이 수신되지 않으면 응답 패킷 본문을 수신하고 업스트림 연결 읽기 이벤트가 준비되었는지 확인하는 데 재사용할 수 있도록 수신 버퍼를 먼저 비웁니다. . 플래그 준비가 1이면 준비되었음을 나타냅니다. ngx_http_upstream_process_non_buffered_upstream 메서드를 호출하여 업스트림 응답 패킷 본문을 수신합니다. 플래그 준비가 0이면 현재 함수에서 반환됩니다. 🎜>
        • 버퍼링 플래그가 1인 경우
      • ngx_http_upstream_t 구조에서 ngx_event_pipe_t 파이프 멤버를 초기화합니다. 🎜>
      input_filter_init 메소드를 호출하여 input_filter 메소드에 의해 처리된 응답 패킷 본문을 초기화합니다.
    • 업스트림 연결에서 read_event_handler의 콜백 메소드를 ngx_http_upstream_process_upstream으로 설정합니다. 🎜>
      • 업스트림 연결에서 쓰기 이벤트 write_event_handler의 콜백 메소드를 ngx_http_upstream_process_downstream으로 설정합니다.
      • 보내진 응답 패킷 본문을 처리하기 위해 ngx_http_upstream_proess_upstream 메소드를 호출합니다. 업스트림 서버에 의해
      • /* 전달 응답 패킷 본문*/ 정적 공백 ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) { int tcp_nodelay; ssize_tn; ngx_int_t rc; ngx_event_pipe_t *p; ngx_connection_t *c; ngx_http_core_loc_conf_t *clcf; /* ngx_http_send_hander 메소드를 호출하여 응답 헤더를 다운스트림으로 보냅니다. */ rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { ngx_http_upstream_finalize_request(r, u, rc); 반품; } /* header_sent 플래그를 1로 설정 */ u->header_sent = 1; if (u->업그레이드) { ngx_http_upstream_upgrade(r, u); 반품; } /* Nginx와 다운스트림 간의 TCP 연결 가져오기 */ c = r->연결; if (r->header_only) { if (u->캐시 가능 || u->store) { if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) { ngx_connection_error(c, ngx_socket_errno, ngx_shutdown_socket_n "실패"); } r->read_event_handler = ngx_http_request_empty_handler; r->write_event_handler = ngx_http_request_empty_handler; c->오류 = 1; } 또 다른 { ngx_http_upstream_finalize_request(r, u, rc); 반품; } } /* 임시 파일에 요청 패키지 본문이 저장되어 있는 경우 ngx_pool_run_cleanup_file 메서드를 호출하여 임시 파일의 요청 패키지 본문을 정리합니다. */ if (r->request_body && r->request_body->temp_file) { ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd); r->request_body->temp_file->file.fd = NGX_INVALID_FILE; }clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); /* * 버퍼링 플래그가 0인 경우 응답을 전달할 때 다운스트림 서버의 네트워크 속도에 우선순위가 부여됩니다. * 즉, 업스트림 서버로부터 응답을 받아 전달하기 위해 고정된 메모리 블록 크기를 할당하면 되며, * 메모리 블록이 가득 차면 업스트림 서버로부터의 응답 데이터 수신이 일시 중단됩니다. * 응답을 계속 수신하기 전에 메모리 블록의 응답 데이터를 다운스트림 서버로 전달한 후 남은 메모리 공간이 있을 때까지 기다리십시오. */ if (!u->버퍼링) { /* * HTTP 모듈이 input_filter 메소드를 구현하지 않는 경우, * 업스트림 메커니즘 ngx_http_upstream_non_buffered_filter의 기본 방법이 사용됩니다. */ if (u->input_filter == NULL) { u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } /* * ngx_http_upstream_t 구조체에 있는 읽기 이벤트의 콜백 메소드를 ngx_http_upstream_non_buffered_upstream(즉, 업스트림 응답을 읽는 메소드)으로 설정합니다. * 현재 요청 ngx_http_request_t 구조의 쓰기 이벤트 콜백 메소드를 ngx_http_upstream_process_non_buffered_downstream으로 설정합니다(즉, 응답을 다운스트림으로 전달하는 메소드). */ u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream; r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream; r->limit_rate = 0; /* input_filter_init를 호출하여 input_filter 메소드에 대한 응답 패키지 본문을 초기화합니다 */ if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); tcp_nodelay = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nodelay, sizeof(int)) == -1) { ngx_connection_error(c, ngx_socket_errno, "setsockopt(TCP_NODELAY) 실패"); ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } c->tcp_nodelay = NGX_TCP_NODELAY_SET; } /* 응답 헤더를 파싱한 후 수신 버퍼 버퍼가 응답 본문을 수신했는지 확인 */ n = u->buffer.last - u->buffer.pos; /* 수신 버퍼가 응답 패킷 본문을 수신한 경우 */ 만약 (n) { u->buffer.last = u->buffer.pos; u->상태->응답_길이 += n; /* input_filter 메소드를 호출하여 응답 본문 처리를 시작합니다 */ if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } /* 이번에 수신한 응답 패킷 본문을 다운스트림 서버로 전달하려면 이 메서드를 호출합니다. */ ngx_http_upstream_process_non_buffered_downstream(r); } 또 다른 { /* 수신 버퍼에 응답 패킷 본문이 없으면 이를 지웁니다. 즉, 이 버퍼를 재사용합니다. */ u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } /* * 현재 연결에 대한 읽기 이벤트가 준비된 경우 * 그런 다음 ngx_http_upstream_process_non_buffered_upstream 메소드를 호출하여 응답 패킷 본문을 수신하고 처리합니다. */ if (u->peer.connection->read->ready || u->length == 0) { ngx_http_upstream_process_non_buffered_upstream(r, u); } } 반품; }/* * ngx_http_upstream_t 구조의 버퍼링 플래그가 1인 경우 응답 패킷 본문을 전달할 때 업스트림 네트워크 속도에 우선순위가 부여됩니다. * 즉, 더 많은 메모리와 캐시를 할당합니다. 즉, 항상 업스트림 서버로부터 응답을 받고, 업스트림 서버로부터의 응답을 메모리나 캐시에 저장합니다. */ /* TODO: event_pipe buf를 사전 할당하고 "Content-Length"를 확인하세요 */ #if (NGX_HTTP_CACHE) ... ... #endif /* ngx_event_pipe_t 구조체 p 초기화 ​​*/ p = u->파이프; p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; p->output_ctx = r; p->태그 = u->output.tag; p->bufs = u->conf->bufs; p->busy_size = u->conf->busy_buffers_size; p->upstream = u->peer.connection; p->다운스트림 = c; p->풀 = r->풀; p->log = c->log; p->캐시 가능 = u->캐시 가능 || p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t)); if (p->temp_file == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } p->temp_file->file.fd = NGX_INVALID_FILE; p->temp_file->file.log = c->log; p->temp_file->path = u->conf->temp_path; p->temp_file->pool = r->pool; if (p->캐시 가능) { p->temp_file->지속성 = 1; } 또 다른 { p->temp_file->log_level = NGX_LOG_WARN; p->temp_file->warn = "업스트림 응답이 버퍼링되었습니다." "임시 파일로"; } p->max_temp_file_size = u->conf->max_temp_file_size; p->temp_file_write_size = u->conf->temp_file_write_size; /* 미리 읽기 연결 목록 버퍼 preread_bufs 초기화 */ p->preread_bufs = ngx_alloc_chain_link(r->pool); if (p->preread_bufs == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } p->preread_bufs->buf = &u->버퍼; p->preread_bufs->next = NULL; u->buffer.recycled = 1; p->preread_size = u->buffer.last - u->buffer.pos; if (u->캐시 가능) { p->buf_to_file = ngx_calloc_buf(r->pool); if (p->buf_to_file == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 반품; } p->buf_to_file->start = u->buffer.start; p->buf_to_file->pos = u->buffer.start; p->buf_to_file->last = u->buffer.pos; p->buf_to_file->임시 = 1; } if (ngx_event_flags & NGX_USE_AIO_EVENT) { /* 게시된 aio 작업으로 인해 섀도우 버퍼가 손상될 수 있습니다 */ p->single_buf = 1; } /* TODO: p->free_bufs = ngx_create_chain_of_bufs()를 사용하는 경우 0 */ p->free_bufs = 1; /* * event_pipe는 u->b를 수행합니다.
성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.