Heim >Datenbank >Redis >Redis-Spezialdatentyp-Stream

Redis-Spezialdatentyp-Stream

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBnach vorne
2022-10-11 17:47:363128Durchsuche

Dieser Artikel vermittelt Ihnen relevantes Wissen über Redis, das hauptsächlich den relevanten Inhalt eines speziellen Datentyps Stream vorstellt. Redis bietet eine Fülle von Datentypen, darunter vier spezielle Datentypen: Bitmap, Hyperloglog, Geospatial, Stream, werfen wir einen Blick darauf Bei Stream-bezogenen Problemen hoffe ich, dass es für alle hilfreich sein wird.

Redis-Spezialdatentyp-Stream

Empfohlenes Lernen: Redis-Video-Tutorial

Redis Stream ist ein neu hinzugefügter Datentyp in der Redis 5.0-Version. Redis ist ein Datentyp, der speziell für Nachrichtenwarteschlangen entwickelt wurde.

Bevor Redis 5.0 Stream herauskam, hatte die Implementierung von Nachrichtenwarteschlangen alle ihre eigenen Mängel, wie zum Beispiel:

  • Der Veröffentlichungs- und Abonnementmodus kann nicht beibehalten werden, Nachrichten können nicht zuverlässig gespeichert werden und ist nicht für Offline-Wiederverbindungsclients geeignet Der Fehler, historische Nachrichten nicht lesen zu können;

  • List implementiert die Nachrichtenwarteschlange nicht wiederholt. Eine Nachricht wird gelöscht, nachdem sie verbraucht wurde, und der Produzent muss selbst eine global eindeutige ID implementieren.

Aufgrund der oben genannten Probleme hat Redis 5.0 den Stream-Typ eingeführt, der auch die wichtigste Funktion dieser Version ist. Er wird zur perfekten Implementierung von Nachrichtenwarteschlangen und zur automatischen Generierung global eindeutiger IDs verwendet Bestätigungsmodus für Nachrichten, Unterstützung des Verbrauchergruppenmodus usw., wodurch die Nachrichtenwarteschlange stabiler und zuverlässiger wird.

Allgemeine Befehle

Befehle für den Betrieb der Stream-Nachrichtenwarteschlange:

  • DEL: Den gesamten Stream löschen;
  • # XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
    127.0.0.1:6379> XADD s1 * name sid10t
    "1665047636078-0"
    127.0.0.1:6379> XADD s1 * name sidiot
    "1665047646214-0"
    # XDEL key id [id ...]
    127.0.0.1:6379> XDEL s1 1665047646214-0
    (integer) 1
    # DEL key [key ...]
    127.0.0.1:6379> DEL s1
    (integer) 1

  • XLEN: Nachrichtenlänge abfragen ;

Redis-Spezialdatentyp-Stream

Intervall message; rXtrim: Anzahl der Schnittwarteschlangennachrichten;
  • Rrieee

  • xgroup create: Erstellen Sie eine Verbrauchergruppe; Mit dem Befehl können die „gelesenen, aber noch nicht bestätigten“ Nachrichten aller Verbraucher in jeder Verbrauchergruppe abgefragt werden. Der Befehl „XACK“ wird verwendet, um der Nachrichtenwarteschlange zu bestätigen, dass die Nachrichtenverarbeitung abgeschlossen ist

    Nachrichtenwarteschlange

  • Der Produzent fügt eine Nachricht über den XADD-Befehl ein:

    # XLEN key
    127.0.0.1:6379> XLEN s1
    (integer) 2
    # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    127.0.0.1:6379> XREAD streams s1 0-0
    1) 1) "s1"
       2) 1) 1) "1665047636078-0"
             2) 1) "name"
                2) "sid10t"
          2) 1) "1665047646214-0"
             2) 1) "name"
                2) "sidiot"
    127.0.0.1:6379> XREAD count 1 streams s1 0-0
    1) 1) "s1"
       2) 1) 1) "1665047636078-0"
             2) 1) "name"
                2) "sid10t"
        # XADD 了一条消息之后的扩展
        127.0.0.1:6379> XREAD streams s1 1665047636078-0
        1) 1) "s1"
           2) 1) 1) "1665047646214-0"
                 2) 1) "name"
                    2) "sidiot"
              2) 1) "1665053702766-0"
                 2) 1) "age"
                    2) "18"
    # XRANGE key start end [COUNT count]
    127.0.0.1:6379> XRANGE s1 - +
    1) 1) "1665047636078-0"
       2) 1) "name"
          2) "sid10t"
    2) 1) "1665047646214-0"
       2) 1) "name"
          2) "sidiot"
    3) 1) "1665053702766-0"
       2) 1) "age"
          2) "18"
    127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
    1) 1) "1665047636078-0"
       2) 1) "name"
          2) "sid10t"
    2) 1) "1665047646214-0"
       2) 1) "name"
          2) "sidiot"
    # XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
    127.0.0.1:6379> XLEN s1
    (integer) 3
    127.0.0.1:6379> XTRIM s1 maxlen 2
    (integer) 1
    127.0.0.1:6379> XLEN s1
    (integer) 2

    Nach erfolgreicher Einfügung wird die global eindeutige ID: „1665058759764-0“ zurückgegeben. Die global eindeutige ID der Nachricht besteht aus zwei Teilen:
  • Der erste Teil „1665058759764“ ist die aktuelle Serverzeit, berechnet in Millisekunden, wenn Daten eingefügt werden;
  • Der zweite Teil stellt die eingefügte Nachricht innerhalb der aktuellen Millisekunde dar Seriennummer, die von 0 an nummeriert ist. Beispielsweise bedeutet „1665058759764-0“ die erste Nachricht innerhalb von „1665058759764“ Millisekunden.

  • Wenn Verbraucher Nachrichten aus der Nachrichtenwarteschlange über den XREAD-Befehl lesen, können sie eine Nachrichten-ID angeben und mit dem Lesen ab der nächsten Nachricht dieser Nachrichten-ID beginnen (beachten Sie, dass das Lesen mit der nächsten Nachricht der eingegebenen Nachrichten-ID beginnt, nicht). eine Nachricht, die die Eingabe-ID abfragt).

    # XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
    # 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错;
    127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
    (error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
    # 0-0 从头开始消费,$ 从尾开始消费;
    127.0.0.1:6379> XADD myStream * name sid10t
    "1665057823181-0"
    127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
    OK
    127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
    OK
    # XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
    127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
    1) 1) "myStream"
       2) 1) 1) "1665058086931-0"
             2) 1) "name"
                2) "sid10t"
          2) 1) "1665058090167-0"
             2) 1) "name"
                2) "sidiot"

    Wenn Sie blockierendes Lesen implementieren möchten (Blockieren, wenn keine Daten vorhanden sind), können Sie beim Aufruf von XRAED das Konfigurationselement BLOCK festlegen, um einen blockierenden Lesevorgang ähnlich wie BRPOP zu implementieren.
  • Zum Beispiel legt der folgende Befehl das Konfigurationselement von BLOCK 10000 fest. Die Einheit von 10000 ist Millisekunden, was bedeutet, dass XREAD, wenn XREAD die neueste Nachricht liest und keine Nachricht eintrifft, für 10000 Millisekunden (d. h. 10 Sekunden) blockiert dann zurück. Die grundlegende Methode von

    # * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
    # 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t
    127.0.0.1:6379> XADD mymq * name sid10t
    "1665058759764-0"

    Stream verwendet xadd zum Speichern von Nachrichten und xread zum Blockieren des Lesens von Nachrichten in einer Schleife, um eine einfache Version der Nachrichtenwarteschlange zu implementieren. Der Interaktionsprozess ist wie in der folgenden Abbildung dargestellt:

Diese Operationen werden vorgestellt Frühere Versionen werden ebenfalls unterstützt. Werfen wir einen Blick auf die einzigartigen Funktionen von Stream.

Stream kann XGROUP verwenden, um eine Verbrauchergruppe zu erstellen. Nach dem Erstellen einer Verbrauchergruppe kann Stream den XREADGROUP-Befehl verwenden, um Verbrauchern in der Verbrauchergruppe das Lesen von Nachrichten zu ermöglichen.

Erstellen Sie zwei Verbrauchergruppen, die von diesen beiden Verbrauchergruppen genutzt werden. Beide geben an, mit dem Lesen der ersten Nachricht zu beginnen:

127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
(nil)
127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
Verbraucher1 in der Verbrauchergruppe Gruppe1 Der Nachrichtenbefehl lautet wie folgt:

# 命令最后的 $ 符号表示读取最新的消息
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.01s)

Sobald die Nachricht in der Nachrichtenwarteschlange von einem Verbraucher in der Verbrauchergruppe gelesen wird, kann sie nicht mehr von anderen Verbrauchern in der Verbrauchergruppe gelesen werden, dh Verbraucher in derselben Verbrauchergruppe können nicht konsumiert werden die gleiche Nachricht.

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"

因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060632864-0"
         2) 1) "name"
            2) "sid10t"
            
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060633903-0"
         2) 1) "name"
            2) "sid10t"
            
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060634962-0"
         2) 1) "name"
            2) "sid10t"

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

Redis-Spezialdatentyp-Stream

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

127.0.0.1:6379> XPENDING mymq group2
1) (integer) 4
2) "1665058759764-0"
3) "1665060634962-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
1) 1) "1665060633903-0"
   2) "consumer2"
   3) (integer) 1888805
   4) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。

一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。

127.0.0.1:6379> XACK mymq group2 1665060633903-0
(integer) 1

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
(empty array)

小结

好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

  • 消息保序:XADD/XREAD

  • 阻塞读取:XREAD block

  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;

  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;

  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不可丢。

  • 消息可堆积。

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis-Spezialdatentyp-Stream

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。

  • Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。

  • Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:

Die AOF-Persistenz ist so konfiguriert, dass sie jede Sekunde auf die Festplatte schreibt, aber dieser Schreibvorgang auf die Festplatte ist asynchron und es besteht die Möglichkeit eines Datenverlusts, wenn Redis ausfällt.

Die Master-Slave-Replikation ist ebenfalls asynchron Datenverlust bei Master-Slave-Umschaltung (öffnet neues Fenster).

Wie Sie sehen, kann Redis nicht garantieren, dass Nachrichten nicht in der Warteschlangen-Middleware-Verbindung verloren gehen. Professionelle Warteschlangen-Middleware wie RabbitMQ oder Kafka wird zum Bereitstellen eines Clusters verwendet. Wenn ein Produzent eine Nachricht veröffentlicht, schreibt die Warteschlangen-Middleware normalerweise „mehrere Knoten“, d. h. es gibt mehrere Kopien. Auch wenn einer der Knoten ausfällt , kann garantiert werden, dass die Daten im Cluster nicht verloren gehen.

2. Können Redis Stream-Nachrichten gesammelt werden?

Redis-Daten werden im Speicher gespeichert, was bedeutet, dass der Redis-Speicher weiter wächst, wenn er das Maschinenspeicherlimit überschreitet, und es zu einem OOM-Risiko kommt.

Der Stream von Redis bietet also die Funktion, die maximale Länge der Warteschlange anzugeben, um diese Situation zu vermeiden.

Wenn Sie die maximale Länge der Warteschlange angeben, werden alte Nachrichten gelöscht und nur neue Nachrichten mit fester Länge beibehalten, nachdem die Warteschlangenlänge die Obergrenze überschreitet. Unter diesem Gesichtspunkt können Nachrichten immer noch verloren gehen, wenn Stream eine maximale Länge hat, die beim Rückstau von Nachrichten angegeben wird.

Aber die Daten professioneller Nachrichtenwarteschlangen wie Kafka und RabbitMQ werden auf der Festplatte gespeichert. Wenn Nachrichten im Rückstand sind, beanspruchen sie einfach mehr Speicherplatz.

Wenn Sie Redis als Warteschlange verwenden, treten daher zwei Probleme auf:

  • Redis selbst kann Daten verlieren;

  • Angesichts der Nachrichtenknappheit werden die Speicherressourcen knapp. Ob Sie Redis als Nachrichtenwarteschlange verwenden können, hängt von Ihrem Geschäftsszenario ab:

Wenn Ihr Geschäftsszenario einfach genug ist, nicht anfällig für Datenverluste und die Wahrscheinlichkeit eines Nachrichtenrückstands relativ gering ist, verwenden Sie Redis als Nachrichtenwarteschlange. Es ist durchaus möglich, eine Warteschlange zu erstellen.

  • Wenn Ihr Unternehmen über eine große Menge an Nachrichten verfügt, die Wahrscheinlichkeit eines Nachrichtenrückstands relativ hoch ist und ein Datenverlust nicht akzeptiert werden kann, ist es besser, professionelle Middleware für Nachrichtenwarteschlangen zu verwenden.

  • Ergänzung: Warum kann der Redis-Publish/Subscribe-Mechanismus nicht als Nachrichtenwarteschlange verwendet werden?

  • Der Publish-Subscribe-Mechanismus weist die folgenden Mängel auf, die alle mit verlorenen Daten zusammenhängen:

Der Publish/Subscribe-Mechanismus ist nicht basierend auf einem Datentyp implementiert und verfügt daher nicht über die Fähigkeit zur „Datenpersistenz“. hängt mit dem Publish/Subscribe-Mechanismus zusammen. Wenn Redis abstürzt und neu startet, gehen alle Daten im Publish/Subscribe-Mechanismus verloren.

  • Das Veröffentlichungs- und Abonnementmodell ist ein „Senden und vergessen“-Arbeitsmodus. Wenn ein Abonnent offline geht und sich erneut verbindet, kann er frühere historische Nachrichten nicht konsumieren.

  • Wenn der Verbraucher einen bestimmten Rückstand an Nachrichten hat, dh die vom Produzenten gesendeten Nachrichten, und der Verbraucher diese nicht konsumieren kann, wird der Verbraucher zwangsweise getrennt, wenn dieser 32 Millionen überschreitet oder über 8 Millionen bleibt Der Parameter wird in der Konfigurationsdatei festgelegt und der Standardwert ist client-output-buffer-limit pubsub 32 MB 8 MB 60.

  • Daher ist der Publish/Subscribe-Mechanismus nur für Sofortkommunikationsszenarien geeignet. Beispielsweise verwendet das Szenario des Aufbaus eines Sentinel-Clusters (öffnet neues Fenster) den Publish/Subscribe-Mechanismus.

  • Empfohlenes Lernen:
Redis-Video-Tutorial

Das obige ist der detaillierte Inhalt vonRedis-Spezialdatentyp-Stream. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:juejin.im. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen