Maison >développement back-end >Golang >Un article pour parler des bibliothèques de buckets à fuites et de buckets de jetons limitant le courant dans le langage Go

Un article pour parler des bibliothèques de buckets à fuites et de buckets de jetons limitant le courant dans le langage Go

青灯夜游
青灯夜游avant
2023-02-03 19:00:223393parcourir

Cet article parlera des bibliothèques de buckets à fuites et de buckets à jetons limitant actuellement le langage Go, présentera les principes de mise en œuvre des buckets à jetons et des buckets à fuites, ainsi que leur application simple dans des projets réels.

Un article pour parler des bibliothèques de buckets à fuites et de buckets de jetons limitant le courant dans le langage Go

Pourquoi avez-vous besoin d'un middleware limitant actuel ?

Lorsque de grandes quantités de données sont accédées simultanément, il arrive souvent qu'un service ou une interface fait face à un grand nombre de requêtes, provoquant le crash de la base de données, voire déclenchant une réaction en chaîne qui provoque l'effondrement de l'ensemble du système. Ou bien quelqu'un attaque le site Web de manière malveillante, et un grand nombre de requêtes inutiles peuvent entraîner une pénétration du cache. L'utilisation d'un middleware limitant le courant peut limiter le nombre de requêtes sur une courte période et jouer un rôle dans la rétrogradation, garantissant ainsi la sécurité du site Web.

Stratégie pour traiter un grand nombre de demandes simultanées ?

  • Utilisez un middleware de messages pour une limitation unifiée (réduction de la vitesse)

  • Utilisez le schéma de limitation actuel pour renvoyer les requêtes redondantes (limitation actuelle)

  • Mettez à niveau le serveur

  • Cache (mais il y a toujours une pénétration du cache En attendant le danger)

  • En attente

On voit que lorsque le code ne peut plus être amélioré, le seul moyen est d'améliorer le niveau matériel. Ou changez l'architecture et ajoutez une autre couche ! Vous pouvez également utiliser un middleware de messages pour un traitement unifié. D’un point de vue combiné, la solution limitante actuelle est une stratégie qui ne nécessite ni changements majeurs ni frais généraux élevés. "Schémas courants de limitation de courant"

Leaky Bucket

  • Introduction de la bibliothèque ratelimit

  • go get -u go.uber.org/ratelimit

    Code source de la fonction bibliothèque

  •  // New returns a Limiter that will limit to the given RPS.
     func New(rate int, opts ...Option) Limiter {
         return newAtomicBased(rate, opts...)
     }
     
     // newAtomicBased returns a new atomic based limiter.
     func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
         // TODO consider moving config building to the implementation
         // independent code.
         config := buildConfig(opts)
         perRequest := config.per / time.Duration(rate)
         l := &atomicLimiter{
             perRequest: perRequest,
             maxSlack:   -1 * time.Duration(config.slack) * perRequest,
             clock:      config.clock,
         }
     
         initialState := state{
             last:     time.Time{},
             sleepFor: 0,
         }
         atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
         return l
     }
  • Cette fonction utilise le

    mode d'option de fonction

    pour initialiser
  • plusieurs structures
  • objets

    initialiser une structure de compartiment en fonction de la valeur transmise rate est int Pass paramètres.
Le processus d'initialisation comprend

    Le temps requis pour chaque goutte d'eau perquest = config.per / time.Duration(rate)

maxSlack Relaxation ( Le relâchement est une valeur négative) -1 * time.Duration(config.slack) * perRequest Le relâchement sert à standardiser le temps d'attente

 // Clock is the minimum necessary interface to instantiate a rate limiter with
 // a clock or mock clock, compatible with clocks created using
 // github.com/andres-erbsen/clock.
 type Clock interface {
    Now() time.Time
    Sleep(time.Duration)
 }
En même temps, la structure L'horlogeest également nécessaire code> pour enregistrer l'heure de la demande en cours maintenant et le temps qu'il faut pour attendre la demande à ce moment dormir

 type state struct {
    last     time.Time
    sleepFor time.Duration
 }
state est principalement utilisé pour enregistrer le temps d'exécution et le temps nécessaire pour attendre la demande d'exécution en cours (en tant qu'enregistrement d'état intermédiaire)

go get -u go.uber.org/ratelimit

