Die Verwendung von Redis zum Implementieren fester Fenster ist relativ einfach, vor allem, weil es immer nur ein festes Fenster gleichzeitig gibt, sodass wir es mit dem Befehl pexpire
festlegen können beim ersten Betreten des Fensters Die Ablaufzeit ist die Fensterzeitgröße, daher läuft das Fenster mit der Ablaufzeit ab. Gleichzeitig verwenden wir den Befehl incr
, um die Fensteranzahl zu erhöhen. pexpire
命令设置过期时间为窗口时间大小,这样窗口会随过期时间而失效,同时我们使用incr
命令增加窗口计数。
因为我们需要在counter==1
的时候设置窗口的过期时间,为了保证原子性,我们使用简单的Lua
脚本实现。
const fixedWindowLimiterTryAcquireRedisScript = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) -- 获取原始值 local counter = tonumber(redis.call("get", KEYS[1])) if counter == nil then counter = 0 end -- 若到达窗口请求上限,请求失败 if counter >= limit then return 0 end -- 窗口值+1 redis.call("incr", KEYS[1]) if counter == 0 then redis.call("pexpire", KEYS[1], window) end return 1 `
package redis import ( "context" "errors" "github.com/go-redis/redis/v8" "time" ) // FixedWindowLimiter 固定窗口限流器 type FixedWindowLimiter struct { limit int // 窗口请求上限 window int // 窗口时间大小 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewFixedWindowLimiter(client *redis.Client, limit int, window time.Duration) (*FixedWindowLimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } return &FixedWindowLimiter{ limit: limit, window: int(window / time.Millisecond), client: client, script: redis.NewScript(fixedWindowLimiterTryAcquireRedisScript), }, nil } func (l *FixedWindowLimiter) TryAcquire(ctx context.Context, resource string) error { success, err := l.script.Run(ctx, l.client, []string{resource}, l.window, l.limit).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
我们使用Redis的hash
存储每个小窗口的计数,每次请求会把所有有效窗口
的计数累加到count
,使用hdel
删除失效窗口,最后判断窗口的总计数是否大于上限。
我们基本上把所有的逻辑都放到Lua脚本里面,其中大头是对hash
的遍历,时间复杂度是O(N),N是小窗口数量,所以小窗口数量最好不要太多。
const slidingWindowLimiterTryAcquireRedisScriptHashImpl = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 -- ARGV[3]: 当前小窗口值 -- ARGV[4]: 起始小窗口值 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) local currentSmallWindow = tonumber(ARGV[3]) local startSmallWindow = tonumber(ARGV[4]) -- 计算当前窗口的请求总数 local counters = redis.call("hgetall", KEYS[1]) local count = 0 for i = 1, #(counters) / 2 do local smallWindow = tonumber(counters[i * 2 - 1]) local counter = tonumber(counters[i * 2]) if smallWindow < startSmallWindow then redis.call("hdel", KEYS[1], smallWindow) else count = count + counter end end -- 若到达窗口请求上限,请求失败 if count >= limit then return 0 end -- 若没到窗口请求上限,当前小窗口计数器+1,请求成功 redis.call("hincrby", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) return 1 `
package redis import ( "context" "errors" "github.com/go-redis/redis/v8" "time" ) // SlidingWindowLimiter 滑动窗口限流器 type SlidingWindowLimiter struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallWindow int64 // 小窗口时间大小 smallWindows int64 // 小窗口数量 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewSlidingWindowLimiter(client *redis.Client, limit int, window, smallWindow time.Duration) ( *SlidingWindowLimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.Millisecond != 0 || smallWindow%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } // 窗口时间必须能够被小窗口时间整除 if window%smallWindow != 0 { return nil, errors.New("window cannot be split by integers") } return &SlidingWindowLimiter{ limit: limit, window: int64(window / time.Millisecond), smallWindow: int64(smallWindow / time.Millisecond), smallWindows: int64(window / smallWindow), client: client, script: redis.NewScript(slidingWindowLimiterTryAcquireRedisScriptHashImpl), }, nil } func (l *SlidingWindowLimiter) TryAcquire(ctx context.Context, resource string) error { // 获取当前小窗口值 currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow // 获取起始小窗口值 startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1) success, err := l.script.Run( ctx, l.client, []string{resource}, l.window, l.limit, currentSmallWindow, startSmallWindow).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
如果小窗口数量特别多,可以使用list
优化时间复杂度,list的结构是:
[counter, smallWindow1, count1, smallWindow2, count2, smallWindow3, count3...]
也就是我们使用list的第一个元素存储计数器,每个窗口用两个元素表示,第一个元素表示小窗口值,第二个元素表示这个小窗口的计数。由于Redis Lua脚本不支持字符串分割函数,因此不能将小窗口的值和计数放在同一元素中。
具体操作流程:
1.获取list长度
2.如果长度是0,设置counter,长度+1
3.如果长度大于1,获取第二第三个元素
如果该值小于起始小窗口值,counter-第三个元素的值,删除第二第三个元素,长度-2
4.如果counter大于等于limit,请求失败
5.如果长度大于1,获取倒数第二第一个元素
如果倒数第二个元素小窗口值大于等于当前小窗口值,表示当前请求因为网络延迟的问题,到达服务器的时候,窗口已经过时了,把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1
否则,添加新的窗口值,添加新的计数(1),更新过期时间
6.否则,添加新的窗口值,添加新的计数(1),更新过期时间
7.counter + 1
8.返回成功
const slidingWindowLimiterTryAcquireRedisScriptListImpl = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 -- ARGV[3]: 当前小窗口值 -- ARGV[4]: 起始小窗口值 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) local currentSmallWindow = tonumber(ARGV[3]) local startSmallWindow = tonumber(ARGV[4]) -- 获取list长度 local len = redis.call("llen", KEYS[1]) -- 如果长度是0,设置counter,长度+1 local counter = 0 if len == 0 then redis.call("rpush", KEYS[1], 0) redis.call("pexpire", KEYS[1], window) len = len + 1 else -- 如果长度大于1,获取第二第个元素 local smallWindow1 = tonumber(redis.call("lindex", KEYS[1], 1)) counter = tonumber(redis.call("lindex", KEYS[1], 0)) -- 如果该值小于起始小窗口值 if smallWindow1 < startSmallWindow then local count1 = redis.call("lindex", KEYS[1], 2) -- counter-第三个元素的值 counter = counter - count1 -- 长度-2 len = len - 2 -- 删除第二第三个元素 redis.call("lrem", KEYS[1], 1, smallWindow1) redis.call("lrem", KEYS[1], 1, count1) end end -- 若到达窗口请求上限,请求失败 if counter >= limit then return 0 end -- 如果长度大于1,获取倒数第二第一个元素 if len > 1 then local smallWindown = tonumber(redis.call("lindex", KEYS[1], -2)) -- 如果倒数第二个元素小窗口值大于等于当前小窗口值 if smallWindown >= currentSmallWindow then -- 把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1 local countn = redis.call("lindex", KEYS[1], -1) redis.call("lset", KEYS[1], -1, countn + 1) else -- 否则,添加新的窗口值,添加新的计数(1),更新过期时间 redis.call("rpush", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) end else -- 否则,添加新的窗口值,添加新的计数(1),更新过期时间 redis.call("rpush", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) end -- counter + 1并更新 redis.call("lset", KEYS[1], 0, counter + 1) return 1 `
算法都是操作list
头部或者尾部,所以时间复杂度接近O(1)
漏桶需要保存当前水位和上次放水时间,因此我们使用hash
counter==1
ist, verwenden wir eine einfache Lua
-Skriptimplementierung, um die Atomizität sicherzustellen. const leakyBucketLimiterTryAcquireRedisScript = ` -- ARGV[1]: 最高水位 -- ARGV[2]: 水流速度/秒 -- ARGV[3]: 当前时间(秒) local peakLevel = tonumber(ARGV[1]) local currentVelocity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime")) local currentLevel = tonumber(redis.call("hget", KEYS[1], "currentLevel")) -- 初始化 if lastTime == nil then lastTime = now currentLevel = 0 redis.call("hmset", KEYS[1], "currentLevel", currentLevel, "lastTime", lastTime) end -- 尝试放水 -- 距离上次放水的时间 local interval = now - lastTime if interval > 0 then -- 当前水位-距离上次放水的时间(秒)*水流速度 local newLevel = currentLevel - interval * currentVelocity if newLevel < 0 then newLevel = 0 end currentLevel = newLevel redis.call("hmset", KEYS[1], "currentLevel", newLevel, "lastTime", now) end -- 若到达最高水位,请求失败 if currentLevel >= peakLevel then return 0 end -- 若没有到达最高水位,当前水位+1,请求成功 redis.call("hincrby", KEYS[1], "currentLevel", 1) redis.call("expire", KEYS[1], peakLevel / currentVelocity) return 1 `
package redis import ( "context" "github.com/go-redis/redis/v8" "time" ) // LeakyBucketLimiter 漏桶限流器 type LeakyBucketLimiter struct { peakLevel int // 最高水位 currentVelocity int // 水流速度/秒 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewLeakyBucketLimiter(client *redis.Client, peakLevel, currentVelocity int) *LeakyBucketLimiter { return &LeakyBucketLimiter{ peakLevel: peakLevel, currentVelocity: currentVelocity, client: client, script: redis.NewScript(leakyBucketLimiterTryAcquireRedisScript), } } func (l *LeakyBucketLimiter) TryAcquire(ctx context.Context, resource string) error { // 当前时间 now := time.Now().Unix() success, err := l.script.Run(ctx, l.client, []string{resource}, l.peakLevel, l.currentVelocity, now).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
Hash
von Redis, um die Anzahl jedes kleinen Fensters zu speichern, und jede Anfrage speichert alle gültigen Fenster
Die Anzahl wird zu count
akkumuliert, mit hdel
wird das ungültige Fenster gelöscht und schließlich ermittelt, ob die Gesamtanzahl des Fensters größer als die Obergrenze ist. Wir haben im Grunde die gesamte Logik in das Lua-Skript eingefügt. Der große Teil ist das Durchlaufen von hash
. Die Zeitkomplexität ist O(N), also die Anzahl kleine Fenster Am besten nicht zu viele. const tokenBucketLimiterTryAcquireRedisScript = ` -- ARGV[1]: 容量 -- ARGV[2]: 发放令牌速率/秒 -- ARGV[3]: 当前时间(秒) local capacity = tonumber(ARGV[1]) local rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime")) local currentTokens = tonumber(redis.call("hget", KEYS[1], "currentTokens")) -- 初始化 if lastTime == nil then lastTime = now currentTokens = capacity redis.call("hmset", KEYS[1], "currentTokens", currentTokens, "lastTime", lastTime) end -- 尝试发放令牌 -- 距离上次发放令牌的时间 local interval = now - lastTime if interval > 0 then -- 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率 local newTokens = currentTokens + interval * rate if newTokens > capacity then newTokens = capacity end currentTokens = newTokens redis.call("hmset", KEYS[1], "currentTokens", newTokens, "lastTime", now) end -- 如果没有令牌,请求失败 if currentTokens == 0 then return 0 end -- 果有令牌,当前令牌-1,请求成功 redis.call("hincrby", KEYS[1], "currentTokens", -1) redis.call("expire", KEYS[1], capacity / rate) return 1 `rrree
list
verwenden, um die zeitliche Komplexität zu optimieren. Die Struktur der Liste ist: 🎜🎜[counter, smallWindow1, count1 , smallWindow2, count2, smallWindow3, count3...]
🎜🎜Das heißt, wir verwenden das erste Element der Liste, um den Zähler zu speichern Das erste Element stellt den Wert des kleinen Fensters dar. Zwei Elemente stellen die Anzahl dieses kleinen Fensters dar. Da das Redis Lua-Skript die Funktion zum Aufteilen von Zeichenfolgen nicht unterstützt, können der Wert und die Anzahl des kleinen Fensters nicht im selben Element platziert werden. 🎜🎜Spezifischer Vorgang: 🎜🎜1. Wenn die Länge 0 ist, setzen Sie den Zähler, die Länge + 1. 🎜🎜3. Wenn die Länge größer als 1 ist, erhalten Sie das zweite und dritte Element Wenn der Wert kleiner ist als der Startwert des kleinen Fensters, zähle den Wert des dritten Elements, lösche das zweite und dritte Element, Länge -2🎜🎜4. Wenn der Zähler größer oder gleich dem Grenzwert ist, schlägt die Anfrage fehl🎜🎜5 . Wenn die Länge größer als 1 ist, erhalten Sie das letzte Element 2. Das erste Element🎜package redis import ( "context" "github.com/go-redis/redis/v8" "time" ) // TokenBucketLimiter 令牌桶限流器 type TokenBucketLimiter struct { capacity int // 容量 rate int // 发放令牌速率/秒 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewTokenBucketLimiter(client *redis.Client, capacity, rate int) *TokenBucketLimiter { return &TokenBucketLimiter{ capacity: capacity, rate: rate, client: client, script: redis.NewScript(tokenBucketLimiterTryAcquireRedisScript), } } func (l *TokenBucketLimiter) TryAcquire(ctx context.Context, resource string) error { // 当前时间 now := time.Now().Unix() success, err := l.script.Run(ctx, l.client, []string{resource}, l.capacity, l.rate, now).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }🎜Der Algorithmus arbeitet am Anfang oder am Ende von
list
, daher ist die Zeitkomplexität nahe an O(1)🎜🎜Leaky-Bucket-Algorithmus🎜🎜Der Leaky-Bucket muss den aktuellen Wasserstand und die letzte Wasserabgabezeit speichern, daher verwenden wir hash
, um diese beiden Werte zu speichern. 🎜const slidingLogLimiterTryAcquireRedisScriptHashImpl = ` -- ARGV[1]: 当前小窗口值 -- ARGV[2]: 第一个策略的窗口时间大小 -- ARGV[i * 2 + 1]: 每个策略的起始小窗口值 -- ARGV[i * 2 + 2]: 每个策略的窗口请求上限 local currentSmallWindow = tonumber(ARGV[1]) -- 第一个策略的窗口时间大小 local window = tonumber(ARGV[2]) -- 第一个策略的起始小窗口值 local startSmallWindow = tonumber(ARGV[3]) local strategiesLen = #(ARGV) / 2 - 1 -- 计算每个策略当前窗口的请求总数 local counters = redis.call("hgetall", KEYS[1]) local counts = {} -- 初始化counts for j = 1, strategiesLen do counts[j] = 0 end for i = 1, #(counters) / 2 do local smallWindow = tonumber(counters[i * 2 - 1]) local counter = tonumber(counters[i * 2]) if smallWindow < startSmallWindow then redis.call("hdel", KEYS[1], smallWindow) else for j = 1, strategiesLen do if smallWindow >= tonumber(ARGV[j * 2 + 1]) then counts[j] = counts[j] + counter end end end end -- 若到达对应策略窗口请求上限,请求失败,返回违背的策略下标 for i = 1, strategiesLen do if counts[i] >= tonumber(ARGV[i * 2 + 2]) then return i - 1 end end -- 若没到窗口请求上限,当前小窗口计数器+1,请求成功 redis.call("hincrby", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) return -1 `
package redis import ( "context" "errors" "fmt" "github.com/go-redis/redis/v8" "sort" "time" ) // ViolationStrategyError 违背策略错误 type ViolationStrategyError struct { Limit int // 窗口请求上限 Window time.Duration // 窗口时间大小 } func (e *ViolationStrategyError) Error() string { return fmt.Sprintf("violation strategy that limit = %d and window = %d", e.Limit, e.Window) } // SlidingLogLimiterStrategy 滑动日志限流器的策略 type SlidingLogLimiterStrategy struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallWindows int64 // 小窗口数量 } func NewSlidingLogLimiterStrategy(limit int, window time.Duration) *SlidingLogLimiterStrategy { return &SlidingLogLimiterStrategy{ limit: limit, window: int64(window), } } // SlidingLogLimiter 滑动日志限流器 type SlidingLogLimiter struct { strategies []*SlidingLogLimiterStrategy // 滑动日志限流器策略列表 smallWindow int64 // 小窗口时间大小 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewSlidingLogLimiter(client *redis.Client, smallWindow time.Duration, strategies ...*SlidingLogLimiterStrategy) ( *SlidingLogLimiter, error) { // 复制策略避免被修改 strategies = append(make([]*SlidingLogLimiterStrategy, 0, len(strategies)), strategies...) // 不能不设置策略 if len(strategies) == 0 { return nil, errors.New("must be set strategies") } // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if smallWindow%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } smallWindow = smallWindow / time.Millisecond for _, strategy := range strategies { if strategy.window%int64(time.Millisecond) != 0 { return nil, errors.New("the window uint must not be less than millisecond") } strategy.window = strategy.window / int64(time.Millisecond) } // 排序策略,窗口时间大的排前面,相同窗口上限大的排前面 sort.Slice(strategies, func(i, j int) bool { a, b := strategies[i], strategies[j] if a.window == b.window { return a.limit > b.limit } return a.window > b.window }) for i, strategy := range strategies { // 随着窗口时间变小,窗口上限也应该变小 if i > 0 { if strategy.limit >= strategies[i-1].limit { return nil, errors.New("the smaller window should be the smaller limit") } } // 窗口时间必须能够被小窗口时间整除 if strategy.window%int64(smallWindow) != 0 { return nil, errors.New("window cannot be split by integers") } strategy.smallWindows = strategy.window / int64(smallWindow) } return &SlidingLogLimiter{ strategies: strategies, smallWindow: int64(smallWindow), client: client, script: redis.NewScript(slidingLogLimiterTryAcquireRedisScriptHashImpl), }, nil } func (l *SlidingLogLimiter) TryAcquire(ctx context.Context, resource string) error { // 获取当前小窗口值 currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow args := make([]interface{}, len(l.strategies)*2+2) args[0] = currentSmallWindow args[1] = l.strategies[0].window // 获取每个策略的起始小窗口值 for i, strategy := range l.strategies { args[i*2+2] = currentSmallWindow - l.smallWindow*(strategy.smallWindows-1) args[i*2+3] = strategy.limit } index, err := l.script.Run( ctx, l.client, []string{resource}, args...).Int() if err != nil { return err } // 若到达窗口请求上限,请求失败 if index != -1 { return &ViolationStrategyError{ Limit: l.strategies[index].limit, Window: time.Duration(l.strategies[index].window), } } return nil }🎜Token Bucket🎜🎜Token Bucket kann als das Gegenteil von Leaky Bucket angesehen werden. Einer davon besteht darin, Wasser in den Eimer zu gießen, und der andere darin, Token aus dem Eimer zu holen. 🎜rrreeerrreee🎜Sliding log🎜🎜Der Algorithmusprozess ist der gleiche wie beim Sliding Window, außer dass mehrere Strategien angegeben werden können. Wenn die Anfrage fehlschlägt, muss der Anrufer gleichzeitig benachrichtigt werden, welche Strategie abgefangen wurde. 🎜rrreeerrree
Das obige ist der detaillierte Inhalt vonSo implementieren Sie mit Go+Redis gängige Strombegrenzungsalgorithmen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!