Maison  >  Article  >  Java  >  Comment implémenter des algorithmes courants de limitation de courant en Java

Comment implémenter des algorithmes courants de limitation de courant en Java

WBOY
WBOYavant
2023-05-11 22:01:111225parcourir

Pourquoi limiter le flux

Augmentez le nombre de personnes entrant autant que possible tout en garantissant la disponibilité. Le reste des personnes fait la queue, ou renvoie des invites amicales pour s'assurer que les utilisateurs du. Le système à l'intérieur peut l'utiliser normalement pour empêcher l'avalanche du système.

Algorithme de limitation de courant

Il existe de nombreux algorithmes de limitation de courant, et il existe trois catégories courantes, à savoir l'algorithme de compteur, l'algorithme de compartiment à fuites et l'algorithme de compartiment à jetons.

(1) Compteur :

Dans un délai donné, le nombre maximum de demandes à traiter est fixé, et le surplus ne sera pas traité.

(2) Leaky bucket :

La taille du bucket qui fuit est fixe et la vitesse de traitement est fixe, mais la vitesse d'entrée des requêtes n'est pas fixe (quand il y a trop de requêtes en cas d'urgence, un trop grand nombre de demandes seront rejetées).

(3) Token Bucket :

La taille du bucket de jetons est fixe, la vitesse de génération de jetons est fixe, mais la vitesse de consommation de jetons (c'est-à-dire de demande) n'est pas fixe (peut faire face à des situations où il y a trop de demandes à certains moments) ; chaque demande prendra un jeton du seau de jetons, et s'il n'y a pas de jeton, la demande sera rejetée.

Limite à contre-courant

Dans un délai donné, le nombre maximum de demandes à traiter est fixé, et le surplus ne sera pas traité.

Par exemple, nous stipulons que pour l'interface A, on ne peut pas accéder plus de 100 fois en 1 minute.

Ensuite, nous pouvons faire ceci :

Au début, nous pouvons définir un compteur compteur Chaque fois qu'une demande arrive, le compteur augmentera de 1. Si compteur La valeur est. supérieur à 100 et l'intervalle entre cette demande et la première demande est toujours inférieur à 1 minute, alors cela signifie qu'il y a trop de demandes et l'accès est refusé

Si l'intervalle entre cette demande et la première ; demande Si le temps est supérieur à 1 minute et que la valeur du compteur est toujours dans la plage limite actuelle, réinitialisez le compteur. C'est aussi simple et grossier que cela.

Comment implémenter des algorithmes courants de limitation de courant en Java

Implémentation du code :

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//计数器 限流
public class CounterLimiter {

    //起始时间
    private static long startTime = System.currentTimeMillis();

    //时间间隔1000ms
    private static long interval = 1000;

    //每个时间间隔内,限制数量
    private static long limit = 3;

    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public static boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        //判断是否在上一个时间间隔内
        if (nowTime < startTime + interval) {
            //如果还在上个时间间隔内
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        } else {
            //如果不在上一个时间间隔内
            synchronized (CounterLimiter.class) {
                //防止重复初始化
                if (nowTime > startTime + interval) {
                    startTime = nowTime;
                    accumulator.set(0);
                }
            }
            //再次进行判断
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

Les défauts de la limitation à contre-courant :

Bien que cet algorithme C'est simple, mais il y a un problème critique. Regardons l'image ci-dessous :

Comment implémenter des algorithmes courants de limitation de courant en Java

De l'image ci-dessus, nous pouvons voir qu'en supposant qu'il y ait un malware. utilisateur, il est à 0 : à 59h00, 100 requêtes ont été envoyées instantanément, et à 1h00, 100 requêtes ont été envoyées instantanément. En fait, cet utilisateur a envoyé 200 requêtes instantanément en 1 seconde.

Ce que nous venons de stipuler est un maximum de 100 requêtes par minute (débit prévu), soit un maximum de 1,7 requêtes par seconde. Les utilisateurs effectuent des requêtes en rafale au nœud de réinitialisation de la fenêtre horaire, notre. les limites de taux peuvent être dépassées instantanément.

Les utilisateurs peuvent instantanément écraser notre application grâce à cette faille dans l'algorithme.

Limitation actuelle du seau qui fuit

Le principe de base de la limite de courant de l'algorithme du seau qui fuit est le suivant : l'eau (correspondant à la demande) pénètre dans le seau qui fuit par l'entrée d'eau, et la fuite le seau se déplace à une certaine vitesse Décharge d'eau (libération de la demande), lorsque la vitesse d'entrée de l'eau est trop élevée, le volume total d'eau dans le seau est supérieur à la capacité du seau, il débordera directement et la demande est rejetée.

Les règles approximatives de limitation du courant du seau qui fuit sont les suivantes :

(1) L'arrivée d'eau (correspondant à la demande du client) s'écoule dans tous les cas dans le seau qui fuit.

(2) La capacité du seau qui fuit est fixe, et le débit de sortie (libération) de l'eau est également fixe.

(3) La capacité du seau qui fuit reste inchangée. Si la vitesse de traitement est trop lente, la quantité d'eau dans le seau dépassera la capacité du seau et les gouttelettes d'eau y afflueront plus tard. débordera, indiquant que la demande est rejetée.

Comment implémenter des algorithmes courants de limitation de courant en Java

⭐L'algorithme du seau qui fuit est en fait très simple. Il peut être grossièrement considéré comme le processus de fuite d'eau de toute façon. et s'écoule à un certain débit. L'eau est rejetée lorsqu'elle dépasse la capacité du seau, car la capacité du seau reste inchangée, garantissant ainsi la vitesse globale.

L'eau s'écoule à un certain rythme,

Comment implémenter des algorithmes courants de limitation de courant en Java

Coupure de pointe : lorsqu'une grande quantité de trafic entre, un débordement se produit , limitant ainsi le flux Le service de protection est disponible

Buffering : Il ne demandera pas directement au serveur, pression de mise en mémoire tampon

Implémentation du code :

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//漏斗限流
public class LeakBucketLimiter {

    //桶的大小
    private static long capacity = 10;
    //流出速率,每秒两个
    private static long rate = 2;
    //开始时间
    private static long startTime = System.currentTimeMillis();
    //桶中剩余的水
    private static AtomicLong water = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //如果桶的余量问0,直接放行
        if (water.get() == 0) {
            startTime = System.currentTimeMillis();
            water.set(1);
            return true;
        }
        //计算从当前时间到开始时间流出的水,和现在桶中剩余的水
        //桶中剩余的水
        water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
        //防止出现<0的情况
        water.set(Math.max(0, water.get()));
        //设置新的开始时间
        startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
        //如果当前水小于容量,表示可以放行
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

Les défauts du seau qui fuit : #🎜🎜 #

La vitesse d'évacuation de l'eau du seau qui fuit est fixe, c'est-à-dire que la vitesse de libération demandée est fixe.

La vitesse de sortie du godet qui fuit est fixe et ne peut pas faire face de manière flexible à l'amélioration des capacités back-end. Par exemple, grâce à l'expansion dynamique, le trafic back-end passe de 1 000 QPS à 1 WQPS, et il n'existe aucune solution pour les compartiments qui fuient.

Limite actuelle du bucket de jetons

Dans l'algorithme du bucket de jetons, lorsqu'une nouvelle demande arrive, un jeton sera extrait du bucket s'il n'y a pas de jeton disponible dans le bucket. , Déni de service. Bien entendu, le nombre de jetons est également limité. Le nombre de jetons est fortement lié au temps et au taux d'émission. Plus le temps passe, plus de jetons seront ajoutés au compartiment. Si la vitesse d'émission des jetons est plus rapide que la vitesse d'application, le compartiment de jetons sera rempli de jetons. jusqu'à ce que les jetons occupent tout le compartiment de jetons.

Les règles générales pour la limitation du courant du seau à jetons sont les suivantes :

(1) L'entrée d'eau met les jetons dans le seau à une certaine vitesse.

(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。

(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。

Comment implémenter des algorithmes courants de limitation de courant en Java

代码实现: 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//令牌桶
public class TokenBucketLimiter {
    //桶的容量
    private static long capacity = 10;
    //放入令牌的速率,每秒2个
    private static long rate = 2;
    //上次放置令牌的时间
    private static long lastTime = System.currentTimeMillis();
    //桶中令牌的余量
    private static AtomicLong tokenNum = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //更新桶中剩余令牌的数量
        long now = System.currentTimeMillis();
        tokenNum.addAndGet((now - lastTime) / 1000 * rate);
        tokenNum.set(Math.min(capacity, tokenNum.get()));
        //更新时间
        lastTime += (now - lastTime) / 1000 * 1000;
        //桶中还有令牌就放行
        if (tokenNum.get() > 0) {
            tokenNum.decrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    //测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

令牌桶的好处: 

令牌桶的好处之一就是可以方便地应对 突发出口流量(后端能力的提升)。

比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer