Heim  >  Artikel  >  Java  >  Was sind die gängigen Strombegrenzungsalgorithmen in Java?

Was sind die gängigen Strombegrenzungsalgorithmen in Java?

PHPz
PHPznach vorne
2023-05-12 18:37:13866Durchsuche

01 Festes Fenster

Festes Fenster, auch bekannt als Strombegrenzungsalgorithmus mit festem Fenster (auch bekannt als Zähleralgorithmus, festes Fenster), ist der einfachste Strombegrenzungsalgorithmus. Er steuert den maximalen Zugriff innerhalb der Zeiteinheit durch einen darin aufrechterhaltenen Zähler die Zeiteinheit.

Angenommen, die Anzahl der Anfragen pro Minute ist auf nicht mehr als 60 begrenzt. Legen Sie einen Zähler fest. Wenn die Anfrage eintrifft und der Zähler den Schwellenwert erreicht, wird die Anfrage abgelehnt. Andernfalls wird der Zähler um 1 erhöht. Der Zähler wird jede Minute auf 0 zurückgesetzt. Der Code ist wie folgt implementiert:

public class CounterRateLimiter extends MyRateLimiter {
    /**
     * 每秒限制请求数
     */
    private final long permitsPerSecond;
    /**
     * 上一个窗口的开始时间
     */
    public long timestamp = System.currentTimeMillis();
    /**
     * 计数器
     */
    private int counter;

    public CounterRateLimiter(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 窗口内请求数量小于阈值,更新计数放行,否则拒绝请求
        if (now - timestamp < 1000) {
            if (counter < permitsPerSecond) {
                counter++;
                return true;
            } else {
                return false;
            }
        }
        // 时间窗口过期,重置计数器和时间戳
        counter = 0;
        timestamp = now;
        return true;
    }
}

Der größte Vorteil des festen Fensters besteht darin, dass es einfach zu implementieren ist und der Speicherbedarf gering ist. Wir müssen nur die Anzahl im Zeitfenster speichern, um sicherzustellen, dass es aktueller ist Anfragen werden verarbeitet, ohne dass sich alte Anfragen ansammeln. Dadurch werden neue Anfragen ausgehungert. Natürlich gibt es auch kritische Probleme, wenn zwei Fenster aufeinander treffen, kann der momentane Datenverkehr 2n betragen.

02 Schiebefenster

Um sofortigen Verkehr zu verhindern, kann das feste Fenster weiter in mehrere Raster unterteilt und jedes Mal um ein kleines Raster nach hinten verschoben werden, anstatt die Fenstergröße festzulegen. Dies ist ein Schiebefenster.

Zum Beispiel kann jede Minute in 6 Zellen zu je 10 Sekunden unterteilt werden. In jeder Zelle wird ein Zähler geführt, und das Fenster wird jeweils eine Zelle nach vorne verschoben. Immer wenn eine Anfrage eintrifft, kann sie freigegeben werden, solange die Summe der Anzahl aller Zellen im Fenster den Schwellenwert nicht überschreitet. Auch die Übertragung von Datenpaketen im TCP-Protokoll nutzt Schiebefenster zur Flusskontrolle.

Die Implementierung ist wie folgt:

public class SlidingWindowRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private final long permitsPerMinute;
    /**
     * 计数器, k-为当前窗口的开始时间值秒,value为当前窗口的计数
     */
    private final TreeMap<Long, Integer> counters;

    public SlidingWindowRateLimiter(long permitsPerMinute) {
        this.permitsPerMinute = permitsPerMinute;
        this.counters = new TreeMap<>();
    }

    @Override
    public synchronized boolean tryAcquire() {
        // 获取当前时间的所在的子窗口值; 10s一个窗口
        long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / 10 * 10;
        // 获取当前窗口的请求总量
        int currentWindowCount = getCurrentWindowCount(currentWindowTime);
        if (currentWindowCount >= permitsPerMinute) {
            return false;
        }
        // 计数器 + 1
        counters.merge(currentWindowTime, 1, Integer::sum);
        return true;
    }
    /**
     * 获取当前窗口中的所有请求数(并删除所有无效的子窗口计数器)
     *
     * @param currentWindowTime 当前子窗口时间
     * @return 当前窗口中的计数
     */
    private int getCurrentWindowCount(long currentWindowTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentWindowTime - 50;
        int result = 0;

        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Integer> entry = iterator.next();
            if (entry.getKey() < startTime) {
                iterator.remove();
            } else {
                result += entry.getValue();
            }
        }
        return result;
    }
}

Das Schiebefenster löst das Problem der momentanen Verkehrsspitzen im Zähler. Tatsächlich ist der Zähleralgorithmus auch eine Art Schiebefenster, aber das Fenster ist nicht in feinkörnigere Einheiten unterteilt . Beim Vergleich der Zähler ist ersichtlich, dass die Flusskontrolle präziser und strenger ist, wenn die Granularität der Fensteraufteilung feiner ist.

Wenn jedoch der Verkehr im Fenster den Schwellenwert erreicht, wird der Verkehr sofort unterbrochen. In tatsächlichen Anwendungen besteht der gewünschte Flussbegrenzungseffekt oft nicht darin, den Verkehr auf einmal zu unterbrechen, sondern ihn zuzulassen reibungslos in das System einsteigen.

03 Leaky-Bucket-Algorithmus

Wie kann der Strom reibungsloser begrenzt werden? Werfen wir einen Blick auf den Leaky-Bucket-Algorithmus. Anforderungen werden mit einer beliebigen Geschwindigkeit in den Leaky-Bucket eingespritzt wird voll sein, zu diesem Zeitpunkt werden neu eingehende Anfragen verworfen. Strombegrenzung und -formung sind die beiden Kernfunktionen des Leaky-Bucket-Algorithmus.

Die Implementierung lautet wie folgt:

public class LeakyBucketRateLimiter extends MyRateLimiter {
    // 桶的容量
    private final int capacity;
    // 漏出速率
    private final int permitsPerSecond;
    // 剩余水量
    private long leftWater;
    // 上次注入时间
    private long timeStamp = System.currentTimeMillis();

    public LeakyBucketRateLimiter(int permitsPerSecond, int capacity) {
        this.capacity = capacity;
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public synchronized boolean tryAcquire() {
        //1. 计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp) / 1000;
        leftWater = Math.max(0, leftWater - timeGap * permitsPerSecond);
        timeStamp = now;
        
        // 如果未满,则放行;否则限流
        if (leftWater < capacity) {
            leftWater += 1;
            return true;
        }
        return false;
    }
}

Dies ist keine vollständige Implementierung des Leaky-Bucket-Algorithmus. Der obige Code überprüft nur, ob der Datenverkehr aufgegeben wird, das heißt, tryAcquire gibt true zurück, um anzuzeigen, dass der Leaky-Bucket nicht voll ist Andernfalls wird angezeigt, dass der Leaky-Bucket voll ist. Verwerfen Sie die Anfrage.

Wenn Sie Datenverkehr mit einer konstanten Rate durchsickern lassen möchten, sollten Sie dies normalerweise mit einer FIFO-Warteschlange implementieren. Wenn tryAcquire „true“ zurückgibt, wird die Anforderung in die Warteschlange gestellt und dann mit einer festen Häufigkeit zur Verarbeitung aus der Warteschlange genommen. Der Beispielcode lautet wie folgt:

@Test
public void testLeakyBucketRateLimiter() throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    ExecutorService singleThread = Executors.newSingleThreadExecutor();

    LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(20, 20);
    // 存储流量的队列
    Queue<Integer> queue = new LinkedList<>();
    // 模拟请求  不确定速率注水
    singleThread.execute(() -> {
        int count = 0;
        while (true) {
            count++;
            boolean flag = rateLimiter.tryAcquire();
            if (flag) {
                queue.offer(count);
                System.out.println(count + "--------流量被放行--------");
            } else {
                System.out.println(count + "流量被限制");
            }
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
  
    // 模拟处理请求 固定速率漏水
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        if (!queue.isEmpty()) {
            System.out.println(queue.poll() + "被处理");
        }
    }, 0, 100, TimeUnit.MILLISECONDS);

    // 保证主线程不会退出
    while (true) {
        Thread.sleep(10000);
    }
}

Der Zweck des Leaky-Bucket-Algorithmus besteht hauptsächlich darin, den Burst-Verkehr zu glätten und einen Mechanismus bereitzustellen, der sicherstellt, dass der Burst-Verkehr im Netzwerk in einen reibungslosen und stabilen Verkehr integriert wird.

Da jedoch Leaky-Buckets den Datenverkehr zu streng kontrollieren, können in manchen Szenarien die Systemressourcen nicht vollständig genutzt werden, da die Leckrate von Leaky-Buckets festgelegt ist. Selbst wenn der Downstream zu einem bestimmten Zeitpunkt größeren Datenverkehr verarbeiten kann, werden Leaky-Buckets dies nicht zulassen platzt der Verkehr durch.

04 Token Bucket

Wie lässt man Burst-Verkehr zu und begrenzt gleichzeitig die Verkehrsrate? Erfahren Sie mehr über den Token-Bucket-Algorithmus! Der Token-Bucket-Algorithmus sendet Token mit einer konstanten Rate an den Token-Bucket. Wenn die Anfrage eintrifft, versucht er, das Token aus dem Token-Bucket abzurufen. Erst wenn das Token erhalten wurde, wird es abgelehnt.

Token-Bucket hat die folgenden Eigenschaften:

1 Token werden mit einer konstanten Rate ausgegeben Kapazität des Token-Buckets Ja b, wenn der Token-Bucket voll ist, wird der neue Token verworfen Im Token-Bucket-Algorithmus wird auf Folgendes geachtet: Zwei, nämlich die Strombegrenzungsrate v/s und die Token-Bucket-Kapazität b. Die Rate a stellt die Strombegrenzungsrate des Strombegrenzers unter normalen Umständen dar und b ist die Abkürzung für Burst , der den vom aktuellen Begrenzer maximal zulässigen Burst-Verkehr angibt.

Beispiel: b=10, wenn der Token-Bucket voll ist, sind 10 Token verfügbar. Zu diesem Zeitpunkt dürfen 10 Anforderungen gleichzeitig den aktuellen Begrenzer passieren (ein gewisser Grad an Verkehrsauslastung ist zulässig). , und diese 10 Anfragen verbrauchen sofort alle Token. Nachdem die Karte übergeben wurde, kann der nachfolgende Datenverkehr den Flussbegrenzer nur mit der Rate r passieren.

Die Implementierung ist wie folgt:

public class TokenBucketRateLimiter extends MyRateLimiter {
    /**
     * 令牌桶的容量「限流器允许的最大突发流量」
     */
    private final long capacity;
    /**
     * 令牌发放速率
     */
    private final long generatedPerSeconds;
    /**
     * 最后一个令牌发放的时间
     */
    long lastTokenTime = System.currentTimeMillis();
    /**
     * 当前令牌数量
     */
    private long currentTokens;

    public TokenBucketRateLimiter(long generatedPerSeconds, int capacity) {
        this.generatedPerSeconds = generatedPerSeconds;
        this.capacity = capacity;
    }

