>  기사  >  데이터 베이스  >  Redis가 대기열 차단, 지연, 게시 및 구독을 구현하는 방법

Redis가 대기열 차단, 지연, 게시 및 구독을 구현하는 방법

WBOY
WBOY앞으로
2022-05-23 12:15:043235검색

이 글은 Redis에 대한 관련 지식을 제공하며, 대기열 차단, 지연, 게시 및 구독 구현 방법에 대한 관련 문제를 주로 소개합니다. 모두에게 도움이 되기를 바랍니다.

Redis가 대기열 차단, 지연, 게시 및 구독을 구현하는 방법

추천 학습: Redis 비디오 튜토리얼

Redis는 캐시 서버뿐만 아니라 메시지 대기열로도 사용할 수 있습니다. 목록 유형은 본질적으로 메시지 대기열로의 사용을 지원합니다. 아래 그림과 같이
Redis가 대기열 차단, 지연, 게시 및 구독을 구현하는 방법

Redis 리스트는 이중 연결 리스트로 구현되기 때문에 헤드 노드와 테일 노드가 저장되므로 리스트의 헤드와 테일에 요소를 삽입하거나 검색하는 속도가 매우 빠르며, 그리고 시간복잡도는 O(1)이다.

일반 대기열

Redis의 목록 데이터 유형을 직접 사용하여 lpush 및 rpop 또는 rpush 및 lpop의 두 가지 간단한 지침만으로 메시지 대기열을 구현할 수 있습니다.

  • lpush+rpop: 왼쪽 및 오른쪽 대기열
  • rpush+lpop: 왼쪽 및 오른쪽 대기열

다음은 redis 명령을 사용하여 일반 대기열을 시뮬레이션합니다.
lpush 명령을 사용하여 메시지 생성:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"

rpop 명령을 사용하여 메시지 소비:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"

다음은 Java 코드를 사용하여 공통 대기열을 구현합니다.

Producer SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer SingleConsumer: </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}

위 코드는 기본적으로 일반 대기열의 생성 및 소비를 구현했지만 위 예의 메시지 소비자에는 두 가지 문제가 있습니다.

  1. 소비자에게 지속적인 호출이 필요합니까? redis 리스트에 처리할 데이터(메시지)가 있는지 확인하는 rpop 메소드입니다. 연결은 호출될 때마다 시작됩니다. 목록에 데이터가 없어 빈 폴링이 많이 발생하고 불필요한 낭비가 발생할 수 있습니다. Thread.sleep() 및 기타 메서드를 사용하여 일정 시간 후에 소비자 스레드가 다시 소비되도록 할 수 있습니다. 절전 시간이 너무 길면 일부 시간에 민감한 메시지를 처리할 수 없습니다. , 연결에 대한 비교도 발생합니다.
  2. 생산자 속도가 소비자 소비 속도보다 빠르면 메시지 대기열의 길이가 계속 늘어나 시간이 지남에 따라 많은 메모리 공간을 차지하게 됩니다.

Blocking Queue

소비자는 brpop 명령을 사용하여 Redis 목록에서 데이터를 가져올 수 있습니다. 이 명령은 요소가 있는 경우에만 반환되며 시간 초과될 때까지 차단되고 null을 반환하므로 소비자는 데이터를 얻기 위해 잠을 자지 않아도 됩니다. 이는 차단 대기열을 구현하는 것과 같습니다.

redis의 brpop 명령을 사용하여 차단 대기열을 시뮬레이션합니다.

>brpop queue:single 30

brpop에서 명령줄이 차단되어 있고 30초 후에 데이터 없이 반환되는 것을 볼 수 있습니다.

Java 코드는 다음과 같이 구현됩니다.

생산자는 일반 대기열의 생산자와 동일합니다.

Consumer BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>

단점: 한 번에 여러 소비를 달성하는 것은 불가능합니다.

게시-구독 모드

Redis는 메시지 대기열 지원 외에도 게시/구독 모드를 지원하는 명령 세트도 제공합니다. Redis의 pub/sub 모드를 사용하면 한 번 생성하고 여러 번 소비하는 대기열을 구현할 수 있습니다.

Publish: PUBLISH 명령어를 사용하여 메시지를 게시할 수 있습니다. 형식은 다음과 같습니다.

PUBLISH channel message

반환 값은 메시지 구독자 수를 나타냅니다.

구독: SUBSCRIBE 명령은 메시지를 수신하는 데 사용됩니다. 형식은

SUBSCRIBE channel

SUBSCRIBE 명령을 사용한 후 구독 모드로 들어가지만 구독하기 전에 발행으로 보낸 메시지를 받지 못하기 때문입니다. 메시지가 전송되기 전에만 구독할 수 있습니다. 이 모드의 다른 명령에 대해서는 응답만 볼 수 있습니다.

답글은 세 가지 유형으로 나누어집니다.

  1. 구독인 경우 두 번째 값은 구독한 채널을 나타내고, 세 번째 값은 구독한 채널 수를 나타냅니다.
  2. 메시지(메시지)인 경우 두 번째 값은 메시지를 생성한 채널에서 세 번째 값은 메시지입니다.
  3. 구독 취소인 경우 두 번째 값은 구독 취소된 채널을 나타내고, 세 번째 값은 현재 클라이언트의 구독 수를 나타냅니다.

다음은 redis 명령을 사용하여 게시-구독 모드를 시뮬레이션합니다.

Producer:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1

Consumer:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"

Java 코드는 다음과 같이 구현됩니다.

Producer PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer PubsubConsumer: </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}

Consumer는 여러 개를 시작할 수 있으며, 각 Consumer는 모든 뉴스를 받을 수 있습니다.

UNSUBSCRIBE 명령을 사용하여 구독을 취소할 수 있습니다. 매개변수가 추가되지 않으면 SUBSCRIBE 명령으로 구독한 모든 채널이 구독 취소됩니다.

Redis는 또한 PSUBSCRIBE(패턴 구독) 명령을 사용하여 와일드카드 기반 메시지 구독을 지원합니다. 예:

psubscribe channel.*

PSUBSCRIBE 명령으로 구독한 채널은 PUNSUBSCRIBE 명령을 사용하여 구독을 취소해야 합니다. 이 명령은 SUBSCRIBE로 구독한 채널에서 구독을 취소할 수 없습니다. UNSUBSCRIBE를 관리하더라도 PSUBSCRIBE 명령으로 구독한 채널을 구독 취소할 수 없습니다.

동시에 PUNSUBSCRIBE 명령의 와일드카드 문자는 확장되지 않습니다. 예: PUNSUBSCRIBE *不会匹配到channel.*,所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*.

Redis의 pub/sub에도 단점이 있습니다. 즉, 소비자가 오프라인이 되면 생산자의 메시지가 손실된다는 것입니다.

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2>应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>

위 내용은 Redis가 대기열 차단, 지연, 게시 및 구독을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 csdn.net에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제