>  기사  >  데이터 베이스  >  Redis 및 대기열에 대한 자세한 설명

Redis 및 대기열에 대한 자세한 설명

藏色散人
藏色散人앞으로
2020-08-26 11:51:022079검색

다음은 Redis Tutorial 칼럼에 나온 Redis와 Queue에 대한 자세한 설명입니다. 도움이 필요한 친구들에게 도움이 되길 바랍니다!

Redis 및 대기열에 대한 자세한 설명

요약

Redis는 캐시 서버뿐만 아니라 메시지 큐로도 사용할 수 있습니다. 목록 유형은 본질적으로 메시지 대기열로의 사용을 지원합니다. 아래 그림과 같이

Redis 리스트는 이중 연결 리스트로 구현되어 헤드와 테일 노드를 저장하기 때문에 리스트의 헤드와 테일에 요소를 삽입하는 것이 매우 빠릅니다.

일반적인 대기열 구현

따라서 Redis의 목록을 직접 사용하여 lpush 및 rpop 또는 rpush 및 lpop의 두 가지 간단한 지침만으로 메시지 대기열을 구현할 수 있습니다. 간단한 예는 다음과 같습니다.

저장 메시지 끝(메시지 생성자):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息生产者
 * @author yamikaze */public class Producer extends Thread { 
    public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count; 
    public Producer(String name) {        this.producerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    } 
    public int getCount() {        return count;
    }
 
    @Override    public void run() {        try {            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 
    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start(); 
        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

메시지 처리 끝(메시지 소비자):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息消费者
 * @author yamikaze */public class Customer extends Thread{ 
    private String customerName;    private volatile int count;    private Jedis jedis; 
    public Customer(String name) {        this.customerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != null) {
            count++;
            handle(message);
        }
    } 
    public void handle(String message) {
        System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }
 
    @Override    public void run() {        while (true) {
            processMessage();
        }
    } 
    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}

꽤 괜찮아 보이지만 위의 예에서는 메시지 소비자에게 문제가 있습니다. 즉, 목록에 보류 중인 메시지가 있는지 확인하려면 rpop 메서드 호출을 중지해야 합니다. 각 호출은 연결을 시작하므로 불필요한 낭비가 발생합니다. Thread.sleep() 및 기타 메서드를 사용하여 일정 시간이 지난 후 소비자 스레드가 다시 소비되도록 할 수도 있지만 여기에는 두 가지 문제가 있습니다.

1) 생산자 속도가 소비자 소비 속도보다 빠르면, 메시지 대기열의 길이가 계속 늘어나면 시간이 지남에 따라 많은 메모리 공간을 차지하게 됩니다.

2) Sleep 시간이 너무 길면 시간에 민감한 일부 메시지를 처리할 수 없습니다. Sleep 시간이 너무 짧으면 연결에 상대적으로 큰 오버헤드가 발생합니다.

따라서 brpop 명령을 사용할 수 있습니다. 요소가 있는 경우에만 반환됩니다. 그렇지 않으면 시간 초과될 때까지 차단되고 null을 반환하므로 소비자는 processMessage를 다음과 같이 변경할 수 있습니다. 고객이 콘솔을 지우면 프로그램에 출력이 없고 brpop에서 차단되는 것을 볼 수 있습니다. 그런 다음 Redis 클라이언트를 열고 client list 명령을 입력하여 현재 두 개의 연결이 있는지 확인합니다.

한 번 생성하고 여러 번 소비하는 대기열

Redis는 메시지 대기열 지원 외에도 게시/구독 모드를 지원하는 명령 세트도 제공합니다. Redis의 pub/sub 모드를 사용하면 한 번 생성하고 여러 번 소비하는 대기열을 구현할 수 있습니다.

1) Publish

PUBLISH 명령을 사용하면 PUBLISH 채널 메시지 형식으로 메시지를 게시할 수 있습니다.


반환 값은 메시지 구독자 수를 나타냅니다.

2) 구독

SUBSCRIBE 명령은 SUBSCRIBE 채널 형식으로 메시지를 수신하는 데 사용됩니다.

SUBSCRIBE 명령을 사용한 후 구독 모드로 들어가지만 게시로 보낸 메시지를 받지 못하는 것을 볼 수 있습니다. 메시지가 전송되기 전에만 구독할 수 있기 때문입니다. 이 모드의 다른 명령에 대해서는 응답만 볼 수 있습니다. 답글은 3가지 종류로 구분됩니다.

1. 구독인 경우 두 번째 값은 구독한 채널을 나타내고, 세 번째 값은 구독한 채널을 나타냅니다. (일련번호로 이해되나요?)

2. 메시지인 경우(메시지) ), 두 번째 값은 메시지를 생성한 채널, 세 번째 값은 메시지
3. 구독 취소인 경우 두 번째 값은 구독 취소된 채널, 세 번째 값은 현재 클라이언트의 구독 수를 나타냅니다.

UNSUBSCRIBE 명령을 사용하여 구독을 취소할 수 있습니다. 매개변수가 추가되지 않으면 SUBSCRIBE 명령으로 구독한 모든 채널이 구독 취소됩니다.

Redis는 또한 와일드카드를 기반으로 하는 메시지 구독을 지원합니다. 예를 들어 PSUBSCRIBE(패턴 구독) 명령을 사용합니다.

메시지를 다시 푸시하면 다음과 같은 결과가 나타납니다.

게시 명령이 2를 반환하는 것을 볼 수 있습니다. 구독자 측에서는 메시지를 두 번 받았습니다. 이는 PSUBSCRIBE 명령이 채널을 반복적으로 구독할 수 있기 때문입니다. PSUBSCRIBE 명령을 사용하여 구독한 채널은 PUNSUBSCRIBE 명령을 사용하여 구독을 취소해야 합니다. 마찬가지로 UNSUBSCRIBE는 PSUBSCRIBE 명령으로 구독한 채널에서 구독을 취소할 수 없습니다. 동시에 PUNSUBSCRIBE 명령어 와일드카드는 확장되지 않습니다.

예: PUNSUBSCRIBE *는 채널.*과 일치하지 않으므로 채널.* 구독을 취소하려면 PUBSUBSCRIBE 채널.*을 이와 같이 작성해야 합니다.


코드 데모는 다음과 같습니다.

public void processMessage() {    /**
     * brpop支持多个列表(队列)
     * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。
     * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY
     * 0表示不限制等待,会一直阻塞在这儿     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由于该指令可以监听多个Key,所以返回的是一个列表        //列表由2项组成,1) 列表名,2)数据
        String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
 
    }
    System.out.println("=======================");
}

간단히 메시지를 보내세요.

메시지 구독자:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息发布方
 * @author yamikaze */public class Publisher { 
    public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis; 
    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    } 
    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}

클라이언트를 먼저 실행한 다음 게시자를 실행하여 메시지를 보냅니다. 출력 결과는 다음과 같습니다.

Redis의 pub/sub에도 단점이 있습니다. 오프라인에서는 제작자의 메시지가 손실됩니다.

Delay Queue

Background

비즈니스 개발 과정에서 다음과 같은 처리 지연이 필요한 몇 가지 시나리오가 있습니다.

a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:

1.Java中java.util.concurrent.DelayQueue

优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化

2.Rocketmq延时队列

优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息

3.Rabbitmq延时队列(TTL+DLX实现)

优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列

Redis实现的延时消息队列适合的项目特点:

  • Spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

Redis实现的延时消息队列思路

Redis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?

试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。

Zset的排列效果如下图:

java代码实现如下:

package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**
 * @program: test
 * @description: redis实现延时队列
 * @author: xingcheng
 * @create: 2018-08-19
 **/public class AppTest {    private static final String ADDR = "127.0.0.1";    private static final int PORT = 6379;    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);    private static CountDownLatch cdl = new CountDownLatch(10);    public static Jedis getJedis() {        return jedisPool.getResource();
    }    /**
     * 生产者,生成5个订单     */
    public void productionDelayMessage() {        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();            // 3秒后执行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后执行");
        }
    }    //消费者,取订单
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);            if (order == null || order.isEmpty()) {
                System.out.println("当前没有等待的任务");                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();            long nowTime = instance.getTimeInMillis() / 1000;            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element);
                }
            }
        }
    }    static class DelayMessage implements Runnable{
        @Override        public void run() {            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();        for (int i = 0; i < 10; i++) {            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

实现效果如下:

위 내용은 Redis 및 대기열에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 cnblogs.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제