首頁  >  文章  >  後端開發  >  控制傳出速率限制

控制傳出速率限制

WBOY
WBOY原創
2024-07-29 08:03:13886瀏覽

讓我們想像這樣一個場景:有一個與第三方 API 互動的分散式應用程式。通常,第三方 API 具有速率限制控制機制,以避免其客戶端突發請求並導致服務停機。在這樣的場景下,呼叫者如何在分散式環境中控制向第三方API發出請求的速率?這篇文章討論了解決這個問題的可能策略。

有多種演算法可以控制請求速率,但這裡我們將重點放在令牌桶演算法,因為它相對容易理解和實現。演算法規定:一個桶子最多可以容納T個令牌,當應用程式想要向第三方API發出請求時,它必須採取1桶中的令牌。如果桶子是空的,則必須等到桶中至少有 1 個令牌。此外,儲存桶會以 R 令牌/毫秒的固定速率重新填充 1 令牌。

令牌桶演算法非常容易理解,但是如何在分散式環境中使用它來控制對第三方 API 的傳出請求?

如果想要在分散式環境中控制傳出速率限制,則需要當前速率限制的集中事實來源。有多種方法可以實現事實來源,我用可能的實現理想化了下圖:

Controlling outgoing rate limit

在上圖中,我們有一個分佈在多個pod中的應用程序,每個pod都可以向第三方API發出請求。在應用基礎設施中,有一個TCP伺服器,它透過令牌桶演算法來控制速率限制。在向第三方 API 發出請求之前,pod 會向 TCP 伺服器請求新的令牌,並且 pod 會等待 TCP 伺服器的回應,直到至少有一個可用令牌。令牌可用後,Pod 會向第三方 API 發出請求。

TCP 伺服器實作可以在這個儲存庫 https://github.com/rafaquelhodev/rlimit/ 中找到,在下一節中我將簡要討論 golang 中的令牌桶實作。

令牌桶實現

下面,我將展示令牌桶實現背後的主要思想。請查看 https://github.com/rafaquelhodev/rlimit/ 儲存庫以了解詳細的實作。

速率限制控制集中在TokenBucket結構體:

type TokenBucket struct {
    id           string
    mu           sync.Mutex
    tokens       int64
    maxTokens    int64
    refillPeriod int64
    cron         chan bool
    subs         []chan bool
}

您可以注意到 TokenBucket 結構中有一個 subs 屬性。基本上,這是特定令牌桶的訂閱者陣列:每次用戶端請求令牌時,用戶端都會新增到 subs 陣列中,並且當新令牌新增到儲存桶時客戶端會收到通知。

啟動桶時,我們需要提供桶可以支援的最大令牌數(maxTokens)以及令牌添加到桶中的時間量(refillPeriod):

func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket {
    bucket := &TokenBucket{
        id:           id,
        tokens:       0,
        maxTokens:    maxTokens,
        refillPeriod: refillPeriod,
        cron:         make(chan bool),
        subs:         make([]chan bool, 0),
    }
    fmt.Printf("refill period  = %d\n", refillPeriod)
    bucket.startCron()
    return bucket
}

現在,您可能想知道「如何將令牌添加到儲存桶中?」。為此,當建立儲存桶時,會啟動一個 cron 作業,並在每個 refillPeriod 毫秒時,將一個新令牌新增至儲存桶:

func (tb *TokenBucket) startCron() {
    ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond)

    go func() {
        for {
            select {
            case <-tb.cron:
                ticker.Stop()
                return
            case <-ticker.C:
                if tb.tokens < tb.maxTokens {
                    tb.tokens += 1
                    fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens)

                    if len(tb.subs) > 0 {
                        sub := tb.subs[0]
                        tb.subs = tb.subs[1:]
                        sub <- true
                    }
                }
            }
        }
    }()
}

最後,當客戶端想要從儲存桶中取得令牌時,必須呼叫 waitAvailable 函數:

func (tb *TokenBucket) waitAvailable() bool {
    tb.mu.Lock()

    if tb.tokens > 0 {
        fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id)
        tb.tokens -= 1
        tb.mu.Unlock()
        return true
    }

    fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id)

    ch := tb.tokenSubscribe()

    tb.mu.Unlock()

    <-ch

    fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id)

    tb.tokens -= 1

    return true
}

靈感來自 https://github.com/Mohamed-khattab/Token-bucket-rate-limiter

以上是控制傳出速率限制的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn