>백엔드 개발 >Golang >두 개의 개별 속도 제한 엔드포인트 간의 요청을 동기화합니다.

두 개의 개별 속도 제한 엔드포인트 간의 요청을 동기화합니다.

PHPz
PHPz앞으로
2024-02-11 10:09:08486검색

두 개의 개별 속도 제한 엔드포인트 간의 요청을 동기화합니다.

웹 개발에서 우리는 두 개의 개별 속도 제한 엔드포인트 간에 동기 요청을 수행해야 하는 상황에 자주 직면합니다. 이 시점에서 우리는 요청이 적절한 시간 내에 전송되고 속도 제한에 도달할 때까지 기다리는 방법을 찾아야 합니다. 이 기사에서 Apple의 PHP 편집기는 동기식 요청 기능을 구현하고 데이터 정확성과 안정성을 보장하는 데 도움이 되는 솔루션을 소개합니다. 이 솔루션의 구체적인 구현을 살펴보겠습니다!

질문 내용

저는 일부 타사 API를 사용하고 있으며 각 API에는 자체 속도 제한이 있습니다. 엔드포인트 1의 속도 제한은 10/s이고 엔드포인트 2의 속도 제한은 20/s입니다.

객체 배열(2~3000개 객체)을 반환하는 엔드포인트 1을 통해 데이터를 처리해야 합니다. 그런 다음 각 개체를 가져와 두 번째 끝점의 속도 제한을 준수하면서 일부 데이터를 두 번째 끝점으로 보내야 합니다.

go 루틴에서 한 번에 10개의 요청을 일괄 전송하여 10개의 요청이 모두

궁극적으로는 각 엔드포인트가 한 번에 보내는 동시 응답 수를 제한할 수 있었으면 좋겠습니다. 특히 서버로부터의 500개 이상의 응답으로 인해 실패한 요청을 다시 시도해야 하는 경우에는 더욱 그렇습니다.

질문의 목적에 따라 httpbin 요청을 사용하여 다음 시나리오를 시뮬레이션하고 있습니다.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

type HttpBinGetRequest struct {
    url string
}

type HttpBinGetResponse struct {
    Uuid       string `json:"uuid"`
    StatusCode int
}

type HttpBinPostRequest struct {
    url  string
    uuid string // Item to post to API
}

type HttpBinPostResponse struct {
    Data       string `json:"data"`
    StatusCode int
}

func main() {

    // Prepare GET requests for 500 requests
    var requests []*HttpBinGetRequest
    for i := 0; i < 500; i++ {
        uri := "https://httpbin.org/uuid"
        request := &HttpBinGetRequest{
            url: uri,
        }
        requests = append(requests, request)
    }

    // Create semaphore and rate limit for the GET endpoint
    getSemaphore := make(chan struct{}, 10)
    getRate := make(chan struct{}, 10)
    for i := 0; i < cap(getRate); i++ {
        getRate <- struct{}{}
    }

    go func() {
        // ticker corresponding to 1/10th of a second
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for range ticker.C {
            _, ok := <-getRate
            if !ok {
                return
            }
        }
    }()

    // Send our GET requests to obtain a random UUID
    var wg sync.WaitGroup
    for _, request := range requests {
        wg.Add(1)
        // Go func to make request and receive the response
        go func(r *HttpBinGetRequest) {
            defer wg.Done()

            // Check the rate limiter and block if it is empty
            getRate <- struct{}{}

            // Add a token to the semaphore
            getSemaphore <- struct{}{}

            // Remove token when function is complete
            defer func() {
                <-getSemaphore
            }()
            resp, _ := get(r)
            fmt.Printf("%+v\n", resp)
        }(request)
    }
    wg.Wait()

    // I need to add code that obtains the response data from the above for loop
    // then sends the UUID it to its own go routines for a POST request, following a similar pattern above
    // To not violate the rate limit of the second endpoint which is 20 calls per second
    // postSemaphore := make(chan struct{}, 20)
    // postRate := make(chan struct{}, 20)
    // for i := 0; i < cap(postRate); i++ {
    //  postRate <- struct{}{}
    // }
}

func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {

    httpResp := &HttpBinGetResponse{}
    client := &http.Client{}
    req, err := http.NewRequest("GET", hbgr.url, nil)
    if err != nil {
        fmt.Println("error making request")
        return httpResp, err
    }

    req.Header = http.Header{
        "accept": {"application/json"},
    }

    resp, err := client.Do(req)
    if err != nil {
        fmt.Println(err)
        fmt.Println("error getting response")
        return httpResp, err
    }

    // Read Response
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("error reading response body")
        return httpResp, err
    }
    json.Unmarshal(body, &httpResp)
    httpResp.StatusCode = resp.StatusCode
    return httpResp, nil
}

// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {

    httpResp := &HttpBinPostResponse{}
    client := &http.Client{}
    req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
    if err != nil {
        fmt.Println("error making request")
        return httpResp, err
    }

    req.Header = http.Header{
        "accept": {"application/json"},
    }

    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("error getting response")
        return httpResp, err
    }

    if resp.StatusCode == 429 {
        fmt.Println(resp.Header.Get("Retry-After"))
    }

    // Read Response
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("error reading response body")
        return httpResp, err
    }
    json.Unmarshal(body, &httpResp)
    httpResp.StatusCode = resp.StatusCode
    fmt.Printf("%+v", httpResp)
    return httpResp, nil
}

Solution

이것은 생산자/소비자 패턴입니다. chan을 사용하여 연결할 수 있습니다.

속도 제한의 경우 패키지 golang.org/x/time/rate를 사용합니다.

생산자와 소비자를 연결하기 위해 chan을 사용하기로 결정했기 때문에 소비자가 다시 시도할 수 있도록 실패한 작업을 동일한 chan에 보내는 것이 당연합니다.

논리를 scheduler[t] 유형으로 캡슐화했습니다. 아래 데모를 참조하세요. 이 데모는 급하게 작성되었으며 아이디어를 설명하기 위한 목적으로만 작성되었습니다. 철저하게 테스트되지 않았습니다.

으아악

출력은 다음과 같습니다:

으아악

위 내용은 두 개의 개별 속도 제한 엔드포인트 간의 요청을 동기화합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 stackoverflow.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제