Heim > Artikel > Backend-Entwicklung > Ein Artikel über die strombegrenzenden Leaky-Bucket- und Token-Bucket-Bibliotheken in der Go-Sprache
In diesem Artikel geht es um die strombegrenzenden Leaky-Bucket- und Token-Bucket-Bibliotheken in der Go-Sprache, die Implementierungsprinzipien von Token-Buckets und Leaky-Buckets sowie deren einfache Anwendung in tatsächlichen Projekten.
Wenn auf große Datenmengen gleichzeitig zugegriffen wird, kommt es häufig vor, dass ein Dienst oder eine Schnittstelle mit einer großen Anzahl von Anfragen konfrontiert wird, was zum Absturz der Datenbank führt oder sogar eine Kettenreaktion auslöst, die das gesamte System zum Zusammenbruch bringt. Oder jemand greift die Website böswillig an und eine große Anzahl nutzloser Anfragen kann zu einer Cache-Penetration führen. Die Verwendung von strombegrenzender Middleware kann die Anzahl der Anfragen in kurzer Zeit begrenzen und beim Downgrade eine Rolle spielen, wodurch die Sicherheit der Website gewährleistet wird.
Verwenden Sie Nachrichten-Middleware für eine einheitliche Begrenzung (Geschwindigkeitsreduzierung).
Verwenden Sie das aktuelle Begrenzungsschema, um redundante Anforderungen zurückzugeben (aktuelle Begrenzung).
Aktualisieren Sie den Server Warten auf Gefahr)
Warten
Es ist ersichtlich, dass die einzige Möglichkeit zur Verbesserung des Hardwareniveaus darin besteht, den Code zu verbessern, wenn er nicht verbessert werden kann. Oder ändern Sie die Architektur und fügen Sie eine weitere Ebene hinzu! Sie können auch Nachrichten-Middleware für eine einheitliche Verarbeitung verwenden. Aus einer kombinierten Perspektive ist die aktuelle limitierende Lösung eine Strategie, die weder große Änderungen noch einen hohen Overhead erfordert.
Leaky-Bucket-Algorithmus
Schiebefenster-Algorithmus
usw.
go get -u go.uber.org/ratelimit
// New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter { return newAtomicBased(rate, opts...) } // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter { // TODO consider moving config building to the implementation // independent code. config := buildConfig(opts) perRequest := config.per / time.Duration(rate) l := &atomicLimiter{ perRequest: perRequest, maxSlack: -1 * time.Duration(config.slack) * perRequest, clock: config.clock, } initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l }Diese Funktion verwendet den
go get -u go.uber.org/ratelimit
// Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface { Now() time.Time Sleep(time.Duration) }
该函数使用了函数选项模式对多个结构体对象进行初始化
根据传入的值来初始化一个桶结构体 rate
为int
传参 。
初始化过程中包括了
perquest = config.per / time.Duration(rate)
maxSlack
宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest
松紧度是用来规范等待时间的type state struct { last time.Time sleepFor time.Duration }
同时还需要结构体Clock
来记录当前请求的时间now
和此刻的请求所需要花费等待的时间sleep
func (t *atomicLimiter) Take() time.Time { var ( newState state taken bool interval time.Duration ) for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer) newState = state{ last: now, sleepFor: oldState.sleepFor, } if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue } // 计算是否需要进行等待取水操作 newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔) // 如果等待取水时间特别小,就需要松紧度进行维护 if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } // 如果等待时间大于0,就进行更新 if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) // 最后返回需要等待的时间 return newState.last }
state
主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
实现一个Take方法
该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。
记录下当前的时间 now := t.clock.Now()
oldState.last.IsZero()
判断是不是第一次取水,如果是就直接将state
结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。
如果 newState.sleepFor
非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。
如果newState.sleepFor > 0
就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)
并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0
。
如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间t.clock.Sleep(interval)
。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。
func ratelimit1() func(ctx *gin.Context) { r1 := rate1.New(100) return func(ctx *gin.Context) { now := time.Now() // Take 返回的是一个 time.Duration的时间 if r1.Take().Sub(now) > 0 { // 返回的时间比当前的时间还大,说明需要进行等待 // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行 // 如果不需要等待请求时间,就直接进行Abort 然后返回 response(ctx, http.StatusRequestTimeout, "rate1 limit...") fmt.Println("rate1 limit...") ctx.Abort() return } // 放行 ctx.Next() } }
实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。
如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。
如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将if
Funktionsoptionsmodus
rate
ist int
Übergabeparameter. 🎜🎜Der Initialisierungsprozess umfasst 🎜perquest = config.per / time.Duration(rate)
🎜🎜maxSlack
Entspannung ( Die Lockerheit ist ein negativer Wert) -1 * time.Duration(config.slack) * perRequest
Die Lockerheit wird zur Standardisierung der Wartezeit verwendet🎜🎜func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }🎜Gleichzeitig ist die Struktur
Clock
wird auch benötigt, um die Zeit der aktuellen Anfrage now
und die Zeit, die in diesem Moment zum Warten auf die Anfrage benötigt wird sleep
🎜Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)🎜state wird hauptsächlich zum Aufzeichnen verwendet Die Ausführungszeit und die Zeit, die zum Warten auf die aktuelle Ausführungsanforderung benötigt wird (als Zwischenstatusaufzeichnung)🎜
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }🎜Implementieren Sie eine Take-Methode🎜 🎜🎜🎜Die Take-Methode führt atomare Operationen aus (kann als Sperren und Entsperren verstanden werden) und kann dennoch eine normale Verwendung bei einer großen Anzahl gleichzeitiger Anforderungen gewährleisten. 🎜🎜🎜🎜Zeichnen Sie die aktuelle Uhrzeit auf
now := t.clock.Now()
🎜🎜🎜🎜oldState.last.IsZero()
Bestimmen Sie, ob es das erste Mal ist Holen Sie sich Wasser. Wenn ja, geben Sie den Wert direkt in der Struktur state
zurück. Die letzte Ausführungszeit wird in dieser Struktur initialisiert. Wenn es das erste Mal ist, dass Wasser abgerufen wird, wird es direkt als aktuelle Zeit übergeben. 🎜🎜🎜🎜Wenn newState.sleepFor
sehr klein ist, treten Probleme auf, daher müssen Sie Laxität verwenden. Sobald der Mindestwert kleiner als die Laxität ist, verwenden Sie die Laxität, um die Wasserholzeit aufrechtzuerhalten. 🎜🎜🎜🎜Wenn newState.sleepFor > 0
, aktualisieren Sie direkt die letzte Ausführungszeit in der Struktur newState.last = newState.last.Add(newState.sleepFor)
und Notieren Sie die Wartezeit interval, newState.sleepFor = newState.sleepFor, 0
. 🎜🎜🎜🎜Wenn Wasserhol- und Wartevorgänge zulässig sind, bedeutet dies, dass kein Parallelitätswettbewerb stattfindet und die Schlafzeit simuliert wird t.clock.Sleep(interval)
. Anschließend wird die Zielzeit für die Wasserentnahme zurückgegeben und der Servercode bestimmt, ob eine Antwort zurückgesendet oder auf diese Zeit gewartet werden soll, um mit der Antwort fortzufahren. 🎜🎜🎜func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }🎜Tatsächlich wird der aktuelle Begrenzer für die entsprechende Zeit in den Ruhezustand versetzt, wenn eine Anforderung eingeht, und Geben Sie nach dem Schlafen die späteste Wasserentnahmezeit zurück. 🎜
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }🎜Hier können Sie wählen, ob Sie zurückkehren möchten. Da Take definitiv die Schlaffunktion ausführt, bedeutet dies, dass die aktuelle Anforderung empfangen wurde, wenn die Ausführung von Take endet. Die aktuelle Demonstration verwendet den ersten Fall. 🎜
if
direkt zu ignorieren. (empfohlen)🎜这里定义了一个响应函数和一个handler
函数方便测试
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
执行go test -run=Run -v
先开启一个web服务
func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
使用接口压力测试工具go-wrk
进行测试->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest
下载
Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间
输入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流积攒(压测速度过快)。
可以看出,89
个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。
引入ratelimit
库
go get -u github.com/juju/ratelimit
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
进行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
令牌桶初始化过程,初始化结构体 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量。
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
调用TakeAvailable
函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // 如果需要取出的令牌数小于等于零,那么就返回0个令牌 if count <= 0 { return 0 } // 根据时间对当前桶中令牌数进行计算 tb.adjustavailableTokens(tb.currentTick(now)) // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌 if tb.availableTokens <= 0 { return 0 } // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数 if count > tb.availableTokens { count = tb.availableTokens } // 调整令牌数 tb.availableTokens -= count return count }
如果需要取出的令牌数小于等于零,那么就返回0个令牌
根据时间对当前桶中令牌数进行计算
计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
调整令牌数
func (tb *Bucket) adjustavailableTokens(tick int64) { lastTick := tb.latestTick tb.latestTick = tick // 如果当前令牌数大于最大等于容量,直接返回最大容量 if tb.availableTokens >= tb.capacity { return } // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量) tb.availableTokens += (tick - lastTick) * tb.quantum // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
如果当前令牌数大于最大等于容量,直接返回最大容量
当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
加锁 defer
解锁
判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
调用函数adjustTokens
获取可用的令牌数量
如果当前可以取出的令牌数小于等于0 直接返回 0
如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
当前的令牌数 -= 取出的令牌数 (count)
返回 count(可以取出的令牌数)
take
函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函数,可以根据最大等待时间来进行判断。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。
func ratelimit2() func(ctx *gin.Context) { // 生成速率 最大容量 r2 := rate2.NewBucket(time.Second, 200) return func(ctx *gin.Context) { //r2.Take() // 允许欠账,令牌不够也可以接收请求 if r2.TakeAvailable(1) == 1 { // 如果想要取出1个令牌并且能够取出,就放行 ctx.Next() return } response(ctx, http.StatusRequestTimeout, "rate2 limit...") ctx.Abort() return } }
压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!
令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。
【Verwandte Empfehlungen: Go-Video-Tutorial, Programmierunterricht】
Das obige ist der detaillierte Inhalt vonEin Artikel über die strombegrenzenden Leaky-Bucket- und Token-Bucket-Bibliotheken in der Go-Sprache. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!