Maison  >  Article  >  base de données  >  Une brève analyse de la façon d'utiliser les files d'attente de messages dans Redis

Une brève analyse de la façon d'utiliser les files d'attente de messages dans Redis

青灯夜游
青灯夜游avant
2022-01-05 09:57:352694parcourir

Cet article vous présentera l'utilisation avancée de Redis - file d'attente de messages et présentera la file d'attente différée dans Redis. J'espère qu'il vous sera utile !

Une brève analyse de la façon d'utiliser les files d'attente de messages dans Redis

En parlant de middleware de file d'attente de messages, nous pensons tous à RabbitMQ, RocketMQ et Kafka pour implémenter des fonctions de messagerie asynchrone pour les applications. Il s’agit de middlewares de file d’attente de messages spécialisés avec plus de fonctionnalités que nous ne pouvons en comprendre.

Ces middlewares de messagerie sont compliqués à utiliser, comme RabbitMQ. Avant d'envoyer un message, vous devez créer un échange et une file d'attente, puis lier l'échange et la file d'attente via certaines règles. Lors de l'envoi d'un message, vous devez formuler un routage. . -key, contrôle également le message d’en-tête. Ceci s'adresse uniquement aux producteurs. Les consommateurs doivent également suivre à nouveau la série d'étapes fastidieuses ci-dessus avant de consommer les messages.

Donc, pour ceux qui n'exigent pas une fiabilité à 100 % et souhaitent implémenter des exigences simples en matière de file d'attente de messages, nous pouvons utiliser Redis pour nous libérer des étapes fastidieuses du middleware de file d'attente de messages.

La file d'attente de messages de Redis n'est pas une file d'attente de messages professionnelle. Elle ne dispose pas de nombreuses fonctionnalités avancées dans la file d'attente de messages et n'a pas non plus de garantie d'accusé de réception. Si vous recherchez la fiabilité des messages, veuillez vous tourner vers le middleware MQ professionnel. [Recommandations associées : Tutoriel vidéo Redis]

File d'attente de messages asynchrone

À partir de la file d'attente de messages asynchrone la plus simple, la structure de données de liste de Redis est couramment utilisée comme file d'attente de messages asynchrone et est mise en file d'attente via lrpush/lpush rpop/. lpop pour sortir de la file d'attente.

Une brève analyse de la façon dutiliser les files dattente de messages dans Redis

Problème 1 : File d'attente vide

Pour l'opération pop, lorsque la file d'attente des messages est vide, le client tombera dans une boucle infinie de pop, provoquant de nombreuses interrogations vides qui lui feront perdre la vie, obligeant le client à Le CPU est augmenté, et le QPS de Redis est également augmenté.

La solution au problème ci-dessus consiste à utiliser blpop/brpop de la structure de liste pour retirer la file d'attente, où le préfixe b représente le blocage, le blocage de la lecture. Pour bloquer les lectures, il entrera en état de veille lorsqu'il n'y a aucune donnée dans la file d'attente et se réveillera dès que les données arriveront. Résout parfaitement le problème ci-dessus.

Problème 2 : La connexion inactive est déconnectée

La solution bloquant la lecture semble parfaite, mais elle entraîne immédiatement un autre problème : la connexion inactive. Si le thread continue de se bloquer quelque part, la connexion client Redis devient une connexion inactive. Si le temps d'inactivité est trop long, le serveur Redis se déconnectera activement pour réduire l'occupation des ressources inutilisées. À ce moment, blpop/brpop lèvera une exception.

Nous devons donc être prudents lors de l'écriture des consommateurs de clients (applications), faire attention à la détection des exceptions et réessayer.

Application 1 : Delay Queue

Dans les verrous distribués Redis, il existe généralement trois stratégies pour gérer les échecs de verrouillage :

  • Lancez une exception directement et le frontal rappelle à l'utilisateur s'il doit continuer l'opération ;

  • dormir et réessayer après un certain temps ;

  • Mettre la requête dans la file d'attente différée et réessayer après un certain temps

Quant à la file d'attente différée dans Redis, nous pouvons utiliser le zset (liste ordonnée) ; structure de données à réaliser. Nous sérialisons le message sous forme de chaîne comme valeur de zse et le temps de traitement d'expiration (délai) du message comme score. Interrogez ensuite zset pour obtenir le délai d'expiration du traitement, utilisez zrem pour supprimer la clé de zset afin de représenter une consommation réussie, puis traitez la tâche.

Le code de base est le suivant :

// 生产\
public void delay(T msg) {\
  TaskItem task = new TaskItem();\
  task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\
  task.msg = msg;\
  String s = JSON.toJSONString(task); // fastjson 序列化\
  jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\
}\
// 消费\
public void loop() {\
  while (!Thread.interrupted()) {\
   // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\
   Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
   if (values.isEmpty()) {\
     try {\
       Thread.sleep(500); // 歇会继续\
    }\
     catch (InterruptedException e) {\
       break;\
    }\
     continue;\
  }\
   String s = values.iterator().next();  //消费队列\
   if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\
     TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\
     this.handleMsg(task.msg);\
  }\
}\
}

Le code ci-dessus est utilisé en multi-threading pour la situation où la même tâche est exécutée par plusieurs threads, bien que la tâche puisse être traitée après zrem pour éviter la situation où une tâche est en cours. consommé plusieurs fois. Mais pour les threads qui ont obtenu la tâche mais n'ont pas réussi à la consommer avec succès, c'était une perte de temps pour obtenir la tâche. Par conséquent, vous pouvez envisager d’optimiser cette logique via les scripts Lua. Déplacer zrangeByScore et zrem ensemble vers le serveur pour les opérations atomiques résoudra parfaitement le problème.

Pour plus de connaissances sur la programmation, veuillez visiter :

Introduction à la programmation ! !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer