我正在使用一些第三方 API,每个 API 都有自己的速率限制:端点 1 的速率限制为每秒 10 次请求,端点 2 的速率限制为每秒 20 次请求。

我需要通过端点 1 处理数据,该端点将返回一个对象数组(2-3000 个对象之间)。然后,我需要获取每个对象并将一些数据发送到第二个端点,同时遵守第二个端点的速率限制。

我计划在 goroutine 中以每批 10 个的方式发送请求,并确保如果这 10 个请求都在 1 秒内完成,就先等到这 1 秒的窗口结束,再发送下一批请求。

最终,我希望能够限制对每个端点同时发出的并发请求数量,尤其是在需要对因服务器返回 5xx 响应等原因而失败的请求进行重试的情况下。

出于问题的目的,我使用 httpbin 请求来模拟以下场景:

package main

import (

// HttpBinGetRequest describes one GET request to be issued by get.
type HttpBinGetRequest struct {
    url string // absolute URL of the endpoint to fetch
}

// HttpBinGetResponse is the decoded reply to a GET request.
type HttpBinGetResponse struct {
    Uuid       string `json:"uuid"` // value of the "uuid" key in the JSON body
    StatusCode int    // HTTP status code of the reply
}

// HttpBinPostRequest describes one POST request to be issued by post.
type HttpBinPostRequest struct {
    url  string // absolute URL of the endpoint to post to
    uuid string // Item to post to API
}

// HttpBinPostResponse is the decoded reply to a POST request.
type HttpBinPostResponse struct {
    Data       string `json:"data"` // value of the "data" key in the JSON body
    StatusCode int    // HTTP status code of the reply
}

// main builds 500 GET requests for https://httpbin.org/uuid and starts one
// goroutine per request. Each goroutine first passes a channel-based rate gate
// (getRate), then takes a slot in a 10-slot semaphore (getSemaphore), and
// finally calls get and prints the response.
//
// NOTE(review): this listing looks truncated. Closing braces are missing, and
// no wg.Add / wg.Wait call or semaphore release is visible. Confirm against
// the original program before relying on it.
func main() {

    // Prepare GET requests for 500 requests
    var requests []*HttpBinGetRequest
    for i := 0; i < 500; i++ {
        uri := "https://httpbin.org/uuid"
        request := &HttpBinGetRequest{
            url: uri,
        requests = append(requests, request)

    // Create semaphore and rate limit for the GET endpoint
    getSemaphore := make(chan struct{}, 10)
    getRate := make(chan struct{}, 10)
    // Fill getRate to capacity so that every later send blocks until the
    // ticker goroutine below has drained a slot.
    for i := 0; i < cap(getRate); i++ {
        getRate <- struct{}{}

    // Drain one slot from getRate on every tick, which admits roughly ten
    // blocked senders per second.
    go func() {
        // ticker corresponding to 1/10th of a second
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for range ticker.C {
            _, ok := <-getRate
            // ok is false only once getRate has been closed.
            // NOTE(review): the branch body is missing here; presumably it
            // stops this goroutine - confirm.
            if !ok {

    // Send our GET requests to obtain a random UUID
    var wg sync.WaitGroup
    for _, request := range requests {
        // Go func to make request and receive the response
        go func(r *HttpBinGetRequest) {
            defer wg.Done()

            // Rate gate: this send blocks while getRate is full and proceeds
            // once the ticker goroutine has drained a slot.
            getRate <- struct{}{}

            // Add a token to the semaphore; blocks while 10 tokens are held
            getSemaphore <- struct{}{}

            // Remove token when function is complete
            // NOTE(review): the deferred body is missing from this listing.
            defer func() {
            // The error from get is discarded; only the response is printed.
            resp, _ := get(r)
            fmt.Printf("%+v\n", resp)

    // I need to add code that obtains the response data from the above for loop
    // then sends the UUID it to its own go routines for a POST request, following a similar pattern above
    // To not violate the rate limit of the second endpoint which is 20 calls per second
    // postSemaphore := make(chan struct{}, 20)
    // postRate := make(chan struct{}, 20)
    // for i := 0; i < cap(postRate); i++ {
    //  postRate <- struct{}{}
    // }

func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {

    httpResp := &HttpBinGetResponse{}
    client := &http.Client{}
    req, err := http.NewRequest("GET", hbgr.url, nil)
    if err != nil {
        fmt.Println("error making request")
        return httpResp, err

    req.Header = http.Header{
        "accept": {"application/json"},

    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("error getting response")
        return httpResp, err

    // Read Response
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("error reading response body")
        return httpResp, err
    json.Unmarshal(body, &httpResp)
    httpResp.StatusCode = resp.StatusCode
    return httpResp, nil

// post sends hbr.uuid as the body of a POST request to hbr.url, decodes the
// JSON reply into an HttpBinPostResponse, records the HTTP status code on it,
// prints it and returns it. On every error path it prints a short message and
// returns the empty response together with the error.
//
// NOTE(review): this listing looks truncated. Closing braces are missing and
// the body of the 429 branch is not visible.
// NOTE(review): resp.Body is never closed in the visible code, and the result
// of json.Unmarshal is ignored.
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {

    httpResp := &HttpBinPostResponse{}
    client := &http.Client{}
    req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
    if err != nil {
        fmt.Println("error making request")
        return httpResp, err

    // Replaces the request's header map with one holding only "accept".
    req.Header = http.Header{
        "accept": {"application/json"},

    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("error getting response")
        return httpResp, err

    // 429 is HTTP "Too Many Requests".
    // NOTE(review): the handling for this case is missing from the listing.
    if resp.StatusCode == 429 {

    // Read Response
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("error reading response body")
        return httpResp, err
    // NOTE(review): decoding into &httpResp (a pointer to the pointer) lets a
    // JSON "null" body set httpResp to nil, which would make the next line
    // panic - confirm and decode into httpResp instead.
    json.Unmarshal(body, &httpResp)
    httpResp.StatusCode = resp.StatusCode
    fmt.Printf("%+v", httpResp)
    return httpResp, nil


这是生产者/消费者模式。您可以使用 chan 来连接它们。

关于速率限制器,我会使用包 golang.org/x/time/rate


我已将逻辑封装到 scheduler[t] 类型中。请参阅下面的演示。请注意,该演示是匆忙编写的,仅用于说明想法。尚未经过彻底测试。

package main

import (


// task is one unit of work queued on a scheduler.
type task[T any] struct {
    param       T   // argument handed to the scheduler's action
    failedcount int // number of failed attempts recorded for this task
}

// scheduler queues tasks of parameter type T on a channel and carries the
// settings (rate limit, retry budget, action) used to process them.
type scheduler[T any] struct {
    name     string              // label used in log messages
    limit    int                 // rate limit applied when running tasks
    maxtries int                 // failure count at which a task is given up
    wg       sync.WaitGroup      // counts tasks that are queued or in flight
    tasks    chan task[T]        // pending tasks
    action   func(param T) error // work performed for each task
}

// newscheduler creates a scheduler that runs the action with the specified rate limit.
// it will retry the action if the action returns a non-nil error.
//
// chansize is the buffer size of the internal task channel.
func newscheduler[T any](name string, limit, maxtries, chansize int, action func(param T) error) *scheduler[T] {
    return &scheduler[T]{
        name:     name,
        limit:    limit,
        maxtries: maxtries,
        tasks:    make(chan task[T], chansize),
        action:   action,
    }
}

// addtask queues a new task for param. It blocks while the task channel is
// full.
func (s *scheduler[T]) addtask(param T) {
    // Register the task before queueing it: the worker calls wg.Done once per
    // task it receives, so an unregistered task would drive the counter
    // negative and panic.
    s.wg.Add(1)
    s.tasks <- task[T]{param: param}
}

// retrylater puts a task that already failed back on the queue, keeping its
// failure count. It blocks while the task channel is full.
func (s *scheduler[T]) retrylater(t task[T]) {
    // A re-queued task is received and completed again, so it needs its own
    // registration with the WaitGroup.
    s.wg.Add(1)
    s.tasks <- t
}

// run receives tasks from s.tasks until the channel is closed. For each task
// it first waits on a limiter built from s.limit, then runs s.action in a new
// goroutine and logs the failure if the action returns an error.
//
// NOTE(review): identifiers in this listing appear to have been lowercased.
// rate.newlimiter, rate.limit, lim.wait, context.background, log.fatalf,
// log.printf and s.wg.done are presumably rate.NewLimiter, rate.Limit,
// lim.Wait, context.Background, log.Fatalf, log.Printf and s.wg.Done - confirm
// against golang.org/x/time/rate and the standard library.
// NOTE(review): the listing is truncated. Closing braces are missing, and
// neither an increment of t.failedcount nor a retry call is visible.
func (s *scheduler[t]) run() {
    // Presumably s.limit events per second with a burst of 1 - confirm.
    lim := rate.newlimiter(rate.limit(s.limit), 1)
    for t := range s.tasks {
        // Per-iteration copy, so the goroutine below captures its own task.
        t := t
        if err := lim.wait(context.background()); err != nil {
            log.fatalf("wait: %s", err)
        go func() {
            defer s.wg.done()
            err := s.action(t.param)
            if err != nil {
                log.printf("task %s, param %v failed: %v", s.name, t.param, err)

                // Final failure is logged once the count equals s.maxtries.
                if t.failedcount == s.maxtries {
                    log.printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxtries)


// wait is the scheduler's completion hook.
//
// NOTE(review): the body is missing from this listing. Presumably it blocks
// on s.wg until all tasks are done and then closes s.tasks - confirm against
// the original program.
func (s *scheduler[t]) wait() {

// main wires up the demo. It starts a test HTTP server backed by a server
// value, then starts a "post" scheduler (limit 20, maxtries 3, channel size 1)
// and a "get" scheduler (limit 10, maxtries 3, channel size 1), each running
// in its own goroutine, and finally loops 100 times.
//
// NOTE(review): identifiers appear to have been lowercased.
// httptest.newserver, ts.close, ts.url and fmt.sprintf are presumably
// httptest.NewServer, ts.Close, ts.URL and fmt.Sprintf - confirm.
// NOTE(review): the listing is truncated. Closing braces, the body of the
// final loop and everything after it are missing. The id returned by get is
// not used in the visible code; presumably it was handed to the post
// scheduler - confirm.
func main() {
    s := &server{}
    ts := httptest.newserver(s)
    defer ts.close()

    // POST side: the action posts to <server URL>/<param>.
    schedulerpost := newscheduler("post", 20, 3, 1, func(param string) error {
        return post(fmt.sprintf("%s/%s", ts.url, param))

    go schedulerpost.run()

    // GET side: the action fetches <server URL>/<param>.
    schedulerget := newscheduler("get", 10, 3, 1, func(param int) error {
        id, err := get(fmt.sprintf("%s/%d", ts.url, param))
        if err != nil {
            return err

        return nil

    go schedulerget.run()

    for i := 0; i < 100; i++ {



// get fetches url with an HTTP GET and returns the response body as a string.
//
// It returns an error when the request fails, when the status code is not
// 200, or when the body cannot be read. The response body is always closed.
func get(url string) (string, error) {
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    return string(body), nil
}

// post sends an empty-bodied HTTP POST to url.
//
// It returns an error when the request fails or when the status code is not
// 200. The response body is always closed.
func post(url string) error {
    resp, err := http.Post(url, "", nil)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    return nil
}

type server struct {
    gmu  sync.mutex
    gets []int64

    pmu   sync.mutex
    posts []int64

func (s *server) servehttp(w http.responsewriter, r *http.request) {
    log.printf("%s: %s", r.method, r.url.path)

    // collect request stats.
    if r.method == http.methodget {
        s.gets = append(s.gets, time.now().unixmilli())
    } else {
        s.posts = append(s.posts, time.now().unixmilli())

    n := rand.intn(1000)
    // simulate latency.
    time.sleep(time.duration(n) * time.millisecond)

    // simulate errors.
    if n%10 == 0 {

    if r.method == http.methodget {
        fmt.fprintf(w, "%s", r.url.path[1:])

func (s *server) printstats() {
    log.printf("gets (total: %d):\n", len(s.gets))
    log.printf("posts (total: %d):\n", len(s.posts))

// printstats prints how many of the timestamps in ts (Unix milliseconds) fall
// into each consecutive one-second window, starting at the earliest
// timestamp. Every line has the form "  <window end>: <count>".
//
// ts is sorted in place. An empty slice prints nothing.
func printstats(ts []int64) {
    // Without this guard ts[0] below would panic when nothing was recorded.
    if len(ts) == 0 {
        return
    }

    sort.Slice(ts, func(i, j int) bool {
        return ts[i] < ts[j]
    })

    count := 0
    to := ts[0] + 1000
    for i := 0; i < len(ts); i++ {
        if ts[i] < to {
            count++
        } else {
            fmt.Printf("  %d: %d\n", to, count)
            i-- // push back the current item
            count = 0
            to += 1000
        }
    }
    // Flush the last, partially filled window.
    if count > 0 {
        fmt.Printf("  %d: %d\n", to, count)
    }
}


2023/03/25 21:03:30 GETS (total: 112):
  1679749398998: 10
  1679749399998: 10
  1679749400998: 10
  1679749401998: 10
  1679749402998: 10
  1679749403998: 10
  1679749404998: 10
  1679749405998: 10
  1679749406998: 10
  1679749407998: 10
  1679749408998: 10
  1679749409998: 2
2023/03/25 21:03:30 POSTS (total: 111):
  1679749399079: 8
  1679749400079: 8
  1679749401079: 12
  1679749402079: 8
  1679749403079: 10
  1679749404079: 9
  1679749405079: 9
  1679749406079: 8
  1679749407079: 14
  1679749408079: 12
  1679749409079: 9
  1679749410079: 4

