Maison > Article > base de données > Flux de types de données spéciaux Redis
Cet article vous apporte des connaissances pertinentes sur Redis, qui présente principalement le contenu pertinent d'un flux de types de données spéciaux. Redis fournit une multitude de types de données, dont quatre spéciaux : bitmap, hyperloglog, géospatial, flux, jetons un coup d'œil. sur les problèmes liés au flux, j'espère que cela sera utile à tout le monde.
Apprentissage recommandé : Tutoriel vidéo Redis
Redis Stream est un type de données nouvellement ajouté dans la version Redis 5.0. Redis est un type de données spécialement conçu pour les files d'attente de messages.
Avant la sortie de Redis 5.0 Stream, la mise en œuvre des files d'attente de messages présentait toutes ses propres lacunes, telles que :
Le mode de publication et d'abonnement, qui ne peut pas être conservé et ne peut pas enregistrer les messages de manière fiable, et ne convient pas aux clients de reconnexion hors ligne. Le défaut de ne pas pouvoir lire les messages historiques ;
La manière dont List implémente la file d'attente des messages ne peut pas être consommée à plusieurs reprises. Un message sera supprimé une fois consommé, et le producteur doit implémenter lui-même un identifiant globalement unique.
Sur la base des problèmes ci-dessus, Redis 5.0 a lancé le type Stream, qui est également la fonctionnalité la plus importante de cette version. Il est utilisé pour implémenter parfaitement les files d'attente de messages, il prend en charge la persistance des messages, la génération automatique d'identifiants globalement uniques et. Mode de confirmation des messages, prend en charge le mode groupe de consommateurs, etc., rendant la file d'attente des messages plus stable et fiable.
Commandes d'opération de file d'attente de messages de flux :
DEL : supprimer l'intégralité du flux
# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...] 127.0.0.1:6379> XADD s1 * name sid10t "1665047636078-0" 127.0.0.1:6379> XADD s1 * name sidiot "1665047646214-0" # XDEL key id [id ...] 127.0.0.1:6379> XDEL s1 1665047646214-0 (integer) 1 # DEL key [key ...] 127.0.0.1:6379> DEL s1 (integer) 1
Rrieee
# XLEN key 127.0.0.1:6379> XLEN s1 (integer) 2 # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] 127.0.0.1:6379> XREAD streams s1 0-0 1) 1) "s1" 2) 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 127.0.0.1:6379> XREAD count 1 streams s1 0-0 1) 1) "s1" 2) 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" # XADD 了一条消息之后的扩展 127.0.0.1:6379> XREAD streams s1 1665047636078-0 1) 1) "s1" 2) 1) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 2) 1) "1665053702766-0" 2) 1) "age" 2) "18" # XRANGE key start end [COUNT count] 127.0.0.1:6379> XRANGE s1 - + 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 3) 1) "1665053702766-0" 2) 1) "age" 2) "18" 127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" # XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count] 127.0.0.1:6379> XLEN s1 (integer) 3 127.0.0.1:6379> XTRIM s1 maxlen 2 (integer) 1 127.0.0.1:6379> XLEN s1 (integer) 2
# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read] # 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错; 127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0 (error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically. # 0-0 从头开始消费,$ 从尾开始消费; 127.0.0.1:6379> XADD myStream * name sid10t "1665057823181-0" 127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0 OK 127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $ OK # XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] 127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream > 1) 1) "myStream" 2) 1) 1) "1665058086931-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665058090167-0" 2) 1) "name" 2) "sidiot"Après une insertion réussie, l'ID globalement unique : "1665058759764-0" sera renvoyé. L'ID globalement unique du message se compose de deux parties :
# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID # 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t 127.0.0.1:6379> XADD mymq * name sid10t "1665058759764-0"Si vous souhaitez implémenter un blocage de lecture (blocage lorsqu'il n'y a pas de données), vous pouvez définir l'élément de configuration BLOCK lors de l'appel de XRAED pour implémenter une opération de blocage de lecture similaire à BRPOP. Par exemple, la commande suivante définit l'élément de configuration du BLOC 10000. L'unité de 10000 est la milliseconde, ce qui indique que lorsque XREAD lit le dernier message, si aucun message n'arrive, XREAD bloquera pendant 10000 millisecondes (soit 10 secondes) et puis reviens. La méthode de base de
127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0 (nil) 127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0 1) 1) "mymq" 2) 1) 1) "1665058759764-0" 2) 1) "name" 2) "sid10t"Stream utilise xadd pour stocker les messages et xread pour bloquer la lecture des messages en boucle afin d'implémenter une version simple de la file d'attente de messages. Le processus d'interaction est le suivant :
Ces opérations Liste introduite. précédents sont également pris en charge, jetons un coup d'œil aux fonctions uniques de Stream.
Créez deux groupes de consommateurs. La file d'attente de messages consommée par ces deux groupes de consommateurs est mymq. Ils spécifient tous deux de commencer la lecture à partir du premier message :
# 命令最后的 $ 符号表示读取最新的消息 127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $ (nil) (10.01s)
# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。 127.0.0.1:6379> XGROUP CREATE mymq group1 0-0 OK # 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。 127.0.0.1:6379> XGROUP CREATE mymq group2 0-0 OK
Une fois que le message dans la file d'attente des messages est lu par un consommateur du groupe de consommateurs, il ne peut plus être lu par les autres consommateurs du groupe de consommateurs, c'est-à-dire que les consommateurs du même groupe de consommateurs ne peuvent pas consommer le même message.
比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq > (nil)
但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。
比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665058759764-0" 2) 1) "name" 2) "sid10t"
因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060632864-0" 2) 1) "name" 2) "sid10t" # 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060633903-0" 2) 1) "name" 2) "sid10t" # 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060634962-0" 2) 1) "name" 2) "sid10t"
基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?
Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:
如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:
127.0.0.1:6379> XPENDING mymq group2 1) (integer) 4 2) "1665058759764-0" 3) "1665060634962-0" 4) 1) 1) "consumer1" 2) "2" 2) 1) "consumer2" 2) "1" 3) 1) "consumer3" 2) "1"
如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:
# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息 127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2 1) 1) "1665060633903-0" 2) "consumer2" 3) (integer) 1888805 4) (integer) 1
可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。
一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。
127.0.0.1:6379> XACK mymq group2 1665060633903-0 (integer) 1
当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2 (empty array)
好了,基于 Stream 实现的消息队列就说到这里了,小结一下:
消息保序:XADD/XREAD
阻塞读取:XREAD block
重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
支持消费组形式消费数据
Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?
一个专业的消息队列,必须要做到两大块:
消息不可丢。
消息可堆积。
1、Redis Stream 消息会丢失吗?
使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
Redis Stream 消息队列能不能保证三个环节都不丢失数据?
Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:
La persistance AOF est configurée pour écrire sur le disque toutes les secondes, mais ce processus d'écriture sur disque est asynchrone, et il existe une possibilité de perte de données lorsque Redis tombe en panne
La réplication maître-esclave est également asynchrone, et il y a aussi ; perte lorsque le maître-esclave change. Possibilités de données (ouvre une nouvelle fenêtre).
Comme vous pouvez le constater, Redis ne peut pas garantir que les messages ne seront pas perdus dans le lien middleware de la file d'attente. Un middleware de file d'attente professionnel tel que RabbitMQ ou Kafka est utilisé pour déployer un cluster. Lorsqu'un producteur publie un message, le middleware de file d'attente écrit généralement « plusieurs nœuds », c'est-à-dire qu'il y a plusieurs copies de cette manière, même si l'un des nœuds. échoue, les données du cluster ne seront pas perdues.
2. Les messages Redis Stream peuvent-ils être accumulés ?
Les données Redis sont stockées en mémoire, ce qui signifie qu'une fois qu'un retard de messages se produit, la mémoire Redis continuera de croître si elle dépasse la limite de mémoire de la machine, elle sera confrontée au risque de MOO.
Donc Redis' Stream fournit la fonction de spécifier la longueur maximale de la file d'attente, histoire d'éviter cette situation.
Lors de la spécification de la longueur maximale de la file d'attente, une fois que la longueur de la file d'attente dépasse la limite supérieure, les anciens messages seront supprimés et seuls les nouveaux messages de longueur fixe seront conservés. De ce point de vue, si Stream a une longueur maximale spécifiée lorsque les messages sont en retard, les messages peuvent toujours être perdus.
Mais les données des files d'attente de messages professionnelles telles que Kafka et RabbitMQ sont stockées sur le disque. Lorsque les messages sont en retard, ils occupent simplement plus d'espace disque.
Par conséquent, lorsque vous utilisez Redis comme file d'attente, vous serez confronté à deux problèmes :
Redis lui-même peut perdre des données
Face à la compression des messages, les ressources mémoire seront limitées ; La possibilité d'utiliser Redis comme file d'attente de messages dépend de votre scénario commercial :
Si votre scénario commercial est assez simple, peu sensible à la perte de données et que la probabilité d'un retard de messages est relativement faible, utilisez Redis comme file d'attente de messages. Il est tout à fait possible de créer une file d'attente.
Si votre entreprise a une grande quantité de messages, la probabilité d'un retard de messages est relativement élevée et la perte de données ne peut pas être acceptée, il est alors préférable d'utiliser un middleware de file d'attente de messages professionnel.
Supplément : Pourquoi le mécanisme de publication/abonnement Redis ne peut-il pas être utilisé comme file d'attente de messages ?
Le mécanisme de publication/abonnement n'est implémenté sur la base d'aucun type de données, il n'a donc pas la capacité de "persistance des données", ce qui est lié au mécanisme de publication/abonnement. Les opérations ne seront pas écrites dans RDB et AOF. Lorsque Redis plante et redémarre, toutes les données du mécanisme de publication/abonnement seront perdues.
Le modèle de publication et d'abonnement est un mode de travail "envoyer et oublier". Si un abonné se déconnecte et se reconnecte, il ne peut pas consommer les messages historiques précédents.
Lorsque le consommateur a un certain retard de messages, c'est-à-dire les messages envoyés par le producteur, et que le consommateur ne peut pas les consommer, s'il dépasse 32M ou reste au-dessus de 8M dans les 60s, le consommateur sera déconnecté de force. Le paramètre est défini dans le fichier de configuration et la valeur par défaut est client-output-buffer-limit pubsub 32mb 8mb 60.
Par conséquent, le mécanisme de publication/abonnement ne convient qu'aux scénarios de communication instantanée. Par exemple, le scénario de création d'un cluster sentinelle (ouvre une nouvelle fenêtre) utilise le mécanisme de publication/abonnement.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!