>  기사  >  Java  >  Java의 일반적인 전류 제한 알고리즘은 무엇입니까?

Java의 일반적인 전류 제한 알고리즘은 무엇입니까?

PHPz
PHPz앞으로
2023-05-12 18:37:13823검색

01 고정 창

고정 창이라고도 알려진 고정 창(카운터 알고리즘, 고정 창이라고도 함) 전류 제한 알고리즘은 가장 간단한 전류 제한 알고리즘으로, 내부에 유지되는 카운터를 통해 단위 시간 내 최대 액세스를 제어합니다. 시간의 단위.

분당 요청 수가 60개 이하로 제한되어 있다고 가정하고 카운터를 설정하고 요청이 도착했을 때 카운터가 임계값에 도달하면 요청이 거부되고 그렇지 않으면 카운터가 1씩 증가합니다. 카운터는 1분마다 0으로 재설정됩니다. 코드는 다음과 같이 구현됩니다.

public class CounterRateLimiter extends MyRateLimiter {
    /**
     * 每秒限制请求数
     */
    private final long permitsPerSecond;
    /**
     * 上一个窗口的开始时间
     */
    public long timestamp = System.currentTimeMillis();
    /**
     * 计数器
     */
    private int counter;

    public CounterRateLimiter(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 窗口内请求数量小于阈值,更新计数放行,否则拒绝请求
        if (now - timestamp < 1000) {
            if (counter < permitsPerSecond) {
                counter++;
                return true;
            } else {
                return false;
            }
        }
        // 时间窗口过期,重置计数器和时间戳
        counter = 0;
        timestamp = now;
        return true;
    }
}

고정 창의 가장 큰 장점은 구현하기 쉽고 메모리 공간이 작다는 것입니다. 시간 창에 개수만 저장하면 최신 정보를 얻을 수 있습니다. 이전 요청이 누적되지 않고 요청이 처리됩니다. 새 요청이 중단됩니다. 물론 두 개의 창이 만나면 순간적인 트래픽이 2n이 될 수도 있다는 문제도 있다.

02 슬라이딩 윈도우

순간적인 트래픽을 방지하기 위해 고정된 윈도우를 여러 개의 그리드로 나누어서 윈도우 크기를 고정하는 대신 매번 작은 그리드씩 뒤로 이동할 수 있는 슬라이딩 윈도우입니다.

예를 들어 1분은 10초씩 6개의 셀로 나눌 수 있으며 각 셀에는 카운터가 유지되며 창은 한 번에 한 셀씩 앞으로 이동합니다. 요청이 도착할 때마다 창에 있는 모든 셀의 개수 합계가 임계값을 초과하지 않는 한 요청이 해제될 수 있습니다. TCP 프로토콜의 데이터 패킷 전송에서는 흐름 제어를 위해 슬라이딩 창도 사용됩니다.

구현은 다음과 같습니다.

public class SlidingWindowRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private final long permitsPerMinute;
    /**
     * 计数器, k-为当前窗口的开始时间值秒,value为当前窗口的计数
     */
    private final TreeMap<Long, Integer> counters;

    public SlidingWindowRateLimiter(long permitsPerMinute) {
        this.permitsPerMinute = permitsPerMinute;
        this.counters = new TreeMap<>();
    }

    @Override
    public synchronized boolean tryAcquire() {
        // 获取当前时间的所在的子窗口值; 10s一个窗口
        long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / 10 * 10;
        // 获取当前窗口的请求总量
        int currentWindowCount = getCurrentWindowCount(currentWindowTime);
        if (currentWindowCount >= permitsPerMinute) {
            return false;
        }
        // 计数器 + 1
        counters.merge(currentWindowTime, 1, Integer::sum);
        return true;
    }
    /**
     * 获取当前窗口中的所有请求数(并删除所有无效的子窗口计数器)
     *
     * @param currentWindowTime 当前子窗口时间
     * @return 当前窗口中的计数
     */
    private int getCurrentWindowCount(long currentWindowTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentWindowTime - 50;
        int result = 0;

        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Integer> entry = iterator.next();
            if (entry.getKey() < startTime) {
                iterator.remove();
            } else {
                result += entry.getValue();
            }
        }
        return result;
    }
}

슬라이딩 창은 카운터의 순간적인 트래픽 피크 문제를 해결합니다. 실제로 카운터 알고리즘도 일종의 슬라이딩 창이지만 창은 더 세밀한 단위로 분할되지 않습니다. . 카운터를 비교하면 창 분할의 입도가 미세할수록 흐름 제어가 더 정확하고 엄격해지는 것을 알 수 있습니다.

그러나 창구의 트래픽이 임계값에 도달하면 즉시 트래픽이 차단됩니다. 실제 응용에서 우리가 원하는 흐름 제한 효과는 트래픽을 한꺼번에 차단하는 것이 아니라 트래픽을 허용하는 경우가 많습니다. 시스템에 원활하게 들어가십시오.

03 Leaky Bucket Algorithm

전류를 더 원활하게 제한하는 방법은 무엇일까요? Leaky Bucket 알고리즘을 살펴보겠습니다. 요청은 물과 같은 임의의 속도로 누출되며, 주입 속도가 누출 속도보다 계속 높아지면 버킷에서 물이 누출됩니다. 가득 차게 됩니다. 이때 새로 들어오는 요청은 삭제됩니다. 전류 제한 및 형성은 누출 버킷 알고리즘의 두 가지 핵심 기능입니다.

구현은 다음과 같습니다.

public class LeakyBucketRateLimiter extends MyRateLimiter {
    // 桶的容量
    private final int capacity;
    // 漏出速率
    private final int permitsPerSecond;
    // 剩余水量
    private long leftWater;
    // 上次注入时间
    private long timeStamp = System.currentTimeMillis();

    public LeakyBucketRateLimiter(int permitsPerSecond, int capacity) {
        this.capacity = capacity;
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public synchronized boolean tryAcquire() {
        //1. 计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp) / 1000;
        leftWater = Math.max(0, leftWater - timeGap * permitsPerSecond);
        timeStamp = now;
        
        // 如果未满,则放行;否则限流
        if (leftWater < capacity) {
            leftWater += 1;
            return true;
        }
        return false;
    }
}

위 코드는 누출 버킷 알고리즘의 완전한 구현이 아닙니다. 즉, tryAcquire는 누출 버킷이 가득 차지 않았음을 나타내기 위해 true를 반환합니다. 그렇지 않으면 누출 버킷이 가득 찼음을 나타냅니다.

일정한 속도로 트래픽을 유출하려면 일반적으로 FIFO 대기열을 사용하여 구현해야 합니다. tryAcquire가 true를 반환하면 요청이 대기열에 추가된 다음 처리를 위해 고정된 빈도로 대기열에서 요청이 제거됩니다. 샘플 코드는 다음과 같습니다.

@Test
public void testLeakyBucketRateLimiter() throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    ExecutorService singleThread = Executors.newSingleThreadExecutor();

    LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(20, 20);
    // 存储流量的队列
    Queue<Integer> queue = new LinkedList<>();
    // 模拟请求  不确定速率注水
    singleThread.execute(() -> {
        int count = 0;
        while (true) {
            count++;
            boolean flag = rateLimiter.tryAcquire();
            if (flag) {
                queue.offer(count);
                System.out.println(count + "--------流量被放行--------");
            } else {
                System.out.println(count + "流量被限制");
            }
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
  
    // 模拟处理请求 固定速率漏水
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        if (!queue.isEmpty()) {
            System.out.println(queue.poll() + "被处理");
        }
    }, 0, 100, TimeUnit.MILLISECONDS);

    // 保证主线程不会退出
    while (true) {
        Thread.sleep(10000);
    }
}

누출 버킷 알고리즘의 목적은 주로 버스트 트래픽을 원활하게 하고 네트워크의 버스트 트래픽이 원활하고 안정적인 트래픽으로 통합되도록 하는 메커니즘을 제공하는 것입니다.

그러나 Leaky 버킷은 트래픽을 너무 엄격하게 제어하기 때문에 일부 시나리오에서는 Leaky 버킷의 누출 비율이 고정되어 있기 때문에 다운스트림이 특정 순간에 더 많은 트래픽을 처리할 수 있더라도 Leaky 버킷은 완전히 사용되지 않습니다. 시스템 리소스를 완전히 사용할 수 있습니다. 버스트 트래픽이 통과하도록 허용합니다.

04 Token Bucket

버스트 트래픽을 허용하면서 트래픽 속도를 제한하는 방법은 무엇입니까? 토큰 버킷 알고리즘에 대해 알아보세요! 토큰 버킷 알고리즘은 토큰을 일정한 속도로 토큰 버킷에 보냅니다. 요청이 도착하면 토큰 버킷에서 토큰을 가져오려고 시도합니다. 토큰을 얻은 경우에만 해제할 수 있으며 그렇지 않으면 거부됩니다.

토큰 버킷의 특징은 다음과 같습니다.

1. 토큰은 일정한 비율로 발행됩니다. 현재 제한 비율을 v/s라고 가정하면 1/v초마다 하나의 토큰이 발행된다는 의미입니다. 토큰 버킷의 용량 예 b, 토큰 버킷이 가득 차면 새 토큰이 폐기됩니다

3. 요청이 현재 제한자를 통과할 수 있다는 전제는 토큰 버킷에 토큰이 있다는 것입니다

지불할 가치가 있는 매개변수 토큰 버킷 알고리즘에서 주의할 점은 2개, 즉 현재 제한 속도 v/s와 토큰 버킷 용량 b입니다. 속도 a는 일반적인 상황에서 전류 제한기의 현재 제한 속도를 나타내고 b는 버스트의 약어입니다. 전류 제한기에 의해 허용되는 최대 버스트 트래픽을 나타냅니다.

예: b=10. 토큰 버킷이 가득 차면 사용 가능한 토큰은 10개입니다. 동시에 10개의 요청이 현재 제한기를 통과할 수 있습니다(일정 수준의 트래픽 버스트는 허용됨). 이 10개의 요청은 카드가 전달된 후 즉시 모든 토큰을 소비합니다. 후속 트래픽은 r 속도로만 흐름 제한기를 통과할 수 있습니다.

구현은 다음과 같습니다:

public class TokenBucketRateLimiter extends MyRateLimiter {
    /**
     * 令牌桶的容量「限流器允许的最大突发流量」
     */
    private final long capacity;
    /**
     * 令牌发放速率
     */
    private final long generatedPerSeconds;
    /**
     * 最后一个令牌发放的时间
     */
    long lastTokenTime = System.currentTimeMillis();
    /**
     * 当前令牌数量
     */
    private long currentTokens;

    public TokenBucketRateLimiter(long generatedPerSeconds, int capacity) {
        this.generatedPerSeconds = generatedPerSeconds;
        this.capacity = capacity;
    }

    /**
     * 尝试获取令牌
     *
     * @return true表示获取到令牌,放行;否则为限流
     */
    @Override
    public synchronized boolean tryAcquire() {
          /**
           * 计算令牌当前数量
           * 请求时间在最后令牌是产生时间相差大于等于额1s(为啥时1s?因为生成令牌的最小时间单位时s),则
           * 1. 重新计算令牌桶中的令牌数
           * 2. 将最后一个令牌发放时间重置为当前时间
           */
        long now = System.currentTimeMillis();
        if (now - lastTokenTime >= 1000) {
            long newPermits = (now - lastTokenTime) / 1000 * generatedPerSeconds;
            currentTokens = Math.min(currentTokens + newPermits, capacity);
            lastTokenTime = now;
        }
        if (currentTokens > 0) {
            currentTokens--;
            return true;
        }
        return false;
    }
}

생각해야 할 것은 구현이 매우 쉽다는 것입니다. 생산자-소비자 패턴은 생산자 스레드를 사용하여 정기적으로 차단 대기열에 토큰을 추가하고 스레드는 현재 제한기를 다음과 같이 전달하려면 소비자 스레드는 차단 대기열에서 토큰을 얻은 후에만 현재 제한기를 전달할 수 있습니다.

스레드 스케줄링의 불확실성으로 인해 동시성이 높은 시나리오에서는 타이머 오류가 매우 큽니다. 동시에 타이머 자체가 스케줄링 스레드를 생성하므로 시스템 성능에도 영향을 미칩니다.

05 슬라이딩 로그

슬라이딩 로그는 상대적으로 "인기있지 않지만" 정말 유용한 전류 제한 알고리즘입니다. 슬라이딩 로그 속도 제한 알고리즘은 요청의 타임스탬프를 기록해야 하며, 이는 일반적으로 정렬된 집합을 사용하여 저장됩니다. 우리는 단일 정렬된 집합에서 특정 기간 내의 모든 사용자 요청을 추적할 수 있습니다.

假设我们要限制给定T时间内的请求不超过N,我们只需要存储最近T时间之内的请求日志,每当请求到来时判断最近T时间内的请求总数是否超过阈值。

实现如下:

public class SlidingLogRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private static final long PERMITS_PER_MINUTE = 60;
    /**
     * 请求日志计数器, k-为请求的时间(秒),value当前时间的请求数量
     */
    private final TreeMap<Long, Integer> requestLogCountMap = new TreeMap<>();

    @Override
    public synchronized boolean tryAcquire() {
        // 最小时间粒度为s
        long currentTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        // 获取当前窗口的请求总数
        int currentWindowCount = getCurrentWindowCount(currentTimestamp);
        if (currentWindowCount >= PERMITS_PER_MINUTE) {
            return false;
        }
        // 请求成功,将当前请求日志加入到日志中
        requestLogCountMap.merge(currentTimestamp, 1, Integer::sum);
        return true;
    }

    /**
     * 统计当前时间窗口内的请求数
     *
     * @param currentTime 当前时间
     * @return -
     */
    private int getCurrentWindowCount(long currentTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentTime - 59;
        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        return requestLogCountMap.entrySet()
                .stream()
                .filter(entry -> entry.getKey() >= startTime)
                .mapToInt(Map.Entry::getValue)
                .sum();
    }
}

滑动日志能够避免突发流量,实现较为精准的限流;同样更加灵活,能够支持更加复杂的限流策略,如多级限流,每分钟不超过100次,每小时不超过300次,每天不超过1000次,我们只需要保存最近24小时所有的请求日志即可实现。

灵活并不是没有代价的,带来的缺点就是占用存储空间要高于其他限流算法。

06分布式限流

以上几种限流算法的实现都仅适合单机限流。虽然给每台机器平均分配限流配额可以达到限流的目的,但是由于机器性能,流量分布不均以及计算数量动态变化等问题,单机限流在分布式场景中的效果总是差强人意。

分布式限流最简单的实现就是利用中心化存储,即将单机限流存储在本地的数据存储到同一个存储空间中,如常见的Redis等。

当然也可以从上层流量入口进行限流,Nginx代理服务就提供了限流模块,同样能够实现高性能,精准的限流,其底层是漏桶算法。

위 내용은 Java의 일반적인 전류 제한 알고리즘은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제