Maison  >  Article  >  base de données  >  Comment Redis implémente le blocage de file d'attente, le délai, la publication et l'abonnement

Comment Redis implémente le blocage de file d'attente, le délai, la publication et l'abonnement

WBOY
WBOYavant
2022-05-23 12:15:043235parcourir

Cet article vous apporte des connaissances pertinentes sur Redis, qui présente principalement des problèmes connexes sur la façon de mettre en œuvre le blocage de file d'attente, le retard, la publication et l'abonnement. J'espère qu'il sera utile à tout le monde.

Comment Redis implémente le blocage de file d'attente, le délai, la publication et l'abonnement

Apprentissage recommandé : Tutoriel vidéo Redis

Redis peut non seulement être utilisé comme serveur de cache, mais également comme file d'attente de messages. Son type de liste prend intrinsèquement en charge son utilisation comme file d'attente de messages. Comme le montre la figure ci-dessous :
Comment Redis implémente le blocage de file dattente, le délai, la publication et labonnement

Étant donné que la liste Redis est implémentée à l'aide d'une liste doublement chaînée, le nœud de tête et le nœud de queue sont enregistrés, donc l'insertion ou la récupération d'éléments en tête et en queue de la liste est très rapide, et la complexité temporelle est O(1).

File d'attente ordinaire

Vous pouvez utiliser directement le type de données liste de Redis pour implémenter la file d'attente de messages, avec seulement deux instructions simples : lpush et rpop ou rpush et lpop.

  • lpush+rpop : une file d'attente à gauche et à droite
  • rpush+lpop : une file d'attente à gauche et à droite

Ce qui suit utilise la commande redis pour simuler une file d'attente normale.
Utilisez la commande lpush pour produire des messages :

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"

Utilisez la commande rpop pour consommer des messages :

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"

Ce qui suit utilise du code Java pour implémenter une file d'attente commune.

Producer SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer SingleConsumer : </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}

Le code ci-dessus a essentiellement réalisé la production et la consommation de files d'attente ordinaires, mais il y a deux problèmes avec le consommateur de messages dans l'exemple ci-dessus :

  1. Le consommateur a-t-il besoin d'appeler en continu la méthode rpop pour vérifier s'il y a des données (messages) à traiter dans la liste redis. Une connexion sera établie à chaque appel. Il se peut qu'il n'y ait aucune donnée dans la liste, ce qui entraînera un grand nombre de sondages vides et un gaspillage inutile. Vous pouvez peut-être utiliser Thread.sleep() et d'autres méthodes pour permettre au thread consommateur de consommer à nouveau après un certain temps. Si le temps de veille est trop long, certains messages urgents ne peuvent pas être traités. , cela entraînera également des comparaisons sur la connexion.
  2. Si la vitesse du producteur est supérieure à la vitesse de consommation du consommateur, la longueur de la file d'attente des messages continuera d'augmenter, ce qui occupera beaucoup d'espace mémoire au fil du temps.

File d'attente de blocage

Les consommateurs peuvent utiliser la commande brpop pour obtenir des données de la liste Redis. Cette commande ne retournera que s'il n'y a pas d'élément, elle bloquera jusqu'à l'expiration du délai et renverra null, donc le consommateur. n'a pas besoin de dormir pour obtenir les données, cela équivaut à implémenter une file d'attente de blocage,

Utilisez la commande brpop de redis pour simuler la file d'attente de blocage.

>brpop queue:single 30

Vous pouvez voir que la ligne de commande est bloquée dans brpop, et elle reviendra après 30 secondes sans données.

Le code Java est implémenté comme suit :

Le producteur est le même que le producteur de la file d'attente ordinaire.

Consumer BlockConsumer :

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>

Inconvénients : Il est impossible de réaliser plusieurs consommations à la fois.

Mode publication-abonnement

En plus de prendre en charge les files d'attente de messages, Redis fournit également un ensemble de commandes pour prendre en charge le mode publication/abonnement. En utilisant le mode pub/sub de Redis, vous pouvez implémenter une file d'attente qui produit une fois et consomme plusieurs fois.

Publier : L'instruction PUBLISH peut être utilisée pour publier un message, format :

PUBLISH channel message

La valeur de retour indique le nombre d'abonnés au message.

Abonnement : L'instruction SUBSCRIBE est utilisée pour recevoir un message. Le format est :

SUBSCRIBE channel

Après avoir utilisé l'instruction SUBSCRIBE, vous entrez en mode abonnement, mais vous ne recevrez pas le message envoyé par publication avant de vous abonner. ne peut s'abonner qu'avant l'envoi du message. Pour les autres commandes de ce mode, seules les réponses sont visibles.

Les réponses sont divisées en trois types :

  1. S'il s'agit d'un abonnement, la deuxième valeur représente la chaîne à laquelle vous êtes abonné, et la troisième valeur représente le nombre de chaînes souscrites
  2. S'il s'agit d'un message (message), la deuxième valeur Pour le canal qui a généré le message, la troisième valeur est le message
  3. S'il s'agit d'un désabonnement, la deuxième valeur représente le canal désabonné, et la troisième valeur représente le nombre d'abonnements du client actuel.

Ce qui suit utilise la commande redis pour simuler le mode publication-abonnement.

Producteur :

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1

Consumer :

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"

Le code Java est implémenté comme suit :

Producteur PubsubProducer :

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer PubsubConsumer : </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}

Les consommateurs peuvent en démarrer plusieurs, chaque consommateur peut recevoir Toutes les nouvelles.

Vous pouvez utiliser la commande UNSUBSCRIBE pour vous désabonner. Si aucun paramètre n'est ajouté, toutes les chaînes souscrites par la commande SUBSCRIBE seront désabonnées.

Redis prend également en charge l'abonnement aux messages basé sur des caractères génériques, utilisez la commande PSUBSCRIBE (modèle d'abonnement), par exemple :

psubscribe channel.*

Les chaînes souscrites avec la commande PSUBSCRIBE doivent également utiliser la commande PUNSUBSCRIBE pour se désabonner. Cette commande ne peut pas se désabonner des chaînes souscrites par. SUBSCRIBE Même si vous gérez UNSUBSCRIBE, vous ne pouvez pas vous désabonner de la chaîne à laquelle vous avez souscrit par la commande PSUBSCRIBE.

Dans le même temps, les caractères génériques de la commande PUNSUBSCRIBE ne seront pas développés. Par exemple : PUNSUBSCRIBE *不会匹配到channel.*,所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*.

Le pub/sub de Redis a aussi ses inconvénients, c'est-à-dire que si le consommateur se déconnecte, les messages du producteur seront perdus.

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2>应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer