>백엔드 개발 >Golang >Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사

Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사

青灯夜游
青灯夜游앞으로
2023-02-03 19:00:223392검색

이 기사에서는 Go 언어의 현재 제한 누출 버킷 및 토큰 버킷 라이브러리에 대해 설명하고 토큰 버킷 및 누출 버킷의 구현 원리와 실제 프로젝트에서의 간단한 적용을 소개합니다.

Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사

전류 제한 미들웨어가 필요한 이유는 무엇입니까?

많은 양의 데이터에 동시에 액세스하는 경우 서비스나 인터페이스가 많은 요청에 직면하여 데이터베이스가 충돌하거나 전체 시스템이 붕괴되는 연쇄 반응을 일으키는 경우가 종종 있습니다. 또는 누군가 악의적으로 웹사이트를 공격하여 쓸모없는 요청이 많아 캐시 침투가 발생할 수 있습니다. 전류 제한 미들웨어를 사용하면 단시간에 요청 횟수를 제한하고 다운그레이드하는 역할을 하여 웹사이트의 보안을 확보할 수 있습니다.

많은 수의 동시 요청을 처리하기 위한 전략은 무엇입니까?

  • 통합 제한을 위해 메시지 미들웨어 사용(속도 감소)

  • 현재 제한 방식을 사용하여 중복 요청 반환(현재 제한)

  • 서버 업그레이드

  • 캐시(그러나 여전히 캐시 침투가 있음) 위험을 기다리는 중)

  • 대기하는 중

코드를 개선할 수 없을 때 하드웨어 수준을 향상시키는 유일한 방법은 이를 개선하는 것이라고 볼 수 있습니다. 아니면 아키텍처를 변경하고 다른 레이어를 추가하세요! 통합 처리를 위해 메시지 미들웨어를 사용할 수도 있습니다. 결합된 관점에서 볼 때 전류 제한 솔루션은 큰 변경이나 높은 오버헤드가 필요하지 않은 전략입니다.

일반적인 전류 제한 방식

  • 토큰 버킷 알고리즘

  • 누수 버킷 알고리즘

  • 슬라이딩 윈도우 알고리즘

  • etc.

Leaky Bucket

ratelimit 라이브러리 소개

go get -u go.uber.org/ratelimitgo get -u go.uber.org/ratelimit

库函数源代码

 // New returns a Limiter that will limit to the given RPS.
 func New(rate int, opts ...Option) Limiter {
     return newAtomicBased(rate, opts...)
 }
 
 // newAtomicBased returns a new atomic based limiter.
 func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
     // TODO consider moving config building to the implementation
     // independent code.
     config := buildConfig(opts)
     perRequest := config.per / time.Duration(rate)
     l := &atomicLimiter{
         perRequest: perRequest,
         maxSlack:   -1 * time.Duration(config.slack) * perRequest,
         clock:      config.clock,
     }
 
     initialState := state{
         last:     time.Time{},
         sleepFor: 0,
     }
     atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
     return l
 }

该函数使用了函数选项模式多个结构体对象进行初始化

根据传入的值来初始化一个桶结构体 rateint 传参 。

初始化过程中包括了

  • 每一滴水需要的时间 perquest = config.per / time.Duration(rate)
  • maxSlack 宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest 松紧度是用来规范等待时间的
 // Clock is the minimum necessary interface to instantiate a rate limiter with
 // a clock or mock clock, compatible with clocks created using
 // github.com/andres-erbsen/clock.
 type Clock interface {
    Now() time.Time
    Sleep(time.Duration)
 }

同时还需要结构体Clock来记录当前请求的时间now和此刻的请求所需要花费等待的时间sleep

 type state struct {
    last     time.Time
    sleepFor time.Duration
 }

state 主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)

最重要的Take逻辑

 func (t *atomicLimiter) Take() time.Time {
    var (
       newState state
       taken    bool
       interval time.Duration
    )
    for !taken {
       now := t.clock.Now()
 
       previousStatePointer := atomic.LoadPointer(&t.state)
       oldState := (*state)(previousStatePointer)
 
       newState = state{
          last:     now,
          sleepFor: oldState.sleepFor,
       }

       if oldState.last.IsZero() {
          taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
          continue
       }
       // 计算是否需要进行等待取水操作
       newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔)
        
        // 如果等待取水时间特别小,就需要松紧度进行维护
       if newState.sleepFor < t.maxSlack {
          newState.sleepFor = t.maxSlack
       }
        // 如果等待时间大于0,就进行更新
       if newState.sleepFor > 0 {
          newState.last = newState.last.Add(newState.sleepFor)
          interval, newState.sleepFor = newState.sleepFor, 0
       }
       taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
    }
    t.clock.Sleep(interval)
    // 最后返回需要等待的时间
     return newState.last
 }

