ホームページ  >  記事  >  バックエンド開発  >  Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事

Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事

青灯夜游
青灯夜游転載
2023-02-03 19:00:223307ブラウズ

この記事では、Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明し、トークン バケットとリーキー バケットの実装原則と実際のプロジェクトでの簡単なアプリケーションを紹介します。

Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事

#なぜ電流制限ミドルウェアが必要なのでしょうか?

大量のデータが同時にアクセスされると、サービスまたはインターフェイス が大量のリクエストに直面し、データベースがクラッシュ したり、場合によってはエラーが発生したりすることがよくあります。システム全体を崩壊させる連鎖反応。あるいは、誰かが Web サイトを悪意を持って攻撃し、大量の無駄なリクエストがキャッシュの侵入につながる可能性があります。電流制限ミドルウェアを使用すると、短期間のリクエスト数を制限し、ダウングレードに役割を果たし、Web サイトのセキュリティを確保できます。

大量の同時リクエストに対処するための戦略?

  • メッセージミドルウェアを使用して統合制限 (速度低下) を行う

  • 電流制限スキームを使用して冗長なリクエストを返す (電流制限)

  • #サーバーをアップグレードします

  • #キャッシュ (ただし、キャッシュ侵入などの危険はまだあります)
  • など。お待ちください
  • コードを改善できなくなった場合、ハードウェア レベルを向上させる唯一の方法があることがわかります。または、アーキテクチャを変更して別のレイヤーを追加します。メッセージミドルウェアを使用して統合処理を行うこともできます。総合的な観点から見ると、現在の制限ソリューションは、大きな変更や高いオーバーヘッドを必要としない戦略です。

一般的な電流制限スキーム

    トークン バケット アルゴリズム
  • リーキー バケット アルゴリズム
  • スライディング ウィンドウ アルゴリズム
  • お待ちください
リーキー バケット

ratelimit ライブラリの紹介

##go get -u go.uber.org/ratelimit

ライブラリ関数のソースcode

 // New returns a Limiter that will limit to the given RPS.
 func New(rate int, opts ...Option) Limiter {
     return newAtomicBased(rate, opts...)
 }
 
 // newAtomicBased returns a new atomic based limiter.
 func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
     // TODO consider moving config building to the implementation
     // independent code.
     config := buildConfig(opts)
     perRequest := config.per / time.Duration(rate)
     l := &atomicLimiter{
         perRequest: perRequest,
         maxSlack:   -1 * time.Duration(config.slack) * perRequest,
         clock:      config.clock,
     }
 
     initialState := state{
         last:     time.Time{},
         sleepFor: 0,
     }
     atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
     return l
 }
この関数は、 関数オプション mode

を使用して、受信した値に従って

複数の構造 オブジェクトを初期化します。バケット構造 rateint

パラメータとして渡されます。

初期化プロセスには、水滴ごとに必要な時間

perquest = config.per / time.Duration(rate)

## が含まれます。
    #maxSlack
  • Slack (laxity は負の値です) -1 * time.Duration(config.slack) * perRequest Slack は待ち時間を標準化するために使用されます
  •  // Clock is the minimum necessary interface to instantiate a rate limiter with
     // a clock or mock clock, compatible with clocks created using
     // github.com/andres-erbsen/clock.
     type Clock interface {
        Now() time.Time
        Sleep(time.Duration)
     }
    同時に、現在のリクエストの時間nowと現時点でリクエストを待つのにかかる時間を記録するための構造体Clock
  • も必要です
sleep

<pre class="brush:js;toolbar:false;"> type state struct { last time.Time sleepFor time.Duration }</pre> state 主に、最後の実行時間と現在の実行リクエストの待ち時間を記録するために使用されます (中間状態レコードとして)

最も重要な Take ロジック

<pre class="brush:js;toolbar:false;"> func (t *atomicLimiter) Take() time.Time { var ( newState state taken bool interval time.Duration ) for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&amp;t.state) oldState := (*state)(previousStatePointer) newState = state{ last: now, sleepFor: oldState.sleepFor, } if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&amp;t.state, previousStatePointer, unsafe.Pointer(&amp;newState)) continue } // 计算是否需要进行等待取水操作 newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔) // 如果等待取水时间特别小,就需要松紧度进行维护 if newState.sleepFor &lt; t.maxSlack { newState.sleepFor = t.maxSlack } // 如果等待时间大于0,就进行更新 if newState.sleepFor &gt; 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&amp;t.state, previousStatePointer, unsafe.Pointer(&amp;newState)) } t.clock.Sleep(interval) // 最后返回需要等待的时间 return newState.last }</pre>Take メソッドの実装