库函数源代码

 func (t *atomicLimiter) Take() time.Time {
    var (
       newState state
       taken    bool
       interval time.Duration
    )
    for !taken {
       now := t.clock.Now()
 
       previousStatePointer := atomic.LoadPointer(&t.state)
       oldState := (*state)(previousStatePointer)
 
       newState = state{
          last:     now,
          sleepFor: oldState.sleepFor,
       }

       if oldState.last.IsZero() {
          taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
          continue
       }
       // 计算是否需要进行等待取水操作
       newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔)
        
        // 如果等待取水时间特别小,就需要松紧度进行维护
       if newState.sleepFor < t.maxSlack {
          newState.sleepFor = t.maxSlack
       }
        // 如果等待时间大于0,就进行更新
       if newState.sleepFor > 0 {
          newState.last = newState.last.Add(newState.sleepFor)
          interval, newState.sleepFor = newState.sleepFor, 0
       }
       taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
    }
    t.clock.Sleep(interval)
    // 最后返回需要等待的时间
     return newState.last
 }

该函数使用了函数选项模式多个结构体对象进行初始化

根据传入的值来初始化一个桶结构体 rateint 传参 。

初始化过程中包括了

  • 每一滴水需要的时间 perquest = config.per / time.Duration(rate)
  • maxSlack 宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest 松紧度是用来规范等待时间的
 func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

同时还需要结构体Clock来记录当前请求的时间now和此刻的请求所需要花费等待的时间sleep

 func ratelimit1() func(ctx *gin.Context) {
     r1 := rate1.New(100)
     return func(ctx *gin.Context) {
         now := time.Now()
         //  Take 返回的是一个 time.Duration的时间
         if r1.Take().Sub(now) > 0 {
             // 返回的时间比当前的时间还大,说明需要进行等待
             // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
             // 如果不需要等待请求时间,就直接进行Abort 然后返回
             response(ctx, http.StatusRequestTimeout, "rate1 limit...")
             fmt.Println("rate1 limit...")
             ctx.Abort()
             return
         }
         // 放行
         ctx.Next()
     }
 }

state 主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)

最重要的Take逻辑

 func response(c *gin.Context, code int, info any) {
    c.JSON(code, info)
 }
 
 func pingHandler(c *gin.Context) {
    response(c, 200, "ping ok~")
 }

实现一个Take方法

  • 该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。

  • 记录下当前的时间 now := t.clock.Now()

  • oldState.last.IsZero()判断是不是第一次取水,如果是就直接将state结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。

  • 如果 newState.sleepFor 非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。

  • 如果newState.sleepFor > 0 就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0

  • 如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间t.clock.Sleep(interval)。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。

t.clock.Sleep(interval)

 func TestRun(t *testing.T) {
    r := gin.Default()
 
    r.GET("/ping1", ratelimit1(), pingHandler)
    r.GET("/ping2", ratelimit2(), helloHandler)
 
    _ = r.Run(":4399")
 }

实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。