实现一个Take方法

  • 该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。

  • 记录下当前的时间 now := t.clock.Now()

  • oldState.last.IsZero()判断是不是第一次取水,如果是就直接将state结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。

  • 如果 newState.sleepFor 非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。

  • 如果newState.sleepFor > 0 就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0

  • 如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间t.clock.Sleep(interval)。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。

t.clock.Sleep(interval)

 func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。

实际应用(使用Gin框架)

 func ratelimit1() func(ctx *gin.Context) {
     r1 := rate1.New(100)
     return func(ctx *gin.Context) {
         now := time.Now()
         //  Take 返回的是一个 time.Duration的时间
         if r1.Take().Sub(now) > 0 {
             // 返回的时间比当前的时间还大,说明需要进行等待
             // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
             // 如果不需要等待请求时间,就直接进行Abort 然后返回
             response(ctx, http.StatusRequestTimeout, "rate1 limit...")
             fmt.Println("rate1 limit...")
             ctx.Abort()
             return
         }
         // 放行
         ctx.Next()
     }
 }

这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。

  • 如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。

  • 如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将if

    라이브러리 함수 소스 코드🎜

     func response(c *gin.Context, code int, info any) {
        c.JSON(code, info)
     }
     
     func pingHandler(c *gin.Context) {
        response(c, 200, "ping ok~")
     }
    🎜이 함수는 🎜함수 옵션 mode🎜를 사용하여 전달된 값에 따라 버킷 구조를 초기화🎜다중 구조🎜객체🎜🎜합니다. rateint 매개변수입니다. 🎜🎜초기화 프로세스에는 🎜
      🎜물 한 방울에 필요한 시간 perquest = config.per / time.Duration(rate)🎜🎜maxSlack 휴식( 느슨함은 음수 값) -1 * time.Duration(config.slack) * perRequest 느슨함은 대기 시간을 표준화하는 데 사용됩니다🎜🎜
       func TestRun(t *testing.T) {
          r := gin.Default()
       
          r.GET("/ping1", ratelimit1(), pingHandler)
          r.GET("/ping2", ratelimit2(), helloHandler)
       
          _ = r.Run(":4399")
       }
      🎜동시에 구조체 현재 요청의 시간 <code>지금과 이 순간 sleep🎜
          Usage: go-wrk <options> <url>
          Options:
           -H       Header to add to each request (you can define multiple -H flags) (Default )
           -M       HTTP method (Default GET)
           -T       Socket/request timeout in ms (Default 1000)
           -body    request body string or @filename (Default )
           -c       Number of goroutines to use (concurrent connections) (Default 10)
           -ca      CA file to verify peer against (SSL/TLS) (Default )
           -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
           -d       Duration of test in seconds (Default 10)
           -f       Playback file name (Default <empty>)
           -help    Print help (Default false)
           -host    Host Header (Default )
           -http    Use HTTP/2 (Default true)
           -key     Private key file name (SSL/TLS (Default )
           -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
           -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
           -no-vr   Skip verifying SSL certificate of the server (Default false)
           -redir   Allow Redirects (Default false)
           -v       Print version details (Default false)
      🎜state는 주로 실행 시간과 현재 실행 요청을 기다리는 데 걸리는 시간을 기록하는 데 사용됩니다(중간 상태 기록으로)🎜

      🎜The 가장 중요한 Take 논리🎜

       // NewBucket returns a new token bucket that fills at the
       // rate of one token every fillInterval, up to the given
       // maximum capacity. Both arguments must be
       // positive. The bucket is initially full.
       func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
          return NewBucketWithClock(fillInterval, capacity, nil)
       }
       
       // NewBucketWithClock is identical to NewBucket but injects a testable clock
       // interface.
       func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
          return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
       }
      🎜Take 메서드 구현🎜 🎜🎜🎜Take 메서드는 원자성 작업(잠금 및 잠금 해제로 이해될 수 있음)을 수행하며 많은 수의 동시 요청에서도 정상적인 사용을 보장할 수 있습니다. 🎜🎜🎜🎜현재 시간을 기록 now := t.clock.Now()🎜🎜🎜🎜oldState.last.IsZero() 처음인지 확인 물을 얻으려면 state 구조에 값을 직접 반환하세요. 이 구조에서는 마지막 실행 시간이 초기화됩니다. 처음 물을 가져오는 경우에는 현재 시간이 그대로 전달됩니다. 🎜🎜🎜🎜newState.sleepFor가 너무 작으면 문제가 발생하므로 laxity를 사용해야 합니다. 최소값이 laxity보다 작으면 laxity를 사용하여 물 가져오는 시간을 유지합니다. . 🎜🎜🎜🎜newState.sleepFor > 0인 경우 newState.last = newState.last.Add(newState.sleepFor) 구조에서 마지막 실행 시간을 직접 업데이트하고 대기 시간 interval, newState.sleepFor = newState.sleepFor, 0을 기록합니다. 🎜🎜🎜🎜물 가져오기 및 대기 작업이 허용되면 동시성 경쟁이 발생하지 않으며 수면 시간이 t.clock.Sleep(interval)로 시뮬레이션된다는 의미입니다. 그런 다음 물 검색을 위한 목표 시간이 반환되고 서버 코드는 응답을 다시 보낼지 아니면 이 시간 동안 응답이 계속될 때까지 기다릴지 결정합니다. 🎜🎜🎜

      🎜t.clock.Sleep(interval)🎜

       func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
          if clock == nil {
             clock = realClock{}
          }
           // 填充速率
          if fillInterval <= 0 {
             panic("token bucket fill interval is not > 0")
          }
           // 最大令牌容量
          if capacity <= 0 {
             panic("token bucket capacity is not > 0")
          }
           // 单次令牌生成量
          if quantum <= 0 {
             panic("token bucket quantum is not > 0")
          }
          return &Bucket{
             clock:           clock,
             startTime:       clock.Now(),
             latestTick:      0,
             fillInterval:    fillInterval,
             capacity:        capacity,
             quantum:         quantum,
             availableTokens: capacity,
          }
       }
      🎜실제로 요청이 오면 현재 제한기는 해당 시간 동안 절전 모드로 전환되며, 자고 난 후 가장 최근에 물을 채취한 시간을 반환합니다. 🎜

      🎜실용 애플리케이션(Gin 프레임워크 사용)🎜🎜
       // TakeAvailable takes up to count immediately available tokens from the
       // bucket. It returns the number of tokens removed, or zero if there are
       // no available tokens. It does not block.
       func (tb *Bucket) TakeAvailable(count int64) int64 {
          tb.mu.Lock()
          defer tb.mu.Unlock()
          return tb.takeAvailable(tb.clock.Now(), count)
       }
      🎜여기에서 반환 여부를 선택할 수 있습니다. Take는 반드시 sleep 기능을 실행하기 때문에 take의 실행이 종료되면 현재 요청을 받았다는 의미입니다. 현재 데모에서는 첫 번째 사례를 사용합니다. 🎜
        🎜🎜업무상 답변이 필요한 경우에는 기다리시면 안됩니다. 그러면 위 예시처럼 물을 받은 후 요청을 완료할 수 있습니다. 🎜🎜🎜🎜비즈니스에서 응답 대기를 허용하는 경우 요청은 다음 단계로 진행하기 전에 해당 물 수신 시간을 기다립니다. 구체적인 코드는 if의 내용을 직접 무시하는 것입니다. (추천)🎜

