首页  >  文章  >  后端开发  >  控制传出速率限制

控制传出速率限制

WBOY
WBOY原创
2024-07-29 08:03:13833浏览

让我们想象这样一个场景:有一个与第三方 API 交互的分布式应用程序。通常,第三方 API 具有速率限制控制机制,以避免其客户端突发请求并导致服务停机。在这样的场景下,调用者如何在分布式环境中控制向第三方API发出请求的速率?这篇文章讨论了解决这个问题的可能策略。

有多种算法可以控制请求速率,但这里我们将重点关注令牌桶算法,因为它相对容易理解和实现。该算法规定:一个桶最多可以容纳T个令牌,当应用程序想要向第三方API发出请求时,它必须采取1桶中的令牌。如果桶是空的,则必须等到桶中至少有 1 个令牌。此外,存储桶会以 R 令牌/毫秒的固定速率重新填充 1 令牌。

令牌桶算法非常容易理解,但是如何在分布式环境中使用它来控制对第三方 API 的传出请求?

如果想要在分布式环境中控制传出速率限制,则需要当前速率限制的集中事实来源。有多种方法可以实现事实来源,我用可能的实现理想化了下图:

Controlling outgoing rate limit

在上图中,我们有一个分布在多个pod中的应用程序,每个pod都可以向第三方API发出请求。在应用基础设施中,有一个TCP服务器,它通过令牌桶算法来控制速率限制。在向第三方 API 发出请求之前,pod 会向 TCP 服务器请求新的令牌,并且 pod 会等待 TCP 服务器的响应,直到至少有一个可用令牌。令牌可用后,Pod 向第三方 API 发出请求。

TCP 服务器实现可以在这个存储库 https://github.com/rafaquelhodev/rlimit/ 中找到,在下一节中我将简要讨论 golang 中的令牌桶实现。

令牌桶实现

下面,我将展示令牌桶实现背后的主要思想。请查看 https://github.com/rafaquelhodev/rlimit/ 存储库以了解详细的实现。

速率限制控制集中在TokenBucket结构体中:

type TokenBucket struct {
    id           string
    mu           sync.Mutex
    tokens       int64
    maxTokens    int64
    refillPeriod int64
    cron         chan bool
    subs         []chan bool
}

您可以注意到 TokenBucket 结构中有一个 subs 属性。基本上,这是特定令牌桶的订阅者数组:每次客户端请求令牌时,客户端都会添加到 subs 数组中,并且当新令牌添加到存储桶时客户端会收到通知。

启动桶时,我们需要提供桶可以支持的最大令牌数(maxTokens)以及令牌添加到桶中的时间量(refillPeriod):

func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket {
    bucket := &TokenBucket{
        id:           id,
        tokens:       0,
        maxTokens:    maxTokens,
        refillPeriod: refillPeriod,
        cron:         make(chan bool),
        subs:         make([]chan bool, 0),
    }
    fmt.Printf("refill period  = %d\n", refillPeriod)
    bucket.startCron()
    return bucket
}

现在,您可能想知道“如何将令牌添加到存储桶中?”。为此,当创建存储桶时,会启动一个 cron 作业,并在每个 refillPeriod 毫秒时,将一个新令牌添加到存储桶中:

func (tb *TokenBucket) startCron() {
    ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond)

    go func() {
        for {
            select {
            case <-tb.cron:
                ticker.Stop()
                return
            case <-ticker.C:
                if tb.tokens < tb.maxTokens {
                    tb.tokens += 1
                    fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens)

                    if len(tb.subs) > 0 {
                        sub := tb.subs[0]
                        tb.subs = tb.subs[1:]
                        sub <- true
                    }
                }
            }
        }
    }()
}

最后,当客户端想要从存储桶中获取令牌时,必须调用 waitAvailable 函数:

func (tb *TokenBucket) waitAvailable() bool {
    tb.mu.Lock()

    if tb.tokens > 0 {
        fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id)
        tb.tokens -= 1
        tb.mu.Unlock()
        return true
    }

    fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id)

    ch := tb.tokenSubscribe()

    tb.mu.Unlock()

    <-ch

    fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id)

    tb.tokens -= 1

    return true
}

灵感来自 https://github.com/Mohamed-khattab/Token-bucket-rate-limiter

以上是控制传出速率限制的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn