讓我們想像這樣一個場景:有一個與第三方 API 互動的分散式應用程式。通常,第三方 API 具有速率限制控制機制,以避免其客戶端突發請求並導致服務停機。在這樣的場景下,呼叫者如何在分散式環境中控制向第三方API發出請求的速率?這篇文章討論了解決這個問題的可能策略。
有多種演算法可以控制請求速率,但這裡我們將重點放在令牌桶演算法,因為它相對容易理解和實現。演算法規定:一個桶子最多可以容納T個令牌,當應用程式想要向第三方API發出請求時,它必須採取1桶中的令牌。如果桶子是空的,則必須等到桶中至少有 1 個令牌。此外,儲存桶會以 R 令牌/毫秒的固定速率重新填充 1 令牌。
令牌桶演算法非常容易理解,但是如何在分散式環境中使用它來控制對第三方 API 的傳出請求?
如果想要在分散式環境中控制傳出速率限制,則需要當前速率限制的集中事實來源。有多種方法可以實現事實來源,我用可能的實現理想化了下圖:
在上圖中,我們有一個分佈在多個pod中的應用程序,每個pod都可以向第三方API發出請求。在應用基礎設施中,有一個TCP伺服器,它透過令牌桶演算法來控制速率限制。在向第三方 API 發出請求之前,pod 會向 TCP 伺服器請求新的令牌,並且 pod 會等待 TCP 伺服器的回應,直到至少有一個可用令牌。令牌可用後,Pod 會向第三方 API 發出請求。
TCP 伺服器實作可以在這個儲存庫 https://github.com/rafaquelhodev/rlimit/ 中找到,在下一節中我將簡要討論 golang 中的令牌桶實作。
下面,我將展示令牌桶實現背後的主要思想。請查看 https://github.com/rafaquelhodev/rlimit/ 儲存庫以了解詳細的實作。
速率限制控制集中在TokenBucket結構體:
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
您可以注意到 TokenBucket 結構中有一個 subs 屬性。基本上,這是特定令牌桶的訂閱者陣列:每次用戶端請求令牌時,用戶端都會新增到 subs 陣列中,並且當新令牌新增到儲存桶時客戶端會收到通知。
啟動桶時,我們需要提供桶可以支援的最大令牌數(maxTokens)以及令牌添加到桶中的時間量(refillPeriod):
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
現在,您可能想知道「如何將令牌添加到儲存桶中?」。為此,當建立儲存桶時,會啟動一個 cron 作業,並在每個 refillPeriod 毫秒時,將一個新令牌新增至儲存桶:
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case <-tb.cron: ticker.Stop() return case <-ticker.C: if tb.tokens < tb.maxTokens { tb.tokens += 1 fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens) if len(tb.subs) > 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] sub <- true } } } } }() }
最後,當客戶端想要從儲存桶中取得令牌時,必須呼叫 waitAvailable 函數:
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock() <-ch fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id) tb.tokens -= 1 return true }
靈感來自 https://github.com/Mohamed-khattab/Token-bucket-rate-limiter
以上是控制傳出速率限制的詳細內容。更多資訊請關注PHP中文網其他相關文章!