ホームページ >バックエンド開発 >Golang >送信レート制限の制御

送信レート制限の制御

WBOY
WBOYオリジナル
2024-07-29 08:03:13948ブラウズ

サードパーティ API と対話する分散アプリケーションがあるシナリオを想像してみましょう。通常、サードパーティ API には、クライアントがリクエストをバーストさせてサービスのダウンタイムを引き起こすことを避けるために、レート制限制御メカニズムが備わっています。このようなシナリオでは、呼び出し元は分散環境でサードパーティ API への送信リクエストの速度をどのように制御できるでしょうか?この投稿では、この問題に対する考えられる戦略について説明します。

リクエストのレートを制御するアルゴリズムは複数ありますが、理解と実装が比較的簡単なため、ここではトークン バケット アルゴリズムに焦点を当てます。このアルゴリズムは、バケットは最大 T 個のトークンを保持でき、アプリケーションがサードパーティ API にリクエストを行う場合は、1 を必要とすることを示しています。バケットからのトークン。バケットが空の場合は、バケット内に少なくとも 1 個のトークンが存在するまで待機する必要があります。また、バケットには R トークン/ミリ秒の固定レートで 1 トークンが補充されます。

トークン バケット アルゴリズムは非常に簡単に理解できますが、分散環境でこれを使用してサードパーティ API への送信リクエストを制御するにはどうすればよいでしょうか?

分散環境で送信レート制限を制御したい場合は、現在のレート制限に関する信頼できる一元的な情報源が必要です。真実の情報源を実装するには複数の方法がありますが、私は考えられる実装を示して次の図を理想化しました。

Controlling outgoing rate limit

上の図では、複数のポッドに分散アプリケーションがあり、各ポッドはサードパーティ API にリクエストを行うことができます。アプリケーション インフラストラクチャには、トークン バケット アルゴリズムを使用してレート制限を制御する TCP サーバー があります。サードパーティ API にリクエストを行う前に、ポッドは TCP サーバーに新しいトークンを要求し、使用可能なトークンが少なくとも 1 つ存在するまで、TCP サーバーからの応答を待ちます。トークンが利用可能になった後、ポッドはサードパーティ API にリクエストを作成します。

TCP サーバーの実装は、このリポジトリ https://github.com/rafaquelhodev/rlimit/ にあります。次のセクションでは、golang でのトークン バケットの実装について簡単に説明します。

トークンバケットの実装

以下に、トークン バケットの実装の背後にある主なアイデアを示します。詳細な実装を理解するには、https://github.com/rafaquelhodev/rlimit/ リポジトリをご覧ください。

レート制限制御は TokenBucket 構造体に集中されています:

type TokenBucket struct {
    id           string
    mu           sync.Mutex
    tokens       int64
    maxTokens    int64
    refillPeriod int64
    cron         chan bool
    subs         []chan bool
}

TokenBucket 構造体に subs プロパティがあることがわかります。基本的に、これは特定のトークン バケットのサブスクライバーの配列です。クライアントからトークンがリクエストされるたびに、クライアントは subs 配列に追加され、新しいトークンがバケットに追加されるとクライアントに通知されます。

バケットを開始するとき、バケットがサポートできるトークンの最大数 (maxTokens) と、トークンがバケットに追加される時間 (refillPeriod) を指定する必要があります。

func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket {
    bucket := &TokenBucket{
        id:           id,
        tokens:       0,
        maxTokens:    maxTokens,
        refillPeriod: refillPeriod,
        cron:         make(chan bool),
        subs:         make([]chan bool, 0),
    }
    fmt.Printf("refill period  = %d\n", refillPeriod)
    bucket.startCron()
    return bucket
}

ここで、「トークンはどのようにしてバケットに追加されるのか?」と疑問に思うかもしれません。そのため、バケットの作成時に cron ジョブが開始され、refillPeriod ミリ秒ごとに新しいトークンがバケットに追加されます。

func (tb *TokenBucket) startCron() {
    ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond)

    go func() {
        for {
            select {
            case <-tb.cron:
                ticker.Stop()
                return
            case <-ticker.C:
                if tb.tokens < tb.maxTokens {
                    tb.tokens += 1
                    fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens)

                    if len(tb.subs) > 0 {
                        sub := tb.subs[0]
                        tb.subs = tb.subs[1:]
                        sub <- true
                    }
                }
            }
        }
    }()
}

最後に、クライアントがバケットからトークンを取得したい場合は、waitAvailable 関数を呼び出す必要があります。

func (tb *TokenBucket) waitAvailable() bool {
    tb.mu.Lock()

    if tb.tokens > 0 {
        fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id)
        tb.tokens -= 1
        tb.mu.Unlock()
        return true
    }

    fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id)

    ch := tb.tokenSubscribe()

    tb.mu.Unlock()

    <-ch

    fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id)

    tb.tokens -= 1

    return true
}

https://github.com/Mohamed-khattab/Token-bucket-rate-limiter からインスピレーションを得た

以上が送信レート制限の制御の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。