实际应用(使用Gin框架)

    Usage: go-wrk <options> <url>
    Options:
     -H       Header to add to each request (you can define multiple -H flags) (Default )
     -M       HTTP method (Default GET)
     -T       Socket/request timeout in ms (Default 1000)
     -body    request body string or @filename (Default )
     -c       Number of goroutines to use (concurrent connections) (Default 10)
     -ca      CA file to verify peer against (SSL/TLS) (Default )
     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
     -d       Duration of test in seconds (Default 10)
     -f       Playback file name (Default <empty>)
     -help    Print help (Default false)
     -host    Host Header (Default )
     -http    Use HTTP/2 (Default true)
     -key     Private key file name (SSL/TLS (Default )
     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
     -no-vr   Skip verifying SSL certificate of the server (Default false)
     -redir   Allow Redirects (Default false)
     -v       Print version details (Default false)

这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。

  • 如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。

  • 如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将ifLe plus important Logique Take

     // NewBucket returns a new token bucket that fills at the
     // rate of one token every fillInterval, up to the given
     // maximum capacity. Both arguments must be
     // positive. The bucket is initially full.
     func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
        return NewBucketWithClock(fillInterval, capacity, nil)
     }
     
     // NewBucketWithClock is identical to NewBucket but injects a testable clock
     // interface.
     func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
        return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
     }
    🎜Implémenter une méthode Take🎜 🎜🎜🎜La méthode Take effectue des opérations atomiques (peut être comprise comme un verrouillage et un déverrouillage) et peut toujours garantir une utilisation normale sous un grand nombre de requêtes simultanées. 🎜🎜🎜🎜Enregistrez l'heure actuelle now := t.clock.Now()🎜🎜🎜🎜oldState.last.IsZero() Déterminez si c'est la première fois Obtenez de l'eau, si c'est le cas, renvoyez directement la valeur dans la structure state. L'heure de la dernière exécution est initialisée dans cette structure. Si c'est la première fois qu'on va chercher de l'eau, elle sera passée directement comme heure actuelle. 🎜🎜🎜🎜Si newState.sleepFor est très petit, des problèmes surviendront, vous devez donc utiliser le laxité. Une fois que la valeur minimale est inférieure au laxité, utilisez le laxité pour maintenir le temps de récupération de l'eau. 🎜🎜🎜🎜Si newState.sleepFor > 0, mettez directement à jour la dernière heure d'exécution dans la structure newState.last = newState.last.Add(newState.sleepFor) et Enregistrez le temps d'attente intervalle, newState.sleepFor = newState.sleepFor, 0. 🎜🎜🎜🎜Si les opérations de récupération d'eau et d'attente sont autorisées, cela signifie qu'aucune concurrence simultanée ne se produit et que le temps de sommeil est simulé t.clock.Sleep(interval). Ensuite, l'heure cible pour la récupération de l'eau est renvoyée et le code du serveur détermine s'il faut renvoyer une réponse ou attendre cette heure pour continuer à répondre. 🎜🎜🎜

    🎜t.clock.Sleep(interval)🎜

     func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
        if clock == nil {
           clock = realClock{}
        }
         // 填充速率
        if fillInterval <= 0 {
           panic("token bucket fill interval is not > 0")
        }
         // 最大令牌容量
        if capacity <= 0 {
           panic("token bucket capacity is not > 0")
        }
         // 单次令牌生成量
        if quantum <= 0 {
           panic("token bucket quantum is not > 0")
        }
        return &Bucket{
           clock:           clock,
           startTime:       clock.Now(),
           latestTick:      0,
           fillInterval:    fillInterval,
           capacity:        capacity,
           quantum:         quantum,
           availableTokens: capacity,
        }
     }
    🎜En fait, lorsqu'une demande arrive, le limiteur de courant se mettra en veille pendant le temps correspondant, et renvoyer la dernière heure de récupération d'eau après avoir dormi. 🎜

    🎜Application pratique (utilisant le framework Gin)🎜🎜
     // TakeAvailable takes up to count immediately available tokens from the
     // bucket. It returns the number of tokens removed, or zero if there are
     // no available tokens. It does not block.
     func (tb *Bucket) TakeAvailable(count int64) int64 {
        tb.mu.Lock()
        defer tb.mu.Unlock()
        return tb.takeAvailable(tb.clock.Now(), count)
     }
    🎜Ici, vous pouvez choisir de revenir ou non. Parce que Take exécutera définitivement la fonction sleep, lorsque l'exécution de take se terminera, cela signifie que la requête actuelle a été reçue. La démonstration actuelle utilise le premier cas. 🎜
      🎜🎜Si votre entreprise nécessite une réponse, l'attente n'est pas autorisée. Vous pourrez ensuite finaliser la demande après avoir reçu l’eau, comme dans l’exemple ci-dessus. 🎜🎜🎜🎜Si votre entreprise autorise l'attente de réponse, alors la demande attendra l'heure de réception de l'eau correspondante avant de passer à l'étape suivante. Le code spécifique consiste à ignorer directement le contenu de if. (recommandé)🎜

测试代码

这里定义了一个响应函数和一个handler函数方便测试

 func response(c *gin.Context, code int, info any) {
    c.JSON(code, info)
 }
 
 func pingHandler(c *gin.Context) {
    response(c, 200, "ping ok~")
 }

执行go test -run=Run -v先开启一个web服务

 func TestRun(t *testing.T) {
    r := gin.Default()
 
    r.GET("/ping1", ratelimit1(), pingHandler)
    r.GET("/ping2", ratelimit2(), helloHandler)
 
    _ = r.Run(":4399")
 }

使用接口压力测试工具go-wrk进行测试->tsliwowicz/go-wrk: go-wrk)

在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest下载