测试代码

这里定义了一个响应函数和一个handler函数方便测试

 func response(c *gin.Context, code int, info any) {
    c.JSON(code, info)
 }
 
 func pingHandler(c *gin.Context) {
    response(c, 200, "ping ok~")
 }

执行go test -run=Run -v先开启一个web服务

 func TestRun(t *testing.T) {
    r := gin.Default()
 
    r.GET("/ping1", ratelimit1(), pingHandler)
    r.GET("/ping2", ratelimit2(), helloHandler)
 
    _ = r.Run(":4399")
 }

使用接口压力测试工具go-wrk进行测试->tsliwowicz/go-wrk: go-wrk)

在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest下载

使用帮助

    Usage: go-wrk <options> <url>
    Options:
     -H       Header to add to each request (you can define multiple -H flags) (Default )
     -M       HTTP method (Default GET)
     -T       Socket/request timeout in ms (Default 1000)
     -body    request body string or @filename (Default )
     -c       Number of goroutines to use (concurrent connections) (Default 10)
     -ca      CA file to verify peer against (SSL/TLS) (Default )
     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
     -d       Duration of test in seconds (Default 10)
     -f       Playback file name (Default <empty>)
     -help    Print help (Default false)
     -host    Host Header (Default )
     -http    Use HTTP/2 (Default true)
     -key     Private key file name (SSL/TLS (Default )
     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
     -no-vr   Skip verifying SSL certificate of the server (Default false)
     -redir   Allow Redirects (Default false)
     -v       Print version details (Default false)

-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间

输入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

可以稍微等待一下水流积攒(压测速度过快)。

Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사可以看出,89个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

 // NewBucket returns a new token bucket that fills at the
 // rate of one token every fillInterval, up to the given
 // maximum capacity. Both arguments must be
 // positive. The bucket is initially full.
 func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
    return NewBucketWithClock(fillInterval, capacity, nil)
 }
 
 // NewBucketWithClock is identical to NewBucket but injects a testable clock
 // interface.
 func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
    return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
 }

