首頁  >  文章  >  四種分散式限流演算法和程式碼實現

四種分散式限流演算法和程式碼實現

Java后端技术全栈
Java后端技术全栈轉載
2023-08-15 15:54:211139瀏覽


帶著問題走近限流

為什麼要限流呢?

就像我上面說的,流量多,的確是一件好事,但是如果過載,把系統打掛了,那大家都要吃席了。

四種分散式限流演算法和程式碼實現
沒死吧

所以,在各種大促活動之前,要對系統進行壓測,評估整個系統的峰值QPS,要做一些限流的設置,超過一定閾值,就拒絕處理或延後處理,避免把系統打掛的情況出現。

限流和熔斷有什麼差別?

限流發生在流量進來之前,超過的流量進行限制。

熔斷是一種應對故障的機制,發生在流量進來之後,如果系統發生故障或異常,熔斷會自動切斷請求,防止故障進一步擴展,導致服務雪崩。

限流和削峰有什麼差別?

削峰是對流量的平滑處理,透過緩慢地增加請求的處理速率來避免系統瞬時過載。

削峰大概就是水庫,把流量儲存起來,慢慢流,限流大概就是閘口,拒絕超出的流量。

限流的通用流程

那麼具體限流要怎麼實現呢?可以歸納為以下幾個步驟:

四種分散式限流演算法和程式碼實現
限流通用流程
  1. #統計量請求流量:記錄請求的數量或速率,可以透過計數器、滑動視窗等方式進行統計。
  2. 判斷是否超過限制:根據設定的限制條件,判斷目前請求流量是否超過限制。
  3. 執行限流策略:如果請求流量超過限制,執行限流策略,例如拒絕請求、延遲處理、傳回錯誤訊息等。
  4. 更新統計資料:根據請求的處理結果,更新統計信息,如增加計數器的值、更新滑動視窗的資料等。
  5. 重複執行上述步驟:不斷地統計請求流量、判斷是否超過限制、執行限流策略、更新統計資訊

要注意的是,具體的限流演算法實作可能會根據不同的場景和需求進行調整和最佳化,例如使用令牌桶演算法、漏桶演算法等。

單機限流和分散式限流

我們注意到,在限流的通用流程裡,需要統計請求量、更新統計量,那麼這個請求量的統計和更新就必須維護在一個儲存空間裡。

假如只是一個單機版的環境,那就很好辦了,直接儲存到本地。

四種分散式限流演算法和程式碼實現
單機vs叢集

但一般來講,我們的服務都是叢集部署的,如何來實現多台機器之間整體的限流呢?

這時候就可以把我們的統計資料放到Tair或Redis等分散式的K-V儲存中。

四種限流演算法與分散式實作

接下來,我們開始實作一些常見的限流演算法,這裡使用Redis作為分散式存儲,Redis不用多說了吧,最流行的分散式快取DB;Redission作為Redis客戶端,Redission單純只是用來做分散式鎖,有些」屈才“,其實用來作為Redis的客戶端也非常好用。

四種分散式限流演算法和程式碼實現
五種限流演算法分散式實作

在開始之前,我們先簡單準備一下環境,Redis安裝和專案創建就不多說了。

  • 新增依賴
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.2</version>
        </dependency>
  • #用單例模式取得RedissonClient,這裡就不註冊成bean了,跑單測得太慢
public class RedissonConfig {

    private static final String REDIS_ADDRESS = "redis://127.0.0.1:6379";

    private static volatile  RedissonClient redissonClient;

   public static RedissonClient getInstance(){
        if (redissonClient==null){
            synchronized (RedissonConfig.class){
                if (redissonClient==null){
                    Config config = new Config();
                    config.useSingleServer().setAddress(REDIS_ADDRESS);
                    redissonClient = Redisson.create(config);
                    return redissonClient;
                }
            }
        }
        return redissonClient;
    }
}

固定視窗限流演算法

#演算法原則

固定視窗演算法,許多參考資料也稱為計數器演算法,當然我個人理解,計數器演算法是固定視窗演算法的一種特例,當然我們不糾結那麼多。

固定窗口演算法,是一種比較簡單的限流演算法,它把時間分成固定的時間窗口,每個視窗內允許的請求次數設定限制。如果在一個時間視窗內,請求次數超過了上限,那麼就會觸發限流。

四種分散式限流演算法和程式碼實現
在這裡插入圖片描述

演算法實作

基於Redisson的實作固定視窗相當簡單。在每個視窗期內,我們可以透過incrementAndGet操作來統計請求的數量。一旦視窗期結束,我們可以利用Redis的鍵過期功能來自動重置計數。

  • 來看下程式碼實作:
public class FixedWindowRateLimiter {
    public static final String KEY = "fixedWindowRateLimiter:";
    /**
     * 请求限制数量
     */
    private Long limit;
    /**
     * 窗口大小(单位:S)
     */
    private Long windowSize;

    public FixedWindowRateLimiter(Long limit, Long windowSize) {
        this.limit = limit;
        this.windowSize = windowSize;
    }

    /**
     * 固定窗口限流
     */
    public boolean triggerLimit(String path) {
        RedissonClient redissonClient = RedissonConfig.getInstance();
        //加分布式锁,防止并发情况下窗口初始化时间不一致问题
        RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
        try {
            rLock.lock(100, TimeUnit.MILLISECONDS);
            String redisKey = KEY + path;
            RAtomicLong counter = redissonClient.getAtomicLong(redisKey);
            //计数
            long count = counter.incrementAndGet();
            //如果为1的话,就说明窗口刚初始化
            if (count == 1) {
                //直接设置过期时间,作为窗口
                counter.expire(windowSize, TimeUnit.SECONDS);
            }
            //触发限流
            if (count > limit) {
                //触发限流的不记在请求数量中
                counter.decrementAndGet();
                return true;
            }
            return false;
        } finally {
            rLock.unlock();
        }
    }

}

這裡還額外用了一個分散式鎖,來解決並發情況下,視窗的初始化問題。

  • 再來測試一下
class FixedWindowRateLimiterTest {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("1min限制10次请求固定窗口测试")
    void triggerLimit() throws InterruptedException {
        FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(10L,60L);
        //模拟不同窗口内的调用
        for (int i = 0; i < 3; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            //20个线程并发调用
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = fixedWindowRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠1min
            TimeUnit.MINUTES.sleep(1);
        }
    }
}

當然大家也可以寫個接口,用Jmeter之類的壓測工具來進行測試。

固定視窗演算法的優點是實現簡單,佔用空間小,但是它存在臨界問題,由於視窗的切換是瞬間完成的,因此請求的處理並不平滑,可能會在視窗切換的瞬間出現流量的劇烈波動。

例如這個例子,假如在00:02,突然有大量請求過來,但是我們這時候計數重置了,那麼就沒法限制突發的這些流量。

四種分散式限流演算法和程式碼實現
臨界值問題

滑動視窗演算法

#為了緩解固定視窗的突發流量問題,可以採用滑動窗口演算法,電腦網路中TCP的流量控制就是採用滑動視窗演算法。

演算法原理

滑動窗口限流演算法的原理是將一個大的時間窗口分割成多個小的時間窗口,每個小的窗口都有獨立的計數。

請求過來的時候,判斷請求的次數是否超過整個視窗的限制。視窗的移動是每次向前滑動一個小的單元視窗。

例如下面這個滑動窗口,將大時間窗口1min分成了5個小窗口,每個小窗口的時間是12s。

每個單元格有自己獨立的計數器,每過12s就會向前移動一格。

假如有請求在00:01的時候過來,這時候視窗的數數就是3 12 9 15=39,也能起到限流的作用。

四種分散式限流演算法和程式碼實現
滑動視窗演算法示意圖

這就是為什麼滑動視窗能解決臨界問題,滑的格子越多,那麼整體的滑動就會越平滑,限流的效果就會越精準。

演算法實作

那我們這裡要怎麼實作滑動視窗限流演算法呢?非常簡單,我們可以直接使用Redis的有序集合(zset)結構。

我們使用時間戳記作為score和member,有請求過來的時候,就把當前時間戳加入到有序集合裡。那麼視窗之外的請求,我們可以根據視窗大小,計算出起始時間戳,刪除視窗外的請求。這樣,有序集合的大小,就是我們這個視窗的請求數了。

四種分散式限流演算法和程式碼實現
zset实现滑动窗口
  • 代码实现
public class SlidingWindowRateLimiter {
    public static final String KEY = "slidingWindowRateLimiter:";

    /**
     * 请求次数限制
     */
    private Long limit;
    /**
     * 窗口大小(单位:S)
     */
    private Long windowSize;

    public SlidingWindowRateLimiter(Long limit, Long windowSize) {
        this.limit = limit;
        this.windowSize = windowSize;
    }


    public boolean triggerLimit(String path) {
        RedissonClient redissonClient = RedissonConfig.getInstance();
        //窗口计数
        RScoredSortedSet<Long> counter = redissonClient.getScoredSortedSet(KEY + path);
        //使用分布式锁,避免并发设置初始值的时候,导致窗口计数被覆盖
        RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
        try {
            rLock.lock(200, TimeUnit.MILLISECONDS);
            // 当前时间戳
            long currentTimestamp = System.currentTimeMillis();
            // 窗口起始时间戳
            long windowStartTimestamp = currentTimestamp - windowSize * 1000;
            // 移除窗口外的时间戳,左闭右开
            counter.removeRangeByScore(0, true, windowStartTimestamp, false);
            // 将当前时间戳作为score,也作为member,
            // TODO:高并发情况下可能没法保证唯一,可以加一个唯一标识
            counter.add(currentTimestamp, currentTimestamp);
            //使用zset的元素个数,作为请求计数
            long count = counter.size();
            // 判断时间戳数量是否超过限流阈值
            if (count > limit) {
                System.out.println("[triggerLimit] path:" + path + " count:" + count + " over limit:" + limit);
                return true;
            }
            return false;
        } finally {
            rLock.unlock();
        }
    }

}

这里还有一个小的可以完善的点,zset在member相同的情况下,是会覆盖的,也就是说高并发情况下,时间戳可能会重复,那么就有可能统计的请求偏少,这里可以用时间戳+随机数来缓解,也可以生成唯一序列来解决,比如UUID、雪花算法等等。

  • 还是来测试一下
class SlidingWindowRateLimiterTest {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("滑动窗口限流")
    void triggerLimit() throws InterruptedException {
        SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(10L, 1L);
        //模拟在不同时间片内的请求
        for (int i = 0; i < 8; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = slidingWindowRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠10s
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}

用Redis实现了滑动窗口限流,解决了固定窗口限流的边界问题,当然这里也带来了新的问题,因为我们存储了窗口期的所有请求,所以高并发的情况下,可能会比较占内存。

漏桶算法

我们可以看到,计数器类的限流,体现的是一个“戛然而止”,超过限制,立马决绝,但是有时候,我们可能只是希望请求平滑一些,追求的是“波澜不惊”,这时候就可以考虑使用其它的限流算法。

算法原理

漏桶算法(Leaky Bucket),名副其实,就是请求就像水一样以任意速度注入漏桶,而桶会按照固定的速率将水漏掉。

四種分散式限流演算法和程式碼實現
漏桶算法

当进水速率大于出水速率的时候,漏桶会变满,此时新进入的请求将会被丢弃。

漏桶算法的两大作用是网络流量整形(Traffic Shaping)和速度限制(Rate Limiting)。

算法实现

我们接着看看具体应该怎么实现。

在滑动窗口限流算法里我们用到了RScoredSortedSet,非常好用对不对,这里也可以用这个结构,直接使用ZREMRANGEBYSCORE命令来删除旧的请求。

进水就不用多说了,请求进来,判断桶有没有满,满了就拒绝,没满就往桶里丢请求。

那么出水怎么办呢?得保证稳定速率出水,可以用一个定时任务,来定时去删除旧的请求。

  • 代码实现
public class LeakyBucketRateLimiter {
    private RedissonClient redissonClient = RedissonConfig.getInstance();
    private static final String KEY_PREFIX = "LeakyBucket:";

    /**
     * 桶的大小
     */
    private Long bucketSize;
    /**
     * 漏水速率,单位:个/秒
     */
    private Long leakRate;


    public LeakyBucketRateLimiter(Long bucketSize, Long leakRate) {
        this.bucketSize = bucketSize;
        this.leakRate = leakRate;
        //这里启动一个定时任务,每s执行一次
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);
    }

    /**
     * 漏水
     */
    public void leakWater() {
        RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
        //遍历所有path,删除旧请求
        for(String path:pathSet){
            String redisKey = KEY_PREFIX + path;
            RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(KEY_PREFIX + path);
            // 获取当前时间
            long now = System.currentTimeMillis();
            // 删除旧的请求
            bucket.removeRangeByScore(0, true,now - 1000 * leakRate,true);
        }
    }

    /**
     * 限流
     */
    public boolean triggerLimit(String path) {
        //加锁,防止并发初始化问题
        RLock rLock = redissonClient.getLock(KEY_PREFIX + "LOCK:" + path);
        try {
            rLock.lock(100,TimeUnit.MILLISECONDS);
            String redisKey = KEY_PREFIX + path;
            RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(redisKey);
            //这里用一个set,来存储所有path
            RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
            pathSet.add(path);
            // 获取当前时间
            long now = System.currentTimeMillis();
            // 检查桶是否已满
            if (bucket.size() < bucketSize) {
                // 桶未满,添加一个元素到桶中
                bucket.add(now,now);
                return false;
            }
            // 桶已满,触发限流
            System.out.println("[triggerLimit] path:"+path+" bucket size:"+bucket.size());
            return true;
        }finally {
            rLock.unlock();
        }
    }
    
}

在代码实现里,我们用了RSet来存储path,这样一来,一个定时任务,就可以搞定所有path对应的桶的出水,而不用每个桶都创建一个一个定时任务。

这里我直接用ScheduledExecutorService启动了一个定时任务,1s跑一次,当然集群环境下,每台机器都跑一个定时任务,对性能是极大的浪费,而且不好管理,我们可以用分布式定时任务,比如xxl-job去执行leakWater

  • 最后还是大家熟悉的测试
class LeakyBucketRateLimiterTest {

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("漏桶算法")
    void triggerLimit() throws InterruptedException {
        LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(10L, 1L);
        for (int i = 0; i < 8; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = leakyBucketRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠10s
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}

漏桶算法能够有效防止网络拥塞,实现也比较简单。

但是,因为漏桶的出水速率是固定的,假如突然来了大量的请求,那么只能丢弃超量的请求,即使下游能处理更大的流量,没法充分利用系统资源

令牌桶算法

令牌桶算法来了!

算法原理

令牌桶算法是对漏桶算法的一种改进。

它的主要思想是:系统以一种固定的速率向桶中添加令牌,每个请求在发送前都需要从桶中取出一个令牌,只有取到令牌的请求才被通过。因此,令牌桶算法允许请求以任意速率发送,只要桶中有足够的令牌。

四種分散式限流演算法和程式碼實現
令牌桶算法

算法实现

我们继续看怎么实现,首先是要发放令牌,要固定速率,那我们又得开个线程,定时往桶里投令牌,然后……

——然后Redission提供了令牌桶算法的实现,舒不舒服?

四種分散式限流演算法和程式碼實現
拿来吧你

拿来就用!

  • 代码实现
public class TokenBucketRateLimiter {

    public static final String KEY = "TokenBucketRateLimiter:";

    /**
     * 阈值
     */
    private Long limit;
    /**
     * 添加令牌的速率,单位:个/秒
     */
    private Long tokenRate;

    public TokenBucketRateLimiter(Long limit, Long tokenRate) {
        this.limit = limit;
        this.tokenRate = tokenRate;
    }

    /**
     * 限流算法
     */
    public boolean triggerLimit(String path){
        RedissonClient redissonClient=RedissonConfig.getInstance();
        RRateLimiter rateLimiter = redissonClient.getRateLimiter(KEY+path);
        // 初始化,设置速率模式,速率,间隔,间隔单位
        rateLimiter.trySetRate(RateType.OVERALL, limit, tokenRate, RateIntervalUnit.SECONDS);
        // 获取令牌
        return rateLimiter.tryAcquire();
    }
}

Redisson实现的,还是比较稳的,这里就不测试了。

关于Redission是怎么实现这个限速器的,大家可以看一下参考[3],还是Redisson家的老传统——Lua脚本,设计相当巧妙。

总结

在这篇文章里,我们对(三)种限流算法进行了分布式实现,采用了非常好用的Redission客户端,当然我们也有不完善的地方:

  • 并发处理采用了分布式锁,高并发情况下,对性能有一定损耗,逻辑最好还是直接采用Lua脚本实现,来提高性能
  • 可以提供更加优雅的调用方式,比如利用aop实现注解式调用,代码设计也可以更加优雅,继承体系可以完善一下
  • 没有实现限流的拒绝策略,比如抛异常、缓存、丢进MQ打散……限流是一种方法,最终的目的还是尽可能保证系统平稳

如果后面有机会,希望可以继续完善这个简单的Demo,达到工程级的应用。

除此之外,市面上也有很多好用的开源限流工具:

  • Guava RateLimiter ,基于令牌桶算法限流,当然是单机的;
  • Sentinel ,基于滑动窗口限流,支持单机,也支持集群
  • 网关限流,很多网关自带限流方法,比如Spring Cloud GatewayNginx

以上是四種分散式限流演算法和程式碼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:Java后端技术全栈。如有侵權,請聯絡admin@php.cn刪除