首頁 >Java >java教程 >Java常見的限流演算法怎麼實現

Java常見的限流演算法怎麼實現

WBOY
WBOY轉載
2023-05-11 22:01:111317瀏覽

為什麼要限流

在保證可用的情況下盡可能多增加進入的人數,其餘的人在排隊等待,或者返回友好提示,保證裡面的進行系統的用戶可以正常使用,防止系統雪崩。

限流演算法

限流演算法很多,常見的有三類,分別是 計數器演算法 、漏桶演算法、令牌桶演算法 。

(1)計數器:

          在一段時間間隔內,處理請求的最大數量固定,且超過部分不做處理。

(2)漏桶:

          漏桶大小為固定,處理速度固定,但請求進入速度不固定(突發狀況請求過多時,丟棄過多的請求)。

(3)令牌桶:

          令牌桶的大小為固定,而令牌的產生速度固定,但是消耗令牌(即請求)速度不固定(可應付某些某些時間請求過多的情況);每個請求都會從令牌桶中取出令牌,如果沒有令牌則丟棄該次請求。

計數器限流

在一段時間間隔內,處理請求的最大數量固定,超過部分不做處理。

舉個例子,例如我們規定對於A接口,我們1分鐘的訪問次數不能超過100次。

那我們可以這麼做:

在一開始的時候,我們可以設定一個計數器counter,每當一個請求過來的時候,counter就加1,如果counter的值大於100而該請求與第一個請求的間隔時間還在1分鐘之內,那麼說明請求數過多,拒絕訪問;

#如果該請求與第一個請求的間隔時間大於1分鐘,且counter的值還在限流範圍內,那麼就重置counter,就是這麼簡單粗暴。

Java常見的限流演算法怎麼實現

程式碼實作: 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//计数器 限流
public class CounterLimiter {

    //起始时间
    private static long startTime = System.currentTimeMillis();

    //时间间隔1000ms
    private static long interval = 1000;

    //每个时间间隔内,限制数量
    private static long limit = 3;

    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public static boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        //判断是否在上一个时间间隔内
        if (nowTime < startTime + interval) {
            //如果还在上个时间间隔内
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        } else {
            //如果不在上一个时间间隔内
            synchronized (CounterLimiter.class) {
                //防止重复初始化
                if (nowTime > startTime + interval) {
                    startTime = nowTime;
                    accumulator.set(0);
                }
            }
            //再次进行判断
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

計數器限流的不足: 

這個演算法雖然簡單,但是存在臨界問題,我們看下圖:

Java常見的限流演算法怎麼實現

從上圖我們可以看到,假設有一個惡意用戶,他在0:59時,瞬間發送了100個請求,並且1:00又瞬間發送了100個請求,那麼其實這個用戶在1秒裡面,瞬間發送了200個請求。

我們剛才規定的是1分鐘最多100個請求(規劃的吞吐量),也就是每秒鐘最多1.7個請求,用戶透過在時間窗口的重置節點處突發請求, 可以瞬間超過我們的速率限制。

用戶有可能透過演算法的這個漏洞,瞬間壓垮我們的應用。

漏桶限流

漏桶演算法限流的基本原理為:水(對應請求)從進水口進入漏桶裡,漏桶以一定的速度出水(請求放行),當水流入速度過大,桶內的總水量大於桶容量會直接溢出,請求被拒絕。

大致的漏桶限流規則如下:

(1)進水口(對應客戶端請求)以任意速率流入進入漏桶。

(2)漏桶的容量是固定的,出水(放行)速率也是固定的。

(3)漏桶容量是不變的,如果處理速度太慢,桶內水量會超出了桶的容量,則後面流入的水滴會溢出,表示請求拒絕。

Java常見的限流演算法怎麼實現

⭐漏桶演算法其實很簡單,可以粗略的認為就是注水漏水過程,往桶中以任意速率流入水,以一定速率流出水,當水超過桶容量(capacity)則丟棄,因為桶容量是不變的,保證了整體的速率。

以一定速率流出水,

Java常見的限流演算法怎麼實現

削峰: 有大量流量進入時,會發生溢出,從而限流保護服務可用

緩衝: 不會直接要求到伺服器, 緩衝壓力

程式碼實作: 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//漏斗限流
public class LeakBucketLimiter {

    //桶的大小
    private static long capacity = 10;
    //流出速率,每秒两个
    private static long rate = 2;
    //开始时间
    private static long startTime = System.currentTimeMillis();
    //桶中剩余的水
    private static AtomicLong water = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //如果桶的余量问0,直接放行
        if (water.get() == 0) {
            startTime = System.currentTimeMillis();
            water.set(1);
            return true;
        }
        //计算从当前时间到开始时间流出的水,和现在桶中剩余的水
        //桶中剩余的水
        water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
        //防止出现<0的情况
        water.set(Math.max(0, water.get()));
        //设置新的开始时间
        startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
        //如果当前水小于容量,表示可以放行
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

漏桶的不足: 

漏桶的出水速度固定,也就是請求放行速度是固定的。

漏桶出口的速度固定,無法靈活的應付後端能力提升。例如,透過動態擴容,後端流量從1000QPS提升到1WQPS,漏桶沒有辦法。

令牌桶限流

令牌桶演算法中新請求到來時會從桶裡拿走一個令牌,如果桶內沒有令牌可拿,就拒絕服務。當然,令牌的數量也是有上限的。令牌的數量與時間和發放速率強相關,時間流逝的時間越長,會不斷往桶裡加入越多的令牌,如果令牌發放的速度比申請速度快,令牌桶會放滿令牌,直到令牌佔滿整個令牌桶。

令牌桶限流大致的規則如下:

(1)進水口依照某個速度,向桶中放入令牌。

(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。

(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。

Java常見的限流演算法怎麼實現

代码实现: 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//令牌桶
public class TokenBucketLimiter {
    //桶的容量
    private static long capacity = 10;
    //放入令牌的速率,每秒2个
    private static long rate = 2;
    //上次放置令牌的时间
    private static long lastTime = System.currentTimeMillis();
    //桶中令牌的余量
    private static AtomicLong tokenNum = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //更新桶中剩余令牌的数量
        long now = System.currentTimeMillis();
        tokenNum.addAndGet((now - lastTime) / 1000 * rate);
        tokenNum.set(Math.min(capacity, tokenNum.get()));
        //更新时间
        lastTime += (now - lastTime) / 1000 * 1000;
        //桶中还有令牌就放行
        if (tokenNum.get() > 0) {
            tokenNum.decrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    //测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

令牌桶的好处: 

令牌桶的好处之一就是可以方便地应对 突发出口流量(后端能力的提升)。

比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。

以上是Java常見的限流演算法怎麼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除