进行Bucket桶的初始化。

 func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
    if clock == nil {
       clock = realClock{}
    }
     // 填充速率
    if fillInterval <= 0 {
       panic("token bucket fill interval is not > 0")
    }
     // 最大令牌容量
    if capacity <= 0 {
       panic("token bucket capacity is not > 0")
    }
     // 单次令牌生成量
    if quantum <= 0 {
       panic("token bucket quantum is not > 0")
    }
    return &Bucket{
       clock:           clock,
       startTime:       clock.Now(),
       latestTick:      0,
       fillInterval:    fillInterval,
       capacity:        capacity,
       quantum:         quantum,
       availableTokens: capacity,
    }
 }

令牌桶初始化过程,初始化结构体 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量

调用

 // TakeAvailable takes up to count immediately available tokens from the
 // bucket. It returns the number of tokens removed, or zero if there are
 // no available tokens. It does not block.
 func (tb *Bucket) TakeAvailable(count int64) int64 {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    return tb.takeAvailable(tb.clock.Now(), count)
 }

调用TakeAvailable函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。

内部实现

 func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
    // 如果需要取出的令牌数小于等于零,那么就返回0个令牌
     if count <= 0 {
       return 0
    }
     // 根据时间对当前桶中令牌数进行计算
    tb.adjustavailableTokens(tb.currentTick(now))
     // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
    if tb.availableTokens <= 0 {
       return 0
    }
     // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
    if count > tb.availableTokens {
       count = tb.availableTokens
    }
     // 调整令牌数
    tb.availableTokens -= count
    return count
 }
  • 如果需要取出的令牌数小于等于零,那么就返回0个令牌

  • 根据时间对当前桶中令牌数进行计算

  • 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌

  • 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数

  • 调整令牌数

调整令牌

 func (tb *Bucket) adjustavailableTokens(tick int64) {
    lastTick := tb.latestTick
    tb.latestTick = tick
     // 如果当前令牌数大于最大等于容量,直接返回最大容量
    if tb.availableTokens >= tb.capacity {
       return
    }
     // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
    tb.availableTokens += (tick - lastTick) * tb.quantum
     // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
    if tb.availableTokens > tb.capacity {
       tb.availableTokens = tb.capacity
    }
    return
 }
  • 如果当前令牌数大于最大等于容量,直接返回最大容量

  • 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)

  • 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数

实现原理

  • 加锁 defer 解锁

  • 判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0

  • 调用函数adjustTokens 获取可用的令牌数量

  • 如果当前可以取出的令牌数小于等于0 直接返回 0

  • 如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数

  • 当前的令牌数 -= 取出的令牌数 (count)

  • 返回 count(可以取出的令牌数)

额外介绍

take函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函数,可以根据最大等待时间来进行判断。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。

测试

 func ratelimit2() func(ctx *gin.Context) {
     // 生成速率 最大容量
     r2 := rate2.NewBucket(time.Second, 200)
     return func(ctx *gin.Context) {
         //r2.Take() // 允许欠账,令牌不够也可以接收请求
         if r2.TakeAvailable(1) == 1 {
             // 如果想要取出1个令牌并且能够取出,就放行
             ctx.Next()
             return
         }
         response(ctx, http.StatusRequestTimeout, "rate2 limit...")
         ctx.Abort()
         return
     }
 }

Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!

小结

令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。

【관련 추천: Go 비디오 튜토리얼, 프로그래밍 교육

위 내용은 Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제