>  기사  >  Java  >  Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법

Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법

WBOY
WBOY앞으로
2023-05-11 22:01:111269검색

흐름을 제한하는 이유는 무엇인가요?

이용 가능성을 보장하면서 입장 인원을 최대한 늘리거나, 내부 시스템 사용자가 정상적으로 사용할 수 있도록 친절한 안내를 보내드립니다. 시스템 눈사태.

전류 제한 알고리즘

전류 제한 알고리즘에는 카운터 알고리즘, 누출 버킷 알고리즘, 토큰 버킷 알고리즘이라는 세 가지 일반적인 범주가 있습니다.器 (1) 카운터 :

일정 기간 동안 최대 요청 횟수가 정해져 있으며, 초과된 부분은 처리되지 않습니다.

(2) Leaky bucket :

                   Leaky bucket의 크기는 고정되어 있고, 처리 속도는 고정되어 있지만 요청 진입 속도는 고정되어 있지 않습니다(긴급 상황에서 요청이 너무 많으면 너무 많은 요청을 폐기하게 됩니다). .

(3) 토큰 버킷:

                   토큰 버킷의 크기는 고정되어 있고 토큰 생성 속도는 고정되어 있지만 토큰 소비(예: 요청) 속도는 고정되어 있지 않습니다(요청이 너무 많은 상황에 대처할 수 있음). 특정 시간), 각 요청은 토큰 버킷에서 토큰을 가져오고 토큰이 없으면 요청이 삭제됩니다.

카운터 현재 제한

일정 기간 동안 처리할 수 있는 최대 요청 수는 정해져 있으며 초과분은 처리되지 않습니다.

예를 들어 인터페이스 A의 경우 1분에 100회 이상 액세스할 수 없다고 규정합니다.

그러면 다음과 같이 할 수 있습니다:

처음에는 카운터 카운터를 설정할 수 있습니다. 요청이 올 때마다 카운터는 1씩 증가합니다. 카운터 값이 100보다 크고 요청이 다음과 같습니다. 첫 번째 요청 간격이 여전히 1분 이내이면 요청이 너무 많아 액세스가 거부되었음을 의미합니다.

요청과 첫 번째 요청 사이의 간격이 1분을 초과하고 카운터 값이 여전히 유지됩니다. 현재 제한 범위 내에서 카운터를 재설정하는 것은 그만큼 간단하고 간단합니다.

Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법코드 구현:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//计数器 限流
public class CounterLimiter {

    //起始时间
    private static long startTime = System.currentTimeMillis();

    //时间间隔1000ms
    private static long interval = 1000;

    //每个时间间隔内,限制数量
    private static long limit = 3;

    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public static boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        //判断是否在上一个时间间隔内
        if (nowTime < startTime + interval) {
            //如果还在上个时间间隔内
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        } else {
            //如果不在上一个时间间隔内
            synchronized (CounterLimiter.class) {
                //防止重复初始化
                if (nowTime > startTime + interval) {
                    startTime = nowTime;
                    accumulator.set(0);
                }
            }
            //再次进行判断
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

카운터 전류 제한 부족:

이 알고리즘은 간단하지만 심각한 문제가 있습니다. 아래 그림을 살펴보겠습니다.

Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법 위 그림에서 볼 수 있습니다. 악의적인 사용자가 0시 59분에 즉시 100개의 요청을 보냈고, 1시에 즉시 100개의 요청을 보냈다고 가정합니다. 실제로 이 사용자는 1초 만에 즉시 200개의 요청을 보냈습니다.

방금 규정한 것은 분당 최대 100개 요청(계획된 처리량)이며, 이는 초당 최대 1.7개 요청입니다. 사용자는 시간 제한 재설정 노드에서 버스트 요청을 하여 즉시 속도를 초과할 수 있습니다.

사용자는 알고리즘의 이 허점을 이용하여 애플리케이션을 즉시 무너뜨릴 수 있습니다.

누수 버킷 전류 제한

누수 버킷 알고리즘 전류 제한의 기본 원리는 물(요청에 해당)이 물 입구에서 새 버킷으로 들어가고 새 버킷이 특정 속도로 물을 배출하는 것입니다(해제 요청). 물 유입 속도가 너무 큰 경우 물통에 있는 물의 총량이 물통 용량보다 크면 바로 물이 넘쳐 요청이 거부됩니다.

대략적인 누수 버킷 전류 제한 규칙은 다음과 같습니다.

(1) 물 입구(클라이언트 요청에 해당)는 어떤 속도로든 새 버킷으로 흘러 들어갑니다.

(2) 새는 물통의 용량은 정해져 있고, 물 배출(방출)율도 정해져 있습니다.

(3) 새는 물통의 용량은 변하지 않습니다. 처리 속도가 너무 느리면 물통의 물 양이 물통의 용량을 초과하게 되어 나중에 유입되는 물방울이 넘쳐 요청이 거부되었음을 나타냅니다. .

Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법⭐누수 버킷 알고리즘은 실제로 매우 간단합니다. 물이 어떤 속도로든 물통으로 유입되고 물이 버킷 용량을 초과하면 흘러나오는 과정으로 대략적으로 생각할 수 있습니다. , 버킷 용량이 변경되지 않아 전체 속도를 보장하기 때문에 폐기됩니다.

물은 일정 비율로 흘러나오고,

Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법Peak Clipping : 트래픽이 많이 들어오면 Overflow가 발생하므로 전류 제한 보호 서비스가 가능합니다.

Buffering : 서버에 직접 요청하지 않습니다. 완충 압력

코드 구현:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//漏斗限流
public class LeakBucketLimiter {

    //桶的大小
    private static long capacity = 10;
    //流出速率,每秒两个
    private static long rate = 2;
    //开始时间
    private static long startTime = System.currentTimeMillis();
    //桶中剩余的水
    private static AtomicLong water = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //如果桶的余量问0,直接放行
        if (water.get() == 0) {
            startTime = System.currentTimeMillis();
            water.set(1);
            return true;
        }
        //计算从当前时间到开始时间流出的水,和现在桶中剩余的水
        //桶中剩余的水
        water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
        //防止出现<0的情况
        water.set(Math.max(0, water.get()));
        //设置新的开始时间
        startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
        //如果当前水小于容量,表示可以放行
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    // 测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

새는 물통의 단점:

새는 물통의 물 배출 속도는 고정되어 있습니다. 즉, 요청된 방출 속도가 고정되어 있습니다.

새는 배럴 배출구는 속도가 고정되어 있어 백엔드 성능 향상에 유연하게 대처할 수 없습니다. 예를 들어 동적 확장을 통해 백엔드 트래픽이 1000QPS에서 1WQPS로 증가하는데 버킷 누출에 대한 솔루션은 없습니다.

토큰 버킷 현재 제한

토큰 버킷 알고리즘에서는 새 요청이 도착하면 버킷에 사용 가능한 토큰이 없으면 서비스가 거부됩니다. 물론 토큰의 개수도 제한되어 있습니다. 토큰의 수는 시간 및 발행률과 밀접한 관련이 있으며, 시간이 지날수록 더 많은 토큰이 버킷에 추가됩니다. 토큰 발행 속도가 애플리케이션 속도보다 빠르면 토큰 버킷이 토큰으로 채워집니다. 토큰이 전체 토큰 버킷을 차지할 때까지.

토큰 버킷의 흐름을 제한하는 일반적인 규칙은 다음과 같습니다.

(1) 물 주입구는 특정 속도로 토큰을 버킷에 넣습니다.

(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。

(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。

Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법

代码实现: 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//令牌桶
public class TokenBucketLimiter {
    //桶的容量
    private static long capacity = 10;
    //放入令牌的速率,每秒2个
    private static long rate = 2;
    //上次放置令牌的时间
    private static long lastTime = System.currentTimeMillis();
    //桶中令牌的余量
    private static AtomicLong tokenNum = new AtomicLong();

    /**
     * true 代表放行,请求可已通过
     * false 代表限制,不让请求通过
     */
    public synchronized static boolean tryAcquire() {
        //更新桶中剩余令牌的数量
        long now = System.currentTimeMillis();
        tokenNum.addAndGet((now - lastTime) / 1000 * rate);
        tokenNum.set(Math.min(capacity, tokenNum.get()));
        //更新时间
        lastTime += (now - lastTime) / 1000 * 1000;
        //桶中还有令牌就放行
        if (tokenNum.get() > 0) {
            tokenNum.decrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    //测试
    public static void main(String[] args) {

        //线程池,用于多线程模拟测试
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果
        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time + "s");
    }

}

令牌桶的好处: 

令牌桶的好处之一就是可以方便地应对 突发出口流量(后端能力的提升)。

比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。

위 내용은 Java에서 일반적인 전류 제한 알고리즘을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제