Maison > Article > base de données > Comment utiliser Go+Redis pour implémenter des algorithmes courants de limitation de courant
Il est relativement simple d'implémenter une fenêtre fixe à l'aide de Redis, principalement parce qu'il n'y en aura qu'une seule fixe window en même temps. Nous pouvons donc utiliser la commande pexpire
pour définir le délai d'expiration sur l'heure de la fenêtre lors de la première entrée dans la fenêtre, afin que la fenêtre expire avec le délai d'expiration à. en même temps, nous pouvons utiliser la commande incr
pour augmenter le délai d'expiration. pexpire
命令设置过期时间为窗口时间大小,这样窗口会随过期时间而失效,同时我们使用incr
命令增加窗口计数。
因为我们需要在counter==1
的时候设置窗口的过期时间,为了保证原子性,我们使用简单的Lua
脚本实现。
const fixedWindowLimiterTryAcquireRedisScript = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) -- 获取原始值 local counter = tonumber(redis.call("get", KEYS[1])) if counter == nil then counter = 0 end -- 若到达窗口请求上限,请求失败 if counter >= limit then return 0 end -- 窗口值+1 redis.call("incr", KEYS[1]) if counter == 0 then redis.call("pexpire", KEYS[1], window) end return 1 `
package redis import ( "context" "errors" "github.com/go-redis/redis/v8" "time" ) // FixedWindowLimiter 固定窗口限流器 type FixedWindowLimiter struct { limit int // 窗口请求上限 window int // 窗口时间大小 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewFixedWindowLimiter(client *redis.Client, limit int, window time.Duration) (*FixedWindowLimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } return &FixedWindowLimiter{ limit: limit, window: int(window / time.Millisecond), client: client, script: redis.NewScript(fixedWindowLimiterTryAcquireRedisScript), }, nil } func (l *FixedWindowLimiter) TryAcquire(ctx context.Context, resource string) error { success, err := l.script.Run(ctx, l.client, []string{resource}, l.window, l.limit).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
我们使用Redis的hash
存储每个小窗口的计数,每次请求会把所有有效窗口
的计数累加到count
,使用hdel
删除失效窗口,最后判断窗口的总计数是否大于上限。
我们基本上把所有的逻辑都放到Lua脚本里面,其中大头是对hash
的遍历,时间复杂度是O(N),N是小窗口数量,所以小窗口数量最好不要太多。
const slidingWindowLimiterTryAcquireRedisScriptHashImpl = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 -- ARGV[3]: 当前小窗口值 -- ARGV[4]: 起始小窗口值 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) local currentSmallWindow = tonumber(ARGV[3]) local startSmallWindow = tonumber(ARGV[4]) -- 计算当前窗口的请求总数 local counters = redis.call("hgetall", KEYS[1]) local count = 0 for i = 1, #(counters) / 2 do local smallWindow = tonumber(counters[i * 2 - 1]) local counter = tonumber(counters[i * 2]) if smallWindow < startSmallWindow then redis.call("hdel", KEYS[1], smallWindow) else count = count + counter end end -- 若到达窗口请求上限,请求失败 if count >= limit then return 0 end -- 若没到窗口请求上限,当前小窗口计数器+1,请求成功 redis.call("hincrby", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) return 1 `
package redis import ( "context" "errors" "github.com/go-redis/redis/v8" "time" ) // SlidingWindowLimiter 滑动窗口限流器 type SlidingWindowLimiter struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallWindow int64 // 小窗口时间大小 smallWindows int64 // 小窗口数量 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewSlidingWindowLimiter(client *redis.Client, limit int, window, smallWindow time.Duration) ( *SlidingWindowLimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.Millisecond != 0 || smallWindow%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } // 窗口时间必须能够被小窗口时间整除 if window%smallWindow != 0 { return nil, errors.New("window cannot be split by integers") } return &SlidingWindowLimiter{ limit: limit, window: int64(window / time.Millisecond), smallWindow: int64(smallWindow / time.Millisecond), smallWindows: int64(window / smallWindow), client: client, script: redis.NewScript(slidingWindowLimiterTryAcquireRedisScriptHashImpl), }, nil } func (l *SlidingWindowLimiter) TryAcquire(ctx context.Context, resource string) error { // 获取当前小窗口值 currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow // 获取起始小窗口值 startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1) success, err := l.script.Run( ctx, l.client, []string{resource}, l.window, l.limit, currentSmallWindow, startSmallWindow).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
如果小窗口数量特别多,可以使用list
优化时间复杂度,list的结构是:
[counter, smallWindow1, count1, smallWindow2, count2, smallWindow3, count3...]
也就是我们使用list的第一个元素存储计数器,每个窗口用两个元素表示,第一个元素表示小窗口值,第二个元素表示这个小窗口的计数。由于Redis Lua脚本不支持字符串分割函数,因此不能将小窗口的值和计数放在同一元素中。
具体操作流程:
1.获取list长度
2.如果长度是0,设置counter,长度+1
3.如果长度大于1,获取第二第三个元素
如果该值小于起始小窗口值,counter-第三个元素的值,删除第二第三个元素,长度-2
4.如果counter大于等于limit,请求失败
5.如果长度大于1,获取倒数第二第一个元素
如果倒数第二个元素小窗口值大于等于当前小窗口值,表示当前请求因为网络延迟的问题,到达服务器的时候,窗口已经过时了,把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1
否则,添加新的窗口值,添加新的计数(1),更新过期时间
6.否则,添加新的窗口值,添加新的计数(1),更新过期时间
7.counter + 1
8.返回成功
const slidingWindowLimiterTryAcquireRedisScriptListImpl = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 -- ARGV[3]: 当前小窗口值 -- ARGV[4]: 起始小窗口值 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) local currentSmallWindow = tonumber(ARGV[3]) local startSmallWindow = tonumber(ARGV[4]) -- 获取list长度 local len = redis.call("llen", KEYS[1]) -- 如果长度是0,设置counter,长度+1 local counter = 0 if len == 0 then redis.call("rpush", KEYS[1], 0) redis.call("pexpire", KEYS[1], window) len = len + 1 else -- 如果长度大于1,获取第二第个元素 local smallWindow1 = tonumber(redis.call("lindex", KEYS[1], 1)) counter = tonumber(redis.call("lindex", KEYS[1], 0)) -- 如果该值小于起始小窗口值 if smallWindow1 < startSmallWindow then local count1 = redis.call("lindex", KEYS[1], 2) -- counter-第三个元素的值 counter = counter - count1 -- 长度-2 len = len - 2 -- 删除第二第三个元素 redis.call("lrem", KEYS[1], 1, smallWindow1) redis.call("lrem", KEYS[1], 1, count1) end end -- 若到达窗口请求上限,请求失败 if counter >= limit then return 0 end -- 如果长度大于1,获取倒数第二第一个元素 if len > 1 then local smallWindown = tonumber(redis.call("lindex", KEYS[1], -2)) -- 如果倒数第二个元素小窗口值大于等于当前小窗口值 if smallWindown >= currentSmallWindow then -- 把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1 local countn = redis.call("lindex", KEYS[1], -1) redis.call("lset", KEYS[1], -1, countn + 1) else -- 否则,添加新的窗口值,添加新的计数(1),更新过期时间 redis.call("rpush", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) end else -- 否则,添加新的窗口值,添加新的计数(1),更新过期时间 redis.call("rpush", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) end -- counter + 1并更新 redis.call("lset", KEYS[1], 0, counter + 1) return 1 `
算法都是操作list
头部或者尾部,所以时间复杂度接近O(1)
漏桶需要保存当前水位和上次放水时间,因此我们使用hash
counter==1
, afin de garantir l'atomicité, nous utilisons un simple Lua
script à implémenter. const leakyBucketLimiterTryAcquireRedisScript = ` -- ARGV[1]: 最高水位 -- ARGV[2]: 水流速度/秒 -- ARGV[3]: 当前时间(秒) local peakLevel = tonumber(ARGV[1]) local currentVelocity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime")) local currentLevel = tonumber(redis.call("hget", KEYS[1], "currentLevel")) -- 初始化 if lastTime == nil then lastTime = now currentLevel = 0 redis.call("hmset", KEYS[1], "currentLevel", currentLevel, "lastTime", lastTime) end -- 尝试放水 -- 距离上次放水的时间 local interval = now - lastTime if interval > 0 then -- 当前水位-距离上次放水的时间(秒)*水流速度 local newLevel = currentLevel - interval * currentVelocity if newLevel < 0 then newLevel = 0 end currentLevel = newLevel redis.call("hmset", KEYS[1], "currentLevel", newLevel, "lastTime", now) end -- 若到达最高水位,请求失败 if currentLevel >= peakLevel then return 0 end -- 若没有到达最高水位,当前水位+1,请求成功 redis.call("hincrby", KEYS[1], "currentLevel", 1) redis.call("expire", KEYS[1], peakLevel / currentVelocity) return 1 `rrree
hash
de Redis pour stocker le nombre de chaque petite fenêtre, à chaque fois La requête accumulera le nombre de toutes les fenêtres valides
jusqu'à count
, utilisera hdel
pour supprimer les fenêtres non valides et déterminera enfin si le nombre total de fenêtres est supérieur à la limite supérieure. Nous mettons essentiellement toute la logique dans le script Lua, dont l'essentiel est la traversée du hash
, la complexité temporelle est O(N), N est la petite fenêtre quantité, il est donc préférable de ne pas avoir trop de petites fenêtres. package redis import ( "context" "github.com/go-redis/redis/v8" "time" ) // LeakyBucketLimiter 漏桶限流器 type LeakyBucketLimiter struct { peakLevel int // 最高水位 currentVelocity int // 水流速度/秒 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewLeakyBucketLimiter(client *redis.Client, peakLevel, currentVelocity int) *LeakyBucketLimiter { return &LeakyBucketLimiter{ peakLevel: peakLevel, currentVelocity: currentVelocity, client: client, script: redis.NewScript(leakyBucketLimiterTryAcquireRedisScript), } } func (l *LeakyBucketLimiter) TryAcquire(ctx context.Context, resource string) error { // 当前时间 now := time.Now().Unix() success, err := l.script.Run(ctx, l.client, []string{resource}, l.peakLevel, l.currentVelocity, now).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }rrree
list
pour optimiser la complexité temporelle de la structure de la liste. est : #🎜🎜## 🎜🎜#[counter, smallWindow1, count1, smallWindow2, count2, smallWindow3, count3...]
#🎜🎜##🎜🎜#C'est-à-dire que nous utilisons le premier élément de la liste pour stocker le compteur, chaque fenêtre A est représentée par deux éléments, le premier élément représente la valeur de la petite fenêtre et le deuxième élément représente le nombre de cette petite fenêtre. Étant donné que le script Redis Lua ne prend pas en charge la fonction de fractionnement de chaîne, la valeur et le nombre de la petite fenêtre ne peuvent pas être placés dans le même élément. #🎜🎜##🎜🎜#Processus d'opération spécifique : #🎜🎜##🎜🎜#1 Obtenez la longueur de la liste#🎜🎜##🎜🎜#2. 🎜# #🎜🎜#3. Si la longueur est supérieure à 1, récupérez les deuxième et troisième éléments #🎜🎜##🎜🎜#Si la valeur est inférieure à la valeur de départ de la petite fenêtre, contrez la valeur du troisième élément , supprimez les deuxième et troisième éléments, longueur -2#🎜🎜##🎜🎜#4 Si le compteur est supérieur ou égal à la limite, la requête échoue#🎜🎜##🎜🎜#5. supérieur à 1, obtenez l'avant-dernier élément#🎜🎜 #const tokenBucketLimiterTryAcquireRedisScript = ` -- ARGV[1]: 容量 -- ARGV[2]: 发放令牌速率/秒 -- ARGV[3]: 当前时间(秒) local capacity = tonumber(ARGV[1]) local rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime")) local currentTokens = tonumber(redis.call("hget", KEYS[1], "currentTokens")) -- 初始化 if lastTime == nil then lastTime = now currentTokens = capacity redis.call("hmset", KEYS[1], "currentTokens", currentTokens, "lastTime", lastTime) end -- 尝试发放令牌 -- 距离上次发放令牌的时间 local interval = now - lastTime if interval > 0 then -- 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率 local newTokens = currentTokens + interval * rate if newTokens > capacity then newTokens = capacity end currentTokens = newTokens redis.call("hmset", KEYS[1], "currentTokens", newTokens, "lastTime", now) end -- 如果没有令牌,请求失败 if currentTokens == 0 then return 0 end -- 果有令牌,当前令牌-1,请求成功 redis.call("hincrby", KEYS[1], "currentTokens", -1) redis.call("expire", KEYS[1], capacity / rate) return 1 `#🎜🎜#Les algorithmes fonctionnent sur la
list
head ou tail , donc la complexité temporelle est proche de O(1)#🎜🎜##🎜🎜#Leaky bucket algorithm#🎜🎜##🎜 🎜#Le seau qui fuit doit enregistrer le niveau d'eau actuel et l'heure de la dernière libération d'eau, nous utilisons donc hash code> pour enregistrer ces deux valeurs. #🎜🎜#<pre class="brush:go;">package redis
import (
"context"
"github.com/go-redis/redis/v8"
"time"
)
// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
capacity int // 容量
rate int // 发放令牌速率/秒
client *redis.Client // Redis客户端
script *redis.Script // TryAcquire脚本
}
func NewTokenBucketLimiter(client *redis.Client, capacity, rate int) *TokenBucketLimiter {
return &TokenBucketLimiter{
capacity: capacity,
rate: rate,
client: client,
script: redis.NewScript(tokenBucketLimiterTryAcquireRedisScript),
}
}
func (l *TokenBucketLimiter) TryAcquire(ctx context.Context, resource string) error {
// 当前时间
now := time.Now().Unix()
success, err := l.script.Run(ctx, l.client, []string{resource}, l.capacity, l.rate, now).Bool()
if err != nil {
return err
}
// 若到达窗口请求上限,请求失败
if !success {
return ErrAcquireFailed
}
return nil
}</pre><pre class="brush:plain;">const slidingLogLimiterTryAcquireRedisScriptHashImpl = `
-- ARGV[1]: 当前小窗口值
-- ARGV[2]: 第一个策略的窗口时间大小
-- ARGV[i * 2 + 1]: 每个策略的起始小窗口值
-- ARGV[i * 2 + 2]: 每个策略的窗口请求上限
local currentSmallWindow = tonumber(ARGV[1])
-- 第一个策略的窗口时间大小
local window = tonumber(ARGV[2])
-- 第一个策略的起始小窗口值
local startSmallWindow = tonumber(ARGV[3])
local strategiesLen = #(ARGV) / 2 - 1
-- 计算每个策略当前窗口的请求总数
local counters = redis.call("hgetall", KEYS[1])
local counts = {}
-- 初始化counts
for j = 1, strategiesLen do
counts[j] = 0
end
for i = 1, #(counters) / 2 do
local smallWindow = tonumber(counters[i * 2 - 1])
local counter = tonumber(counters[i * 2])
if smallWindow < startSmallWindow then
redis.call("hdel", KEYS[1], smallWindow)
else
for j = 1, strategiesLen do
if smallWindow >= tonumber(ARGV[j * 2 + 1]) then
counts[j] = counts[j] + counter
end
end
end
end
-- 若到达对应策略窗口请求上限,请求失败,返回违背的策略下标
for i = 1, strategiesLen do
if counts[i] >= tonumber(ARGV[i * 2 + 2]) then
return i - 1
end
end
-- 若没到窗口请求上限,当前小窗口计数器+1,请求成功
redis.call("hincrby", KEYS[1], currentSmallWindow, 1)
redis.call("pexpire", KEYS[1], window)
return -1
`</pre>#🎜🎜#Token Bucket#🎜🎜##🎜🎜#Le seau à jetons peut être considéré comme l'algorithme opposé du seau qui fuit. L'un d'eux consiste à verser de l'eau dans le seau et l'autre à récupérer le seau. jeton du seau. #🎜🎜#<pre class="brush:go;">package redis
import (
"context"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"sort"
"time"
)
// ViolationStrategyError 违背策略错误
type ViolationStrategyError struct {
Limit int // 窗口请求上限
Window time.Duration // 窗口时间大小
}
func (e *ViolationStrategyError) Error() string {
return fmt.Sprintf("violation strategy that limit = %d and window = %d", e.Limit, e.Window)
}
// SlidingLogLimiterStrategy 滑动日志限流器的策略
type SlidingLogLimiterStrategy struct {
limit int // 窗口请求上限
window int64 // 窗口时间大小
smallWindows int64 // 小窗口数量
}
func NewSlidingLogLimiterStrategy(limit int, window time.Duration) *SlidingLogLimiterStrategy {
return &SlidingLogLimiterStrategy{
limit: limit,
window: int64(window),
}
}
// SlidingLogLimiter 滑动日志限流器
type SlidingLogLimiter struct {
strategies []*SlidingLogLimiterStrategy // 滑动日志限流器策略列表
smallWindow int64 // 小窗口时间大小
client *redis.Client // Redis客户端
script *redis.Script // TryAcquire脚本
}
func NewSlidingLogLimiter(client *redis.Client, smallWindow time.Duration, strategies ...*SlidingLogLimiterStrategy) (
*SlidingLogLimiter, error) {
// 复制策略避免被修改
strategies = append(make([]*SlidingLogLimiterStrategy, 0, len(strategies)), strategies...)
// 不能不设置策略
if len(strategies) == 0 {
return nil, errors.New("must be set strategies")
}
// redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除
if smallWindow%time.Millisecond != 0 {
return nil, errors.New("the window uint must not be less than millisecond")
}
smallWindow = smallWindow / time.Millisecond
for _, strategy := range strategies {
if strategy.window%int64(time.Millisecond) != 0 {
return nil, errors.New("the window uint must not be less than millisecond")
}
strategy.window = strategy.window / int64(time.Millisecond)
}
// 排序策略,窗口时间大的排前面,相同窗口上限大的排前面
sort.Slice(strategies, func(i, j int) bool {
a, b := strategies[i], strategies[j]
if a.window == b.window {
return a.limit > b.limit
}
return a.window > b.window
})
for i, strategy := range strategies {
// 随着窗口时间变小,窗口上限也应该变小
if i > 0 {
if strategy.limit >= strategies[i-1].limit {
return nil, errors.New("the smaller window should be the smaller limit")
}
}
// 窗口时间必须能够被小窗口时间整除
if strategy.window%int64(smallWindow) != 0 {
return nil, errors.New("window cannot be split by integers")
}
strategy.smallWindows = strategy.window / int64(smallWindow)
}
return &SlidingLogLimiter{
strategies: strategies,
smallWindow: int64(smallWindow),
client: client,
script: redis.NewScript(slidingLogLimiterTryAcquireRedisScriptHashImpl),
}, nil
}
func (l *SlidingLogLimiter) TryAcquire(ctx context.Context, resource string) error {
// 获取当前小窗口值
currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow
args := make([]interface{}, len(l.strategies)*2+2)
args[0] = currentSmallWindow
args[1] = l.strategies[0].window
// 获取每个策略的起始小窗口值
for i, strategy := range l.strategies {
args[i*2+2] = currentSmallWindow - l.smallWindow*(strategy.smallWindows-1)
args[i*2+3] = strategy.limit
}
index, err := l.script.Run(
ctx, l.client, []string{resource}, args...).Int()
if err != nil {
return err
}
// 若到达窗口请求上限,请求失败
if index != -1 {
return &ViolationStrategyError{
Limit: l.strategies[index].limit,
Window: time.Duration(l.strategies[index].window),
}
}
return nil
}</pre>rrreee#🎜🎜#Sliding Log#🎜🎜##🎜🎜#Le processus de l'algorithme est le même que la fenêtre glissante, sauf qu'il peut spécifier plusieurs stratégies en même temps, lorsque la requête échoue, l'appelant. doit être informé de la stratégie utilisée. #🎜🎜 #rrreeerrree
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!