Take メソッドはアトミックな操作を実行します (ロックとロック解除として理解できます)。 、これは多数の同時リクエストの下でも保証されます。通常の使用。

    現在時刻を記録します
  • now := t. Clock.Now()

  • oldState.last .IsZero() 水汲みが初めてかどうかを判断し、初めての場合は、

    state
  • 構造体の値を直接返します。この構造体では最終実行時刻が初期化されており、初めての水取得の場合はそのまま現在時刻として渡されます。
  • newState.sleepFor が非常に小さい場合、問題が発生するため、緩和度を使用する必要があります。最小値が緩和度より小さくなったら、調整は緩めに行ってください。 取水時間中にメンテナンスを行ってください。

  • If newState.sleepFor > 0 構造体の最終実行時刻を直接更新します

    newState.last = newState.last.Add(newState. sleepFor )
  • そして待機時間を記録します
  • interval, newState.sleepFor = newState.sleepFor, 0

    水の取得と待機操作が許可されている場合、同時実行の競合は発生せず、スリープ時間は t.clock.Sleep(interval) でシミュレートされることを意味します。次に、水の回収の目標時間が返され、サーバー コードは応答を送り返すか、この時間まで応答を続けるかを決定します。

  • t.clock.Sleep(interval)

     func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
  • 実際には、リクエストが来ると、電流リミッターはスリープします。就寝後の最新の水取り出し時刻を返します。

実践的なアプリケーション (Gin フレームワークを使用)

 func ratelimit1() func(ctx *gin.Context) {
     r1 := rate1.New(100)
     return func(ctx *gin.Context) {
         now := time.Now()
         //  Take 返回的是一个 time.Duration的时间
         if r1.Take().Sub(now) > 0 {
             // 返回的时间比当前的时间还大,说明需要进行等待
             // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
             // 如果不需要等待请求时间,就直接进行Abort 然后返回
             response(ctx, http.StatusRequestTimeout, "rate1 limit...")
             fmt.Println("rate1 limit...")
             ctx.Abort()
             return
         }
         // 放行
         ctx.Next()
     }
 }

ここで、戻るかどうかを選択できます。 Take は必ず sleep 関数を実行するため、take の実行が終了すると、現在のリクエストが受信されたことになります。現在のデモでは最初のケースを使用します。

ビジネスで応答が必要な場合、待つことはできません。その後、上記の例のように水を受け取った後にリクエストを完了できます。

    企業で応答の待機が許可されている場合、リクエストは次のステップに進む前に、対応する水の受け取り時間を待機します。具体的なコードは、
  • if

    の内容を直接無視することです。 (推奨)###

测试代码

这里定义了一个响应函数和一个handler函数方便测试

 func response(c *gin.Context, code int, info any) {
    c.JSON(code, info)
 }
 
 func pingHandler(c *gin.Context) {
    response(c, 200, "ping ok~")
 }

执行go test -run=Run -v先开启一个web服务

 func TestRun(t *testing.T) {
    r := gin.Default()
 
    r.GET("/ping1", ratelimit1(), pingHandler)
    r.GET("/ping2", ratelimit2(), helloHandler)
 
    _ = r.Run(":4399")
 }

使用接口压力测试工具go-wrk进行测试->tsliwowicz/go-wrk: go-wrk)

在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest下载

使用帮助

    Usage: go-wrk <options> <url>
    Options:
     -H       Header to add to each request (you can define multiple -H flags) (Default )
     -M       HTTP method (Default GET)
     -T       Socket/request timeout in ms (Default 1000)
     -body    request body string or @filename (Default )
     -c       Number of goroutines to use (concurrent connections) (Default 10)
     -ca      CA file to verify peer against (SSL/TLS) (Default )
     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
     -d       Duration of test in seconds (Default 10)
     -f       Playback file name (Default <empty>)
     -help    Print help (Default false)
     -host    Host Header (Default )
     -http    Use HTTP/2 (Default true)
     -key     Private key file name (SSL/TLS (Default )
     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
     -no-vr   Skip verifying SSL certificate of the server (Default false)
     -redir   Allow Redirects (Default false)
     -v       Print version details (Default false)

-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间

输入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

可以稍微等待一下水流积攒(压测速度过快)。

Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事可以看出,89个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

 // NewBucket returns a new token bucket that fills at the
 // rate of one token every fillInterval, up to the given
 // maximum capacity. Both arguments must be
 // positive. The bucket is initially full.
 func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
    return NewBucketWithClock(fillInterval, capacity, nil)
 }
 
 // NewBucketWithClock is identical to NewBucket but injects a testable clock
 // interface.
 func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
    return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
 }

进行Bucket桶的初始化。

 func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
    if clock == nil {
       clock = realClock{}
    }
     // 填充速率
    if fillInterval <= 0 {
       panic("token bucket fill interval is not > 0")
    }
     // 最大令牌容量
    if capacity <= 0 {
       panic("token bucket capacity is not > 0")
    }
     // 单次令牌生成量
    if quantum <= 0 {
       panic("token bucket quantum is not > 0")
    }
    return &Bucket{
       clock:           clock,
       startTime:       clock.Now(),
       latestTick:      0,
       fillInterval:    fillInterval,
       capacity:        capacity,
       quantum:         quantum,
       availableTokens: capacity,
    }
 }

令牌桶初始化过程,初始化结构体 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量

调用

 // TakeAvailable takes up to count immediately available tokens from the
 // bucket. It returns the number of tokens removed, or zero if there are
 // no available tokens. It does not block.
 func (tb *Bucket) TakeAvailable(count int64) int64 {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    return tb.takeAvailable(tb.clock.Now(), count)
 }

调用TakeAvailable函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。

内部实现

 func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
    // 如果需要取出的令牌数小于等于零,那么就返回0个令牌
     if count <= 0 {
       return 0
    }
     // 根据时间对当前桶中令牌数进行计算
    tb.adjustavailableTokens(tb.currentTick(now))
     // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
    if tb.availableTokens <= 0 {
       return 0
    }
     // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
    if count > tb.availableTokens {
       count = tb.availableTokens
    }
     // 调整令牌数
    tb.availableTokens -= count
    return count
 }
  • 如果需要取出的令牌数小于等于零,那么就返回0个令牌

  • 根据时间对当前桶中令牌数进行计算

  • 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌

  • 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数

  • 调整令牌数

调整令牌

 func (tb *Bucket) adjustavailableTokens(tick int64) {
    lastTick := tb.latestTick
    tb.latestTick = tick
     // 如果当前令牌数大于最大等于容量,直接返回最大容量
    if tb.availableTokens >= tb.capacity {
       return
    }
     // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
    tb.availableTokens += (tick - lastTick) * tb.quantum
     // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
    if tb.availableTokens > tb.capacity {
       tb.availableTokens = tb.capacity
    }
    return
 }
  • 如果当前令牌数大于最大等于容量,直接返回最大容量

  • 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)

  • 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数

实现原理

  • 加锁 defer 解锁

  • 判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0

  • 调用函数adjustTokens 获取可用的令牌数量

  • 如果当前可以取出的令牌数小于等于0 直接返回 0

  • 如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数

  • 当前的令牌数 -= 取出的令牌数 (count)

  • 返回 count(可以取出的令牌数)

额外介绍

take函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函数,可以根据最大等待时间来进行判断。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。

测试

 func ratelimit2() func(ctx *gin.Context) {
     // 生成速率 最大容量
     r2 := rate2.NewBucket(time.Second, 200)
     return func(ctx *gin.Context) {
         //r2.Take() // 允许欠账,令牌不够也可以接收请求
         if r2.TakeAvailable(1) == 1 {
             // 如果想要取出1个令牌并且能够取出,就放行
             ctx.Next()
             return
         }
         response(ctx, http.StatusRequestTimeout, "rate2 limit...")
         ctx.Abort()
         return
     }
 }

Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!

小结

令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。

【関連する推奨事項: Go ビデオ チュートリアル プログラミング教育

以上がGo 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.cnで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。