Maison >Java >javaDidacticiel >Quels sont les algorithmes de limitation de courant courants en Java ?

Quels sont les algorithmes de limitation de courant courants en Java ?

PHPz
PHPzavant
2023-05-12 18:37:13903parcourir

01 Fenêtre fixe

La fenêtre fixe, également connue sous le nom d'algorithme de limitation de courant à fenêtre fixe (également connue sous le nom d'algorithme de compteur, fenêtre fixe), est l'algorithme de limitation de courant le plus simple. Elle contrôle l'accès maximum dans l'unité de temps grâce à un compteur maintenu à l'intérieur. l'unité de temps.

En supposant que le nombre de requêtes par minute soit limité à 60 maximum, paramétrez un compteur Lorsque la requête arrive, si le compteur atteint le seuil, la requête sera rejetée, sinon le compteur sera incrémenté de 1 ; Le compteur sera remis à 0 toutes les minutes. Le code est implémenté comme suit :

public class CounterRateLimiter extends MyRateLimiter {
    /**
     * 每秒限制请求数
     */
    private final long permitsPerSecond;
    /**
     * 上一个窗口的开始时间
     */
    public long timestamp = System.currentTimeMillis();
    /**
     * 计数器
     */
    private int counter;

    public CounterRateLimiter(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 窗口内请求数量小于阈值,更新计数放行,否则拒绝请求
        if (now - timestamp < 1000) {
            if (counter < permitsPerSecond) {
                counter++;
                return true;
            } else {
                return false;
            }
        }
        // 时间窗口过期,重置计数器和时间戳
        counter = 0;
        timestamp = now;
        return true;
    }
}

Le plus grand avantage de la fenêtre fixe est qu'elle est facile à implémenter ; et l'empreinte mémoire est faible, il suffit de stocker le décompte dans la fenêtre de temps, cela peut garantir que plus de données sont disponibles ; les demandes sont traitées sans accumulation d'anciennes demandes. Provoque l'épuisement des nouvelles demandes. Bien entendu, il est également confronté à des problèmes critiques lorsque deux fenêtres se rencontrent, le trafic instantané peut être de 2n.

02 Fenêtre coulissante

Afin d'éviter le trafic instantané, la fenêtre fixe peut être divisée en plusieurs grilles et reculée d'une petite grille à chaque fois au lieu de fixer la taille de la fenêtre.

Par exemple, chaque minute peut être divisée en 6 cellules de 10 secondes. Un compteur est maintenu dans chaque cellule et la fenêtre avance d'une cellule à la fois. Chaque fois qu'une requête arrive, elle peut être libérée tant que la somme des comptes de toutes les cellules de la fenêtre ne dépasse pas le seuil. La transmission de paquets de données dans le protocole TCP utilise également des fenêtres glissantes pour le contrôle de flux.

L'implémentation est la suivante :

public class SlidingWindowRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private final long permitsPerMinute;
    /**
     * 计数器, k-为当前窗口的开始时间值秒,value为当前窗口的计数
     */
    private final TreeMap<Long, Integer> counters;

    public SlidingWindowRateLimiter(long permitsPerMinute) {
        this.permitsPerMinute = permitsPerMinute;
        this.counters = new TreeMap<>();
    }

    @Override
    public synchronized boolean tryAcquire() {
        // 获取当前时间的所在的子窗口值; 10s一个窗口
        long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / 10 * 10;
        // 获取当前窗口的请求总量
        int currentWindowCount = getCurrentWindowCount(currentWindowTime);
        if (currentWindowCount >= permitsPerMinute) {
            return false;
        }
        // 计数器 + 1
        counters.merge(currentWindowTime, 1, Integer::sum);
        return true;
    }
    /**
     * 获取当前窗口中的所有请求数(并删除所有无效的子窗口计数器)
     *
     * @param currentWindowTime 当前子窗口时间
     * @return 当前窗口中的计数
     */
    private int getCurrentWindowCount(long currentWindowTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentWindowTime - 50;
        int result = 0;

        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Integer> entry = iterator.next();
            if (entry.getKey() < startTime) {
                iterator.remove();
            } else {
                result += entry.getValue();
            }
        }
        return result;
    }
}

La fenêtre glissante résout le problème des pics de trafic instantanés dans le compteur. En fait, l'algorithme du compteur est aussi une sorte de fenêtre glissante, mais la fenêtre n'est pas divisée en unités plus fines. . En comparant les compteurs, on constate que lorsque la granularité de division des fenêtres est plus fine, le contrôle de flux est plus précis et plus strict.

Cependant, lorsque le trafic dans la fenêtre atteint le seuil, le trafic sera coupé instantanément. Dans les applications réelles, l'effet de limitation de flux que nous souhaitons n'est souvent pas de couper le trafic d'un seul coup, mais de permettre au trafic de circuler. entrez dans le système en douceur.

03 Algorithme de seau qui fuit

Comment limiter le courant plus facilement ? Jetons un coup d'œil à l'algorithme Leaky Bucket. Les requêtes sont injectées dans le seau qui fuit à n'importe quelle vitesse comme l'eau, et le seau perdra de l'eau à un débit fixe lorsque la vitesse d'injection continue d'être supérieure à la vitesse de fuite, le seau qui fuit ; deviendra plein, à ce moment-là, les nouvelles demandes entrantes seront rejetées. La limitation et la mise en forme du courant sont les deux capacités principales de l'algorithme du seau à fuite.

L'implémentation est la suivante :

public class LeakyBucketRateLimiter extends MyRateLimiter {
    // 桶的容量
    private final int capacity;
    // 漏出速率
    private final int permitsPerSecond;
    // 剩余水量
    private long leftWater;
    // 上次注入时间
    private long timeStamp = System.currentTimeMillis();

    public LeakyBucketRateLimiter(int permitsPerSecond, int capacity) {
        this.capacity = capacity;
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public synchronized boolean tryAcquire() {
        //1. 计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp) / 1000;
        leftWater = Math.max(0, leftWater - timeGap * permitsPerSecond);
        timeStamp = now;
        
        // 如果未满,则放行;否则限流
        if (leftWater < capacity) {
            leftWater += 1;
            return true;
        }
        return false;
    }
}

Il ne s'agit pas d'une implémentation complète de l'algorithme du bucket qui fuit. Le code ci-dessus vérifie uniquement si le trafic sera abandonné, c'est-à-dire que tryAcquire renvoie true pour indiquer que le bucket qui fuit n'est pas plein. , sinon cela indique que le compartiment qui fuit est plein. Supprimez la demande.

Si vous souhaitez fuir du trafic à un rythme constant, vous devez généralement l'implémenter avec une file d'attente FIFO. Lorsque tryAcquire renvoie vrai, la requête est mise en file d'attente, puis retirée de la file d'attente à une fréquence fixe pour être traitée. L'exemple de code est le suivant :

@Test
public void testLeakyBucketRateLimiter() throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    ExecutorService singleThread = Executors.newSingleThreadExecutor();

    LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(20, 20);
    // 存储流量的队列
    Queue<Integer> queue = new LinkedList<>();
    // 模拟请求  不确定速率注水
    singleThread.execute(() -> {
        int count = 0;
        while (true) {
            count++;
            boolean flag = rateLimiter.tryAcquire();
            if (flag) {
                queue.offer(count);
                System.out.println(count + "--------流量被放行--------");
            } else {
                System.out.println(count + "流量被限制");
            }
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
  
    // 模拟处理请求 固定速率漏水
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        if (!queue.isEmpty()) {
            System.out.println(queue.poll() + "被处理");
        }
    }, 0, 100, TimeUnit.MILLISECONDS);

    // 保证主线程不会退出
    while (true) {
        Thread.sleep(10000);
    }
}

Le but de l'algorithme de compartiment à fuites est principalement de lisser le trafic en rafale et de fournir un mécanisme permettant de garantir que le trafic en rafale dans le réseau est intégré dans un trafic fluide et stable.

Cependant, étant donné que les seaux qui fuient contrôlent trop strictement le trafic, les ressources du système ne peuvent pas être pleinement utilisées dans certains scénarios, car le taux de fuite des seaux qui fuient est fixe. Même si l'aval peut gérer un trafic plus important à un certain moment, les seaux qui fuient ne le seront pas. capable d'utiliser pleinement les ressources du système.

04 Token Bucket

Comment autoriser le trafic en rafale tout en limitant le débit de trafic ? Découvrez l’algorithme du token bucket ! L'algorithme du compartiment à jetons envoie des jetons au compartiment à jetons à un rythme constant. Lorsque la demande arrive, il essaie d'obtenir le jeton du compartiment à jetons. Ce n'est que lorsque le jeton est obtenu qu'il peut être libéré, sinon il sera rejeté.

Le compartiment de jetons a les caractéristiques suivantes :

1. Les jetons sont émis à un taux constant En supposant que le taux limite actuel est de v/s, cela signifie qu'un jeton est émis toutes les 1/v secondes

2. capacité du seau de jetons Oui b, si le seau de jetons est plein, le nouveau jeton sera rejeté

3. Le principe selon lequel la demande peut dépasser le limiteur actuel est qu'il y a des jetons dans le seau de jetons

Les paramètres qui valent la peine d'être payés. L'attention portée à l'algorithme du compartiment à jetons est : Deux, à savoir le taux de limitation actuel v/s et la capacité du compartiment à jetons b. Le taux a représente le taux limite actuel du limiteur de courant dans des circonstances normales, et b est l'abréviation de burst ; , indiquant le trafic en rafale maximum autorisé par le limiteur actuel.

Par exemple, b=10, lorsque le seau de jetons est plein, il y a 10 jetons disponibles. À ce moment, 10 requêtes sont autorisées à passer par le limiteur de courant en même temps (un certain degré de rafale de trafic est autorisé). , et ces 10 requêtes consomment tous les jetons instantanément. Une fois la carte transmise, le trafic ultérieur ne peut traverser le limiteur de flux qu'au débit r.

L'implémentation est la suivante :

public class TokenBucketRateLimiter extends MyRateLimiter {
    /**
     * 令牌桶的容量「限流器允许的最大突发流量」
     */
    private final long capacity;
    /**
     * 令牌发放速率
     */
    private final long generatedPerSeconds;
    /**
     * 最后一个令牌发放的时间
     */
    long lastTokenTime = System.currentTimeMillis();
    /**
     * 当前令牌数量
     */
    private long currentTokens;

    public TokenBucketRateLimiter(long generatedPerSeconds, int capacity) {
        this.generatedPerSeconds = generatedPerSeconds;
        this.capacity = capacity;
    }

    /**
     * 尝试获取令牌
     *
     * @return true表示获取到令牌,放行;否则为限流
     */
    @Override
    public synchronized boolean tryAcquire() {
          /**
           * 计算令牌当前数量
           * 请求时间在最后令牌是产生时间相差大于等于额1s(为啥时1s?因为生成令牌的最小时间单位时s),则
           * 1. 重新计算令牌桶中的令牌数
           * 2. 将最后一个令牌发放时间重置为当前时间
           */
        long now = System.currentTimeMillis();
        if (now - lastTokenTime >= 1000) {
            long newPermits = (now - lastTokenTime) / 1000 * generatedPerSeconds;
            currentTokens = Math.min(currentTokens + newPermits, capacity);
            lastTokenTime = now;
        }
        if (currentTokens > 0) {
            currentTokens--;
            return true;
        }
        return false;
    }
}

Ce qu'il faut prendre en compte, c'est que l'implémentation à laquelle il est très facile de penser est le modèle producteur-consommateur : utilisez un thread producteur pour ajouter régulièrement des jetons à la file d'attente de blocage, et le thread essayant de passer le limiteur de courant car Le thread consommateur n'est autorisé à passer le limiteur de courant qu'après avoir obtenu le jeton de la file d'attente de blocage.

En raison de l'incertitude de la planification des threads, l'erreur du minuteur est très importante dans les scénarios à forte concurrence. Dans le même temps, le minuteur lui-même créera des threads de planification, ce qui affectera également les performances du système.

05 Journal coulissant

Le journal coulissant est un algorithme de limitation de courant relativement "impopulaire" mais très utile. L'algorithme de limitation du débit de journalisation glissante doit enregistrer l'horodatage de la demande, qui est généralement stocké à l'aide d'une collection ordonnée. Nous pouvons suivre toutes les demandes de l'utilisateur au cours d'une période donnée dans une seule collection ordonnée.

假设我们要限制给定T时间内的请求不超过N,我们只需要存储最近T时间之内的请求日志,每当请求到来时判断最近T时间内的请求总数是否超过阈值。

实现如下:

public class SlidingLogRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private static final long PERMITS_PER_MINUTE = 60;
    /**
     * 请求日志计数器, k-为请求的时间(秒),value当前时间的请求数量
     */
    private final TreeMap<Long, Integer> requestLogCountMap = new TreeMap<>();

    @Override
    public synchronized boolean tryAcquire() {
        // 最小时间粒度为s
        long currentTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        // 获取当前窗口的请求总数
        int currentWindowCount = getCurrentWindowCount(currentTimestamp);
        if (currentWindowCount >= PERMITS_PER_MINUTE) {
            return false;
        }
        // 请求成功,将当前请求日志加入到日志中
        requestLogCountMap.merge(currentTimestamp, 1, Integer::sum);
        return true;
    }

    /**
     * 统计当前时间窗口内的请求数
     *
     * @param currentTime 当前时间
     * @return -
     */
    private int getCurrentWindowCount(long currentTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentTime - 59;
        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        return requestLogCountMap.entrySet()
                .stream()
                .filter(entry -> entry.getKey() >= startTime)
                .mapToInt(Map.Entry::getValue)
                .sum();
    }
}

滑动日志能够避免突发流量,实现较为精准的限流;同样更加灵活,能够支持更加复杂的限流策略,如多级限流,每分钟不超过100次,每小时不超过300次,每天不超过1000次,我们只需要保存最近24小时所有的请求日志即可实现。

灵活并不是没有代价的,带来的缺点就是占用存储空间要高于其他限流算法。

06分布式限流

以上几种限流算法的实现都仅适合单机限流。虽然给每台机器平均分配限流配额可以达到限流的目的,但是由于机器性能,流量分布不均以及计算数量动态变化等问题,单机限流在分布式场景中的效果总是差强人意。

分布式限流最简单的实现就是利用中心化存储,即将单机限流存储在本地的数据存储到同一个存储空间中,如常见的Redis等。

当然也可以从上层流量入口进行限流,Nginx代理服务就提供了限流模块,同样能够实现高性能,精准的限流,其底层是漏桶算法。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer