Heim >Datenbank >Redis >Detaillierte Erläuterung von Redis und Warteschlangen

Detaillierte Erläuterung von Redis und Warteschlangen

藏色散人
藏色散人nach vorne
2020-08-26 11:51:022207Durchsuche

In der folgenden Spalte finden Sie eine detaillierte Erklärung zu Redis und Warteschlangen aus der Spalte „Redis-Tutorial“. Ich hoffe, dass sie Freunden in Not hilfreich sein wird!

Zusammenfassung

Detaillierte Erläuterung von Redis und WarteschlangenRedis kann nicht nur als Cache-Server, sondern auch als Nachrichtenwarteschlange verwendet werden. Sein Listentyp unterstützt grundsätzlich die Verwendung als Nachrichtenwarteschlange. Wie in der folgenden Abbildung dargestellt:

Da die Redis-Liste mithilfe einer doppelt verknüpften Liste implementiert wird und die Kopf- und Endknoten speichert, können Elemente am Kopf und am Ende der Liste sehr schnell eingefügt werden.

Implementierung einer gewöhnlichen Warteschlange

Sie können die Nachrichtenwarteschlange also direkt mit der Liste von Redis implementieren, und zwar mit nur zwei einfachen Anweisungen: lpush und rpop oder rpush und lpop. Ein einfaches Beispiel ist wie folgt:

Ende der Speichernachricht (Nachrichtenproduzent):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息生产者
 * @author yamikaze */public class Producer extends Thread { 
    public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count; 
    public Producer(String name) {        this.producerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    } 
    public int getCount() {        return count;
    }
 
    @Override    public void run() {        try {            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 
    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start(); 
        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

Ende der Nachrichtenverarbeitung (Nachrichtenkonsument):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息消费者
 * @author yamikaze */public class Customer extends Thread{ 
    private String customerName;    private volatile int count;    private Jedis jedis; 
    public Customer(String name) {        this.customerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != null) {
            count++;
            handle(message);
        }
    } 
    public void handle(String message) {
        System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }
 
    @Override    public void run() {        while (true) {
            processMessage();
        }
    } 
    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}

Es ​​scheint ziemlich gut zu sein, aber im obigen Beispiel gibt es ein Problem mit dem Nachrichtenkonsumenten. Das heißt, es muss angehalten werden. Rufen Sie die RPOP-Methode auf, um zu überprüfen, ob in der Liste ausstehende Nachrichten vorhanden sind. Bei jedem Anruf wird eine Verbindung aufgebaut, was unnötige Verschwendung verursacht. Vielleicht verwenden Sie Thread.sleep() und andere Methoden, um den Consumer-Thread nach einer gewissen Zeit wieder konsumieren zu lassen, aber dabei gibt es zwei Probleme:

1) Wenn die Producer-Geschwindigkeit größer ist als die Consumer-Verbrauchsgeschwindigkeit, Die Länge der Nachrichtenwarteschlange wird weiter zunehmen und mit der Zeit viel Speicherplatz beanspruchen.

2) Wenn die Ruhezeit zu lang ist, können einige zeitkritische Nachrichten nicht verarbeitet werden. Wenn die Ruhezeit zu kurz ist, verursacht dies auch einen relativ großen Overhead auf der Verbindung.

Sie können die brpop-Anweisung also nur verwenden, wenn ein Element vorhanden ist. Wenn nicht, wird sie bis zum Timeout blockiert und null zurückgegeben. Der Verbraucher kann also die Prozessnachricht wie folgt ändern:

public void processMessage() {    /**
     * brpop支持多个列表(队列)
     * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。
     * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY
     * 0表示不限制等待,会一直阻塞在这儿     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由于该指令可以监听多个Key,所以返回的是一个列表        //列表由2项组成,1) 列表名,2)数据
        String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
 
    }
    System.out.println("=======================");
}

Dann können Sie ausführen Kunde und leeren Sie die Konsole. Sie können sehen, dass das Programm keine Ausgabe hat und hier in brpop blockiert ist. Öffnen Sie dann den Redis-Client und geben Sie den Befehl client list ein, um festzustellen, dass derzeit zwei Verbindungen vorhanden sind.

Warteschlange, die einmal produziert und viele Male verbraucht

Redis bietet nicht nur Unterstützung für Nachrichtenwarteschlangen, sondern auch eine Reihe von Befehlen zur Unterstützung des Veröffentlichungs-/Abonnementmodus. Mit dem Pub/Sub-Modus von Redis können Sie eine Warteschlange implementieren, die einmal produziert und mehrmals konsumiert.

1) Veröffentlichen

Mit der PUBLISH-Anweisung kann eine Nachricht im Format PUBLISH-Kanalnachricht veröffentlicht werden

Der Rückgabewert gibt die Anzahl der Abonnenten der Nachricht an.

2) Abonnement
Die SUBSCRIBE-Anweisung wird verwendet, um eine Nachricht im Format SUBSCRIBE-Kanal zu empfangen.

Sie können sehen, dass Sie nach Verwendung der SUBSCRIBE-Anweisung in den Abonnementmodus wechseln, die von Publish gesendete Nachricht jedoch nicht erhalten denn Sie können sich erst anmelden, bevor die Nachricht empfangen wird. Für andere Befehle in diesem Modus können nur Antworten angezeigt werden. Antworten werden in drei Typen unterteilt:
1. Wenn es sich um ein Abonnement handelt, gibt der zweite Wert den abonnierten Kanal an und der dritte Wert gibt an, welcher Kanal abonniert ist? (als Seriennummer verstanden?)
2. Wenn es sich um eine Nachricht handelt (Nachricht). ), der zweite Wert ist der Kanal, der die Nachricht generiert hat, und der dritte Wert ist die Nachricht

3. Wenn es sich um „Abbestellen“ handelt, stellt der zweite Wert den nicht abonnierten Kanal dar und der dritte Wert stellt die Anzahl der Abonnements des aktuellen Clients dar.


Mit dem Befehl UNSUBSCRIBE können Sie sich abmelden. Wenn keine Parameter hinzugefügt werden, werden alle Kanäle, die mit dem Befehl SUBSCRIBE abonniert wurden, abgemeldet.

Redis unterstützt auch Nachrichtenabonnements basierend auf Platzhaltern. Verwenden Sie zum Beispiel den Befehl PSUBSCRIBE (Muster-Abonnement):

Versuchen Sie erneut, die Nachricht zu pushen, und Sie erhalten die folgenden Ergebnisse:

Sie können sehen, dass der Veröffentlichungsbefehl 2 zurückgibt. und die Teilnehmerseite hat die Nachricht zweimal empfangen. Dies liegt daran, dass der Befehl PSUBSCRIBE den Kanal wiederholt abonnieren kann. Kanäle, die mit dem Befehl PSUBSCRIBE abonniert wurden, müssen auch mit dem Befehl PUNSUBSCRIBE abgemeldet werden. Mit diesem Befehl können Kanäle, die mit dem Befehl SUBSCRIBE abonniert wurden, nicht abgemeldet werden. Gleichzeitig wird der PUNSUBSCRIBE-Anweisungsplatzhalter nicht erweitert.

Zum Beispiel: PUNSUBSCRIBE * stimmt nicht mit Kanal.* überein. Um sich also von Kanal.* abzumelden, müssen Sie PUBSUBSCRIBE Kanal.* so schreiben.

Die Codedemonstration lautet wie folgt:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息发布方
 * @author yamikaze */public class Publisher { 
    public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis; 
    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    } 
    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}

Einfach eine Nachricht senden.

Nachrichtenabonnent:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息订阅方客户端
 * @author yamikaze */public class SubscribeClient { 
    private Jedis jedis;    private static final String EXIT_COMMAND = "exit"; 
    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void subscribe(String ...channel) {        if(channel == null || channel.length <= 0) {            return;
        }        //消息处理,接收到消息时如何处理
        JedisPubSub jps = new JedisPubSub() {            /**
             * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
             * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
             * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
             * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]             */
            @Override            public void onMessage(String channel, String message) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("接收到消息: channel : " + message);                    //接收到exit消息后退出
                    if(EXIT_COMMAND.equals(message)) {
                        System.exit(0);
                    }
 
                }
            } 
            /**
             * 订阅时             */
            @Override            public void onSubscribe(String channel, int subscribedChannels) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("订阅了频道:" + channel);
                }
            }
        };        //可以订阅多个频道 当前线程会阻塞在这儿        jedis.subscribe(jps, channel);
    } 
    public static void main(String[] args) {
        SubscribeClient client = new SubscribeClient();
        client.subscribe(Publisher.CHANNEL_KEY);        //并没有 unsubscribe方法        //相应的也没有punsubscribe方法    }
}

Führen Sie zuerst den Client aus und führen Sie dann Publisher aus, um die Nachricht zu senden. Das Ausgabeergebnis ist:

Der Pub/Sub von Redis hat auch seine Mängel, wenn der Verbraucher geht Offline geht die Nachricht des Produzenten verloren.

Verzögerte WarteschlangeHintergrund

Im Prozess der Geschäftsentwicklung wird es einige Szenarien geben, die eine verzögerte Verarbeitung erfordern, wie zum Beispiel:

a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:

1.Java中java.util.concurrent.DelayQueue

优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化

2.Rocketmq延时队列

优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息

3.Rabbitmq延时队列(TTL+DLX实现)

优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列

Redis实现的延时消息队列适合的项目特点:

  • Spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

Redis实现的延时消息队列思路

Redis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?

试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。

Zset的排列效果如下图:

java代码实现如下:

package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**
 * @program: test
 * @description: redis实现延时队列
 * @author: xingcheng
 * @create: 2018-08-19
 **/public class AppTest {    private static final String ADDR = "127.0.0.1";    private static final int PORT = 6379;    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);    private static CountDownLatch cdl = new CountDownLatch(10);    public static Jedis getJedis() {        return jedisPool.getResource();
    }    /**
     * 生产者,生成5个订单     */
    public void productionDelayMessage() {        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();            // 3秒后执行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后执行");
        }
    }    //消费者,取订单
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);            if (order == null || order.isEmpty()) {
                System.out.println("当前没有等待的任务");                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();            long nowTime = instance.getTimeInMillis() / 1000;            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element);
                }
            }
        }
    }    static class DelayMessage implements Runnable{
        @Override        public void run() {            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();        for (int i = 0; i < 10; i++) {            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

实现效果如下:

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung von Redis und Warteschlangen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:cnblogs.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen