Maison >développement back-end >Golang >Contrôler la limite de débit sortant
Imaginons un scénario dans lequel on dispose d'une application distribuée qui interagit avec une API tierce. Habituellement, les API tierces disposent d'un mécanisme de contrôle de limite de débit afin d'éviter à leurs clients d'éclater les requêtes et de provoquer des temps d'arrêt de leurs services. Dans un tel scénario, comment l'appelant peut-il contrôler le taux de requêtes sortantes vers l'API tierce dans un environnement distribué ? Cet article discute d'une stratégie possible pour ce problème.
Il existe plusieurs algorithmes pour contrôler le taux de requêtes, mais ici nous nous concentrerons sur l'algorithme du token bucket, car il est relativement facile à comprendre et à mettre en œuvre. Cet algorithme stipule que : un bucket peut contenir un maximum de T tokens, et lorsqu'une application souhaite faire une requête à l'API tierce, elle doit en prendre 1 jeton du seau. Si le seau est vide, il doit attendre qu'il y ait au moins 1 jeton dans le seau. De plus, le seau est rempli de 1 jeton à un taux fixe de R jetons/millisecondes.
L'algorithme du token bucket est très simple à comprendre, mais comment quelqu'un peut-il l'utiliser dans un environnement distribué pour contrôler la requête sortante vers des API tierces ?
Si l'on souhaite contrôler la limite de débit sortant dans un environnement distribué, une source centralisée de vérité pour la limite de débit actuelle est nécessaire. Il existe plusieurs façons d'implémenter la source de vérité et j'ai idéalisé le schéma suivant avec une implémentation possible :
Dans la figure ci-dessus, nous avons une application distribuée dans plusieurs pods, et chaque pod peut adresser des requêtes à une API tierce. Dans l'infrastructure de l'application, il existe un serveur TCP qui contrôle la limite de débit à l'aide de l'algorithme du compartiment à jetons. Avant de faire une requête à l'API tierce, le pod demande un nouveau jeton au serveur TCP et le pod attend une réponse du serveur TCP jusqu'à ce qu'il y ait au moins un jeton disponible. Une fois qu'un jeton est disponible, le pod envoie la demande à l'API tierce.
L'implémentation du serveur TCP peut être trouvée dans ce référentiel https://github.com/rafaquelhodev/rlimit/ et dans la section suivante, je discuterai brièvement de l'implémentation du bucket de jetons dans Golang.
Ci-dessous, je montre les principales idées derrière la mise en œuvre du token bucket. Veuillez jeter un œil au référentiel https://github.com/rafaquelhodev/rlimit/ pour comprendre la mise en œuvre détaillée.
Le contrôle de la limite de débit est centralisé dans la structure TokenBucket :
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
Vous pouvez remarquer qu'il existe une propriété subs dans la structure TokenBucket. Fondamentalement, il s'agit d'un tableau d'abonnés pour un compartiment de jetons spécifique : chaque fois qu'un jeton est demandé à un client, le client est ajouté au tableau subs et le client est averti lorsqu'un nouveau jeton est ajouté au compartiment.
Lors du démarrage du bucket, nous devons fournir un nombre maximum de jetons que le bucket peut prendre en charge (maxTokens) et la durée pendant laquelle un jeton est ajouté au bucket (refillPeriod) :
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
Maintenant, vous vous demandez peut-être : "comment un jeton est ajouté au bucket ?". Pour cela, lorsqu'un bucket est créé, une tâche cron est démarrée, et à chaque milliseconde de rechargePeriod, un nouveau token est ajouté au bucket :
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case <-tb.cron: ticker.Stop() return case <-ticker.C: if tb.tokens < tb.maxTokens { tb.tokens += 1 fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens) if len(tb.subs) > 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] sub <- true } } } } }() }
Enfin, lorsqu'un client veut un token du bucket, la fonction waitAvailable doit être appelée :
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock() <-ch fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id) tb.tokens -= 1 return true }
Inspiré de https://github.com/Mohamed-khattab/Token-bucket-rate-limiter
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!