    /**
     * 尝试获取令牌
     *
     * @return true表示获取到令牌,放行;否则为限流
     */
    @Override
    public synchronized boolean tryAcquire() {
          /**
           * 计算令牌当前数量
           * 请求时间在最后令牌是产生时间相差大于等于额1s(为啥时1s?因为生成令牌的最小时间单位时s),则
           * 1. 重新计算令牌桶中的令牌数
           * 2. 将最后一个令牌发放时间重置为当前时间
           */
        long now = System.currentTimeMillis();
        if (now - lastTokenTime >= 1000) {
            long newPermits = (now - lastTokenTime) / 1000 * generatedPerSeconds;
            currentTokens = Math.min(currentTokens + newPermits, capacity);
            lastTokenTime = now;
        }
        if (currentTokens > 0) {
            currentTokens--;
            return true;
        }
        return false;
    }
}

Was man sich vorstellen kann, ist, dass es sich bei der Implementierung um das Producer-Consumer-Muster handelt. Verwenden Sie einen Producer-Thread, um regelmäßig Token zur Blockierungswarteschlange hinzuzufügen Thread versucht, den aktuellen Begrenzer zu übergeben. Der Consumer-Thread darf den aktuellen Begrenzer erst übergeben, nachdem er das Token aus der Blockierungswarteschlange erhalten hat.

Aufgrund der Unsicherheit der Thread-Planung ist der Timer-Fehler in Szenarien mit hoher Parallelität sehr groß. Gleichzeitig erstellt der Timer selbst Planungs-Threads, was sich auch auf die Leistung des Systems auswirkt.

05 Sliding Log

Sliding Log ist ein relativ „unpopulärer“, aber wirklich nützlicher Strombegrenzungsalgorithmus. Der gleitende Protokollratenbegrenzungsalgorithmus muss den Zeitstempel der Anfrage aufzeichnen, der normalerweise in einer geordneten Sammlung gespeichert wird. Wir können alle Anfragen des Benutzers innerhalb eines Zeitraums in einer einzigen geordneten Sammlung verfolgen.

假设我们要限制给定T时间内的请求不超过N,我们只需要存储最近T时间之内的请求日志,每当请求到来时判断最近T时间内的请求总数是否超过阈值。

实现如下:

public class SlidingLogRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private static final long PERMITS_PER_MINUTE = 60;
    /**
     * 请求日志计数器, k-为请求的时间(秒),value当前时间的请求数量
     */
    private final TreeMap<Long, Integer> requestLogCountMap = new TreeMap<>();

    @Override
    public synchronized boolean tryAcquire() {
        // 最小时间粒度为s
        long currentTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        // 获取当前窗口的请求总数
        int currentWindowCount = getCurrentWindowCount(currentTimestamp);
        if (currentWindowCount >= PERMITS_PER_MINUTE) {
            return false;
        }
        // 请求成功,将当前请求日志加入到日志中
        requestLogCountMap.merge(currentTimestamp, 1, Integer::sum);
        return true;
    }

    /**
     * 统计当前时间窗口内的请求数
     *
     * @param currentTime 当前时间
     * @return -
     */
    private int getCurrentWindowCount(long currentTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentTime - 59;
        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        return requestLogCountMap.entrySet()
                .stream()
                .filter(entry -> entry.getKey() >= startTime)
                .mapToInt(Map.Entry::getValue)
                .sum();
    }
}

滑动日志能够避免突发流量,实现较为精准的限流;同样更加灵活,能够支持更加复杂的限流策略,如多级限流,每分钟不超过100次,每小时不超过300次,每天不超过1000次,我们只需要保存最近24小时所有的请求日志即可实现。

灵活并不是没有代价的,带来的缺点就是占用存储空间要高于其他限流算法。

06分布式限流

以上几种限流算法的实现都仅适合单机限流。虽然给每台机器平均分配限流配额可以达到限流的目的,但是由于机器性能,流量分布不均以及计算数量动态变化等问题,单机限流在分布式场景中的效果总是差强人意。

分布式限流最简单的实现就是利用中心化存储,即将单机限流存储在本地的数据存储到同一个存储空间中,如常见的Redis等。

当然也可以从上层流量入口进行限流,Nginx代理服务就提供了限流模块,同样能够实现高性能,精准的限流,其底层是漏桶算法。

Das obige ist der detaillierte Inhalt vonWas sind die gängigen Strombegrenzungsalgorithmen in Java?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen