How to implement common rate limiting algorithms in Java

WBOY
2023-05-11 22:01:11

Why rate limiting is needed

Rate limiting lets in as many users as possible while keeping the system available. Requests beyond that are either queued or answered with a friendly message, so that users already in the system can keep using it normally and a system avalanche (cascading failure) is prevented.

Rate limiting algorithms

There are many rate limiting algorithms. The three common categories are the counter algorithm, the leaky bucket algorithm, and the token bucket algorithm.

(1) Counter:

Within a period of time, the maximum number of requests to be processed is fixed, and the excess is not processed.

(2) Leaky bucket:

The size of the leaky bucket is fixed and the processing rate is fixed, but the request arrival rate is not. When a burst brings too many requests, the excess is discarded.

(3) Token bucket:

The size of the token bucket is fixed and tokens are generated at a fixed rate, but the token consumption (i.e. request) rate is not fixed, so the bucket can cope with short bursts of requests. Each request takes a token from the bucket; if no token is available, the request is discarded.

Counter rate limiting

Within a period of time, the maximum number of requests to be processed is fixed, and the excess is not processed.

For example, suppose we stipulate that interface A may not be accessed more than 100 times in 1 minute.

Then we can do this:

At the beginning, we set up a counter. Whenever a request arrives, the counter is incremented by 1. If the counter exceeds 100 and the interval between this request and the first request is still within 1 minute, there are too many requests and access is denied.

If the interval between this request and the first request is greater than 1 minute, the window has expired, so the counter is reset and counting starts again. It is as simple and crude as that.

Code implementation:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Counter (fixed window) rate limiter
public class CounterLimiter {

    // Start time of the current window; volatile because it is read outside the lock
    private static volatile long startTime = System.currentTimeMillis();

    // Window length: 1000 ms
    private static final long interval = 1000;

    // Maximum number of requests allowed in each window
    private static final long limit = 3;

    // Number of requests counted in the current window
    private static final AtomicLong accumulator = new AtomicLong();

    /**
     * Returns true if the request is allowed through,
     * false if it is rejected by the limiter.
     */
    public static boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        // Has the current window expired?
        if (nowTime >= startTime + interval) {
            synchronized (CounterLimiter.class) {
                // Re-check under the lock so that only one thread resets the window
                if (nowTime >= startTime + interval) {
                    accumulator.set(0);
                    startTime = nowTime;
                }
            }
        }
        // Count this request and compare the total against the limit
        return accumulator.incrementAndGet() <= limit;
    }


    // Test
    public static void main(String[] args) {

        // Thread pool used to simulate concurrent callers
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // Number of rejected requests
        AtomicInteger limited = new AtomicInteger(0);
        // Number of threads
        final int threads = 2;
        // Number of requests sent by each thread
        final int turns = 20;
        // Latch used to wait for all threads to finish
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // Count the rejected request
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                // Signal that this thread has finished
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Shut the pool down so that the JVM can exit
        pool.shutdown();
        float time = (System.currentTimeMillis() - start) / 1000F;
        // Print the statistics
        System.out.println("Rejected: " + limited.get() +
                ", allowed: " + (threads * turns - limited.get()));
        System.out.println("Rejected ratio: " + (float) limited.get() / (float) (threads * turns));
        System.out.println("Elapsed time: " + time + "s");
    }

}

Shortcomings of counter rate limiting:

Although this algorithm is simple, it has a boundary problem at the edges of the time window.

Suppose a malicious user sends 100 requests in an instant at 0:59 and another 100 requests in an instant at 1:00. Both bursts are accepted, because the counter is reset at 1:00, so this user has in fact sent 200 requests within about 1 second.

What we stipulated is a maximum of 100 requests per minute (the planned throughput), which averages out to about 1.7 requests per second. By bursting at the point where the time window resets, a user can momentarily exceed our rate limit.

Users may exploit this loophole in the algorithm to overwhelm our application in an instant.
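The boundary problem can be reproduced deterministically. The following is a minimal sketch, not the article's limiter: it applies the same reset-then-count rule, but takes the timestamp as a parameter so that the two bursts can be placed exactly at 0:59 and 1:00. The class name BoundaryBurstDemo and the helper burst are hypothetical names chosen for illustration.

```java
// Sketch: a fixed-window counter driven by explicit timestamps (milliseconds),
// used only to reproduce the window-boundary burst described in the text.
class BoundaryBurstDemo {

    static final long WINDOW_MS = 60_000; // 1-minute window, as in the example
    static final long LIMIT = 100;        // at most 100 requests per window

    private long windowStart = 0; // the first window starts at time 0
    private long count = 0;

    // Reset the counter when the window has expired,
    // then count the request and compare against the limit.
    boolean tryAcquire(long nowMs) {
        if (nowMs >= windowStart + WINDOW_MS) {
            windowStart = nowMs;
            count = 0;
        }
        return ++count <= LIMIT;
    }

    // Sends n requests at the given instant and returns how many were allowed.
    int burst(long nowMs, int n) {
        int allowed = 0;
        for (int i = 0; i < n; i++) {
            if (tryAcquire(nowMs)) {
                allowed++;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        BoundaryBurstDemo limiter = new BoundaryBurstDemo();
        int first = limiter.burst(59_000, 100);  // burst at 0:59, old window
        int second = limiter.burst(60_000, 100); // burst at 1:00, new window
        System.out.println("allowed at 0:59: " + first);
        System.out.println("allowed at 1:00: " + second);
        System.out.println("allowed within about 1 s: " + (first + second));
    }
}
```

Both bursts are accepted in full, so 200 requests pass within one second even though each window individually respects the limit of 100.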

Leaky bucket rate limiting

The basic principle of leaky bucket rate limiting is this: water (the requests) enters the leaky bucket through the inlet, and the bucket lets water out at a fixed rate (the requests being released). When water flows in too fast and the total volume in the bucket exceeds the bucket's capacity, the water overflows and the request is rejected.

The rough leaky bucket rules are as follows:

(1) Water (the client requests) flows into the leaky bucket at an arbitrary rate.

(2) The capacity of the leaky bucket is fixed, and the outflow (release) rate is also fixed.

(3) Because the capacity is fixed, if water flows in faster than it is released, the volume in the bucket reaches the capacity and any water arriving after that overflows, which means the request is rejected.

The leaky bucket algorithm is actually very simple. It can be thought of as a leaking process: water flows into the bucket at an arbitrary rate and flows out at a fixed rate. Water that exceeds the bucket capacity is discarded. Because the capacity and the outflow rate do not change, the overall rate is guaranteed.

Peak clipping: when a large burst of traffic arrives, the excess overflows, so rate limiting keeps the service available.

Buffering: requests do not hit the server directly, which cushions the pressure on it.
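Peak clipping can be illustrated with a small deterministic sketch. It is a simplified variant written for illustration, under the assumption that timestamps are passed in explicitly instead of being read from the system clock; the class name LeakyBucketSketch and the helper burst are hypothetical.

```java
// Sketch: a leaky bucket driven by explicit timestamps (milliseconds),
// showing how a burst is clipped to the bucket capacity.
class LeakyBucketSketch {

    private final long capacity;   // maximum amount of water the bucket holds
    private final long ratePerSec; // amount of water that leaks out per second
    private long water = 0;        // water currently in the bucket
    private long lastLeakMs = 0;   // time up to which leaking has been accounted for

    LeakyBucketSketch(long capacity, long ratePerSec) {
        this.capacity = capacity;
        this.ratePerSec = ratePerSec;
    }

    boolean tryAcquire(long nowMs) {
        // Drain the water that leaked out during the whole seconds elapsed
        long elapsedSec = (nowMs - lastLeakMs) / 1000;
        water = Math.max(0, water - elapsedSec * ratePerSec);
        lastLeakMs += elapsedSec * 1000;
        // Accept the request only if the bucket still has room
        if (water < capacity) {
            water++;
            return true;
        }
        return false; // overflow: the request is rejected
    }

    // Sends n requests at the given instant and returns how many were accepted.
    int burst(long nowMs, int n) {
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (tryAcquire(nowMs)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        LeakyBucketSketch bucket = new LeakyBucketSketch(10, 2);
        // 15 requests arrive at once: 10 fill the bucket, 5 overflow
        System.out.println("accepted at t=0s: " + bucket.burst(0, 15));
        // 2 seconds later 4 units have leaked out, so only 4 more fit
        System.out.println("accepted at t=2s: " + bucket.burst(2_000, 15));
    }
}
```

With a capacity of 10 and a rate of 2 per second, the first burst is clipped to 10 requests and the second to 4, however many requests arrive.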

Code implementation:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

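// Leaky bucket rate limiter
public class LeakBucketLimiter {

    // Bucket capacity
    private static final long capacity = 10;
    // Outflow rate: 2 per second
    private static final long rate = 2;
    // Time up to which leaked water has been accounted for
    private static long startTime = System.currentTimeMillis();
    // Water currently in the bucket
    private static final AtomicLong water = new AtomicLong();

    /**
     * Returns true if the request is allowed through,
     * false if it is rejected by the limiter.
     */
    public static synchronized boolean tryAcquire() {
        // Read the clock once so that every step below uses the same instant
        long now = System.currentTimeMillis();
        // If the bucket is empty, let the request through immediately
        if (water.get() == 0) {
            startTime = now;
            water.set(1);
            return true;
        }
        // Whole seconds elapsed since the last accounting
        long elapsedSeconds = (now - startTime) / 1000;
        // Drain the water that leaked out in that time, never going below 0
        water.set(Math.max(0, water.get() - elapsedSeconds * rate));
        // Advance the start time by the whole seconds just accounted for
        startTime += elapsedSeconds * 1000;
        // If the bucket still has room, add this request and let it through
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        }
        return false;
    }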


    // Test
    public static void main(String[] args) {

        // Thread pool used to simulate concurrent callers
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // Number of rejected requests
        AtomicInteger limited = new AtomicInteger(0);
        // Number of threads
        final int threads = 2;
        // Number of requests sent by each thread
        final int turns = 20;
        // Latch used to wait for all threads to finish
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // Count the rejected request
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                // Signal that this thread has finished
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Shut the pool down so that the JVM can exit
        pool.shutdown();
        float time = (System.currentTimeMillis() - start) / 1000F;
        // Print the statistics
        System.out.println("Rejected: " + limited.get() +
                ", allowed: " + (threads * turns - limited.get()));
        System.out.println("Rejected ratio: " + (float) limited.get() / (float) (threads * turns));
        System.out.println("Elapsed time: " + time + "s");
    }

}

Disadvantages of the leaky bucket:

The outflow rate of the leaky bucket is fixed, which means the request release rate is fixed.

Because the outflow rate is fixed, the leaky bucket cannot respond flexibly when back-end capacity improves. For example, if dynamic scaling raises the back-end capacity from 1,000 QPS to 10,000 QPS, the leaky bucket has no way to take advantage of it.

Token bucket rate limiting

In the token bucket algorithm, each new request takes a token from the bucket. If no token is available, the request is denied. The number of tokens is limited, and it depends on time and on the issuance rate: the more time passes, the more tokens are added to the bucket. If tokens are issued faster than they are requested, the bucket gradually fills up until it holds its full capacity of tokens.

The general rules of token bucket rate limiting are as follows:

(1) Tokens are put into the bucket at a fixed rate.

(2) The token capacity of the bucket is fixed, but the release rate is not: as long as tokens remain in the bucket, an arriving request obtains a token and is let through.

(3) If tokens are issued more slowly than requests arrive, the bucket runs out of tokens and requests are rejected.

In short, the token issuance rate is configurable, so bursts of outgoing traffic can be handled effectively.
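The three rules can be traced with a small deterministic sketch. It is written for illustration under two assumptions that are not taken from the article: timestamps are passed in explicitly, and the bucket starts full so that the burst behaviour is visible immediately. The class name TokenBucketSketch and the helper burst are hypothetical.

```java
// Sketch: a token bucket driven by explicit timestamps (milliseconds).
// Assumption: the bucket starts full, so an initial burst can be served.
class TokenBucketSketch {

    private final long capacity;   // maximum number of tokens in the bucket
    private final long ratePerSec; // tokens issued per second
    private long tokens;           // tokens currently available
    private long lastRefillMs = 0; // time up to which tokens have been issued

    TokenBucketSketch(long capacity, long ratePerSec) {
        this.capacity = capacity;
        this.ratePerSec = ratePerSec;
        this.tokens = capacity;
    }

    boolean tryAcquire(long nowMs) {
        // Rule (1): issue tokens for the whole seconds elapsed, capped at capacity
        long elapsedSec = (nowMs - lastRefillMs) / 1000;
        tokens = Math.min(capacity, tokens + elapsedSec * ratePerSec);
        lastRefillMs += elapsedSec * 1000;
        // Rule (2): a request passes as long as a token remains
        if (tokens > 0) {
            tokens--;
            return true;
        }
        // Rule (3): no token left, the request is rejected
        return false;
    }

    // Sends n requests at the given instant and returns how many were allowed.
    int burst(long nowMs, int n) {
        int allowed = 0;
        for (int i = 0; i < n; i++) {
            if (tryAcquire(nowMs)) {
                allowed++;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(10, 2);
        // A burst of 12 drains the 10 stored tokens at once
        System.out.println("allowed at t=0s: " + bucket.burst(0, 12));
        // One second later only 2 new tokens have been issued
        System.out.println("allowed at t=1s: " + bucket.burst(1_000, 5));
        // After a long idle period the bucket is full again, but never above capacity
        System.out.println("allowed at t=10s: " + bucket.burst(10_000, 20));
    }
}
```

Unlike the leaky bucket, the release rate here is not fixed: a full bucket serves a burst of 10 requests at once, while the long-run average is still bounded by the issuance rate.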

Code implementation:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

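// Token bucket rate limiter
public class TokenBucketLimiter {
    // Bucket capacity
    private static final long capacity = 10;
    // Token issuance rate: 2 per second
    private static final long rate = 2;
    // Time up to which tokens have been issued
    private static long lastTime = System.currentTimeMillis();
    // Tokens currently in the bucket (the bucket starts empty)
    private static final AtomicLong tokenNum = new AtomicLong();

    /**
     * Returns true if the request is allowed through,
     * false if it is rejected by the limiter.
     */
    public static synchronized boolean tryAcquire() {
        // Add the tokens issued during the whole seconds elapsed, capped at capacity
        long now = System.currentTimeMillis();
        long elapsedSeconds = (now - lastTime) / 1000;
        tokenNum.set(Math.min(capacity, tokenNum.get() + elapsedSeconds * rate));
        // Advance the time by the whole seconds just accounted for
        lastTime += elapsedSeconds * 1000;
        // Let the request through if a token is available
        if (tokenNum.get() > 0) {
            tokenNum.decrementAndGet();
            return true;
        }
        return false;
    }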


    // Test
    public static void main(String[] args) {

        // Thread pool used to simulate concurrent callers
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // Number of rejected requests
        AtomicInteger limited = new AtomicInteger(0);
        // Number of threads
        final int threads = 2;
        // Number of requests sent by each thread
        final int turns = 20;
        // Latch used to wait for all threads to finish
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // Count the rejected request
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                // Signal that this thread has finished
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Shut the pool down so that the JVM can exit
        pool.shutdown();
        float time = (System.currentTimeMillis() - start) / 1000F;
        // Print the statistics
        System.out.println("Rejected: " + limited.get() +
                ", allowed: " + (threads * turns - limited.get()));
        System.out.println("Rejected ratio: " + (float) limited.get() / (float) (threads * turns));
        System.out.println("Elapsed time: " + time + "s");
    }

}

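Benefits of the token bucket:

One benefit of the token bucket is that it can easily handle bursts of outgoing traffic, for example after back-end capacity has been increased.

For instance, the token issuance rate can be changed, and the algorithm then issues more tokens according to the new rate, so that the burst of outgoing traffic can be processed.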
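Changing the issuance rate at run time could look roughly like the sketch below. It is an illustration under stated assumptions, not the article's limiter: timestamps are passed in explicitly, the bucket starts full, and the class AdjustableTokenBucket with its setRate method is a hypothetical name introduced here.

```java
// Sketch: a token bucket whose issuance rate can be changed at run time.
// Assumptions: explicit millisecond timestamps, bucket starts full.
class AdjustableTokenBucket {

    private final long capacity;   // maximum number of tokens in the bucket
    private long ratePerSec;       // tokens issued per second (adjustable)
    private long tokens;           // tokens currently available
    private long lastRefillMs = 0; // time up to which tokens have been issued

    AdjustableTokenBucket(long capacity, long ratePerSec) {
        this.capacity = capacity;
        this.ratePerSec = ratePerSec;
        this.tokens = capacity;
    }

    // Issue tokens for the whole seconds elapsed, capped at capacity.
    private void refill(long nowMs) {
        long elapsedSec = (nowMs - lastRefillMs) / 1000;
        tokens = Math.min(capacity, tokens + elapsedSec * ratePerSec);
        lastRefillMs += elapsedSec * 1000;
    }

    // Settle the tokens owed at the old rate first, then switch to the new rate.
    synchronized void setRate(long newRatePerSec, long nowMs) {
        refill(nowMs);
        ratePerSec = newRatePerSec;
    }

    synchronized boolean tryAcquire(long nowMs) {
        refill(nowMs);
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    // Sends n requests at the given instant and returns how many were allowed.
    int burst(long nowMs, int n) {
        int allowed = 0;
        for (int i = 0; i < n; i++) {
            if (tryAcquire(nowMs)) {
                allowed++;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        AdjustableTokenBucket bucket = new AdjustableTokenBucket(10, 2);
        bucket.burst(0, 10); // drain the initial tokens
        System.out.println("rate 2/s, allowed at t=1s: " + bucket.burst(1_000, 10));
        bucket.setRate(5, 1_000); // the back end was scaled up
        System.out.println("rate 5/s, allowed at t=2s: " + bucket.burst(2_000, 10));
    }
}
```

After the rate is raised from 2 to 5 tokens per second, the same one-second interval admits 5 requests instead of 2, without any change to the bucket capacity.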

Statement:
This article is reproduced from yisu.com.