ホームページ  >  記事  >  Java  >  Java で一般的な電流制限アルゴリズムを実装する方法

Java で一般的な電流制限アルゴリズムを実装する方法

WBOY
WBOY転載
2023-05-11 22:01:111269ブラウズ

なぜフローを制限するのか

可用性を確保しながら、入場する人の数をできる限り増やします。残りの人は列に並んで待っているか、システムのユーザーが確実に利用できるようにフレンドリーなプロンプトを返します。システムの雪崩を防ぎます。

電流制限アルゴリズム

電流制限アルゴリズムには多くのものがあり、カウンタ アルゴリズム、リーキー バケット アルゴリズム、トークン バケット アルゴリズムという 3 つの一般的なカテゴリがあります。

(1) カウンタ:

一定期間内に処理されるリクエストの最大数は固定されており、超過したリクエストは処理されません。

(2) リーキーバケット:

リーキーバケットのサイズと処理速度は固定されていますが、リクエストのエントリ速度は固定されていません(リクエストが多すぎると破棄されます)緊急時のリクエストが多すぎる)。

(3) トークン バケット:

トークン バケットのサイズは固定されており、トークンの生成速度は固定されていますが、トークンの消費 (つまりリクエスト) 速度は固定されていません (一部の Too many time リクエストに対処できます); 各リクエストはトークン バケットからトークンを取得し、トークンがない場合、リクエストは破棄されます。

カウンタ電流制限

一定期間内に処理されるリクエストの最大数は固定されており、超過したリクエストは処理されません。

たとえば、インターフェイス A については、1 分間に 100 回を超えてアクセスできないと規定します。

次に、これを実行できます:

最初に、カウンター counter を設定できます。リクエストが来るたびに、カウンターは 1 ずつ増加します。カウンターの値が 100 より大きい場合、このリクエストと最初のリクエストの間の間隔が 1 分以内である場合は、リクエストが多すぎるためアクセスが拒否されたことを意味します;

このリクエストと最初のリクエストの間の間隔が 1 より大きい場合分、カウンタの値がまだ電流制限範囲内にある場合、カウンタをリセットするという単純かつ粗雑なものです。

Java で一般的な電流制限アルゴリズムを実装する方法

コードの実装:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//计数器 限流
public class CounterLimiter {

    //起始时间
    private static long startTime = System.currentTimeMillis();

    //时间间隔1000ms
    private static long interval = 1000;

    //每个时间间隔内,限制数量
    private static long limit = 3;

    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public static boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        //判断是否在上一个时间间隔内
        if (nowTime < startTime + interval) {
            //如果还在上个时间间隔内
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        } else {
            //如果不在上一个时间间隔内
            synchronized (CounterLimiter.class) {
                //防止重复初始化
                if (nowTime > startTime + interval) {
                    startTime = nowTime;
                    accumulator.set(0);
                }
            }
            //再次进行判断
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

カウンタ電流制限の欠点:

このアルゴリズムは単純ですが、重大な問題があります。画像:

Java で一般的な電流制限アルゴリズムを実装する方法

上の図から、悪意のあるユーザーがいると仮定すると、0:59 と 1:00 に 100 のリクエストを瞬時に送信していることがわかります。 00 さらに 100 個のリクエストが即座に送信されたため、実際、このユーザーは 1 秒以内に 200 個のリクエストを即座に送信しました。

私たちが今規定したのは、1 分あたり最大 100 リクエスト (計画されたスループット) で、これは 1 秒あたり最大 1.7 リクエストです。ユーザーは、タイム ウィンドウのリセット ノードで即座にバースト リクエストを行うことができます。レート制限。

ユーザーはアルゴリズムのこの抜け穴を利用して、アプリケーションを即座に破壊する可能性があります。

リーキーバケットの電流制限

リーキーバケットアルゴリズムの電流制限の基本原理は次のとおりです。水(要求に対応)が取水口からリーキーバケットに入り、リーキーバケットは水を排出します。一定の速度(解放要求))、水の流入速度が速すぎてバケツ内の総水量がバケツの容量より大きい場合、直接オーバーフローし、要求は拒否されます。

大まかなリーキーバケットの電流制限ルールは次のとおりです。

(1) 水入口(クライアントの要求に対応)は、とにかくリーキーバケットに流れ込みます。

(2) リーキーバケツの容量は決まっており、水の出口(放出)量も決まっています。

(3) 漏れバケツの容量は変化しませんが、処理速度が遅すぎるとバケツ内の水量がバケツの容量を超え、後から流入する水滴があふれてしまうため、リクエストは拒否されます。

Java で一般的な電流制限アルゴリズムを実装する方法

⭐漏れバケツのアルゴリズムは実際には非常に単純です。大まかに言うと、水がバケツに流入し、急激に流出する水漏れのプロセスと考えることができます。水が一定の割合を超えた場合、バケツの容量(容量)は変更されないため、廃棄され、全体の割合が保証されます。

水は一定の速度で流出します。

Java で一般的な電流制限アルゴリズムを実装する方法

ピーククリッピング:大量のトラフィックが流入するとオーバーフローが発生するため、電流制限保護サービス

#バッファリング: サーバーに直接リクエストせず、圧力をバッファリングします

#コード実装:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//漏斗限流
public class LeakBucketLimiter {

    //桶的大小
    private static long capacity = 10;
    //流出速率,每秒两个
    private static long rate = 2;
    //开始时间
    private static long startTime = System.currentTimeMillis();
    //桶中剩余的水
    private static AtomicLong water = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //如果桶的余量问0,直接放行
        if (water.get() == 0) {
            startTime = System.currentTimeMillis();
            water.set(1);
            return true;
        }
        //计算从当前时间到开始时间流出的水,和现在桶中剩余的水
        //桶中剩余的水
        water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
        //防止出现<0的情况
        water.set(Math.max(0, water.get()));
        //设置新的开始时间
        startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
        //如果当前水小于容量,表示可以放行
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

水漏れバケツの欠点:

水の出口リーキーバケットの速度は固定、つまりリクエストのリリース速度は固定です。

リーキーバケットの出口速度が固定されており、バックエンドの能力向上に柔軟に対応できません。たとえば、動的拡張によりバックエンド トラフィックが 1000QPS から 1WQPS に増加しますが、リーキー バケットに対する解決策はありません。

トークン バケットの現在の制限

トークン バケット アルゴリズムでは、新しいリクエストが到着すると、バケットからトークンが取得されます。バケットに利用可能なトークンがない場合、サービスは否定される。もちろん、トークンの数にも制限があります。トークンの数は時間と発行率に強く関係します。時間が経過するほど、より多くのトークンがバケットに追加されます。トークンの発行速度がアプリケーションの速度よりも速い場合、トークン バケットはトークンでいっぱいになります。トークンがトークン バケット全体を占めるまで。

トークン バケツの流れを制限するための一般的なルールは次のとおりです。

(1) 給水口はトークンを一定の速度でバケツに入れます。

(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。

(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。

Java で一般的な電流制限アルゴリズムを実装する方法

代码实现: 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//令牌桶
public class TokenBucketLimiter {
    //桶的容量
    private static long capacity = 10;
    //放入令牌的速率,每秒2个
    private static long rate = 2;
    //上次放置令牌的时间
    private static long lastTime = System.currentTimeMillis();
    //桶中令牌的余量
    private static AtomicLong tokenNum = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //更新桶中剩余令牌的数量
        long now = System.currentTimeMillis();
        tokenNum.addAndGet((now - lastTime) / 1000 * rate);
        tokenNum.set(Math.min(capacity, tokenNum.get()));
        //更新时间
        lastTime += (now - lastTime) / 1000 * 1000;
        //桶中还有令牌就放行
        if (tokenNum.get() > 0) {
            tokenNum.decrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    //测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

令牌桶的好处: 

令牌桶的好处之一就是可以方便地应对 突发出口流量(后端能力的提升)。

比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。

以上がJava で一般的な電流制限アルゴリズムを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。