使用帮助

    Usage: go-wrk <options> <url>
    Options:
     -H       Header to add to each request (you can define multiple -H flags) (Default )
     -M       HTTP method (Default GET)
     -T       Socket/request timeout in ms (Default 1000)
     -body    request body string or @filename (Default )
     -c       Number of goroutines to use (concurrent connections) (Default 10)
     -ca      CA file to verify peer against (SSL/TLS) (Default )
     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
     -d       Duration of test in seconds (Default 10)
     -f       Playback file name (Default <empty>)
     -help    Print help (Default false)
     -host    Host Header (Default )
     -http    Use HTTP/2 (Default true)
     -key     Private key file name (SSL/TLS (Default )
     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
     -no-vr   Skip verifying SSL certificate of the server (Default false)
     -redir   Allow Redirects (Default false)
     -v       Print version details (Default false)

-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间

输入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

可以稍微等待一下水流积攒(压测速度过快)。

Un article pour parler des bibliothèques de buckets à fuites et de buckets de jetons limitant le courant dans le langage Go可以看出,89个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

 // NewBucket returns a new token bucket that fills at the
 // rate of one token every fillInterval, up to the given
 // maximum capacity. Both arguments must be
 // positive. The bucket is initially full.
 func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
    return NewBucketWithClock(fillInterval, capacity, nil)
 }
 
 // NewBucketWithClock is identical to NewBucket but injects a testable clock
 // interface.
 func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
    return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
 }

进行Bucket桶的初始化。

 func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
    if clock == nil {
       clock = realClock{}
    }
     // 填充速率
    if fillInterval <= 0 {
       panic("token bucket fill interval is not > 0")
    }
     // 最大令牌容量
    if capacity <= 0 {
       panic("token bucket capacity is not > 0")
    }
     // 单次令牌生成量
    if quantum <= 0 {
       panic("token bucket quantum is not > 0")
    }
    return &Bucket{
       clock:           clock,
       startTime:       clock.Now(),
       latestTick:      0,
       fillInterval:    fillInterval,
       capacity:        capacity,
       quantum:         quantum,
       availableTokens: capacity,
    }
 }

令牌桶初始化过程,初始化结构体 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量

调用

 // TakeAvailable takes up to count immediately available tokens from the
 // bucket. It returns the number of tokens removed, or zero if there are
 // no available tokens. It does not block.
 func (tb *Bucket) TakeAvailable(count int64) int64 {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    return tb.takeAvailable(tb.clock.Now(), count)
 }

调用TakeAvailable函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。

内部实现

 func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
    // 如果需要取出的令牌数小于等于零,那么就返回0个令牌
     if count <= 0 {
       return 0
    }
     // 根据时间对当前桶中令牌数进行计算
    tb.adjustavailableTokens(tb.currentTick(now))
     // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
    if tb.availableTokens <= 0 {
       return 0
    }
     // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
    if count > tb.availableTokens {
       count = tb.availableTokens
    }
     // 调整令牌数
    tb.availableTokens -= count
    return count
 }
  • 如果需要取出的令牌数小于等于零,那么就返回0个令牌

  • 根据时间对当前桶中令牌数进行计算

  • 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌

  • 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数

  • 调整令牌数

调整令牌

 func (tb *Bucket) adjustavailableTokens(tick int64) {
    lastTick := tb.latestTick
    tb.latestTick = tick
     // 如果当前令牌数大于最大等于容量,直接返回最大容量
    if tb.availableTokens >= tb.capacity {
       return
    }
     // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
    tb.availableTokens += (tick - lastTick) * tb.quantum
     // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
    if tb.availableTokens > tb.capacity {
       tb.availableTokens = tb.capacity
    }
    return
 }
  • 如果当前令牌数大于最大等于容量,直接返回最大容量

  • 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)

  • 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数

实现原理

  • 加锁 defer 解锁

  • 判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0

  • 调用函数adjustTokens 获取可用的令牌数量

  • 如果当前可以取出的令牌数小于等于0 直接返回 0

  • 如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数

  • 当前的令牌数 -= 取出的令牌数 (count)

  • 返回 count(可以取出的令牌数)

额外介绍

take函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函数,可以根据最大等待时间来进行判断。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。

测试

 func ratelimit2() func(ctx *gin.Context) {
     // 生成速率 最大容量
     r2 := rate2.NewBucket(time.Second, 200)
     return func(ctx *gin.Context) {
         //r2.Take() // 允许欠账,令牌不够也可以接收请求
         if r2.TakeAvailable(1) == 1 {
             // 如果想要取出1个令牌并且能够取出,就放行
             ctx.Next()
             return
         }
         response(ctx, http.StatusRequestTimeout, "rate2 limit...")
         ctx.Abort()
         return
     }
 }

Un article pour parler des bibliothèques de buckets à fuites et de buckets de jetons limitant le courant dans le langage Go压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!

小结

令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。

【Recommandations associées : Tutoriel vidéo Go, Enseignement de la programmation

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer