Die Kolumne
Java Basic Tutorial stellt heute das Wissen über RocketMQ ausführlich vor.
Es ist schon lange her, seit ich gebloggt habe. Obwohl ich unzählige Gründe finden kann, warum ich nicht blogge, ist es letzten Endes immer noch „faul“. Heute habe ich endlich eine Pille gegen faulen Krebs genommen und beschlossen, einen Blog zu schreiben. Was soll ich vorstellen? Nach langem Überlegen möchte ich RocketMQ vorstellen. Immerhin habe ich mehr als 30 Blogs geschrieben, aber ich habe noch keinen guten Blog über MQ geschrieben. Dieser Blog ist relativ einfach und beinhaltet keine Quellcode-Analyse, sondern nur Lese- und Schreibkenntnisse.
Ich denke, aus einer bestimmten Perspektive haben Microservices die starke Entwicklung von MQ vorangetrieben. Ursprünglich hatte ein System N mehrere Module und alle Module waren stark miteinander gekoppelt. Ein Modul ist ein System, und Systeme müssen unbedingt interagieren. Es gibt drei gängige Interaktionsmethoden: eine ist RPC, eine ist HTTP und die andere ist MQ.
Ursprünglich ist ein Geschäft in N Schritte unterteilt, die Schritt für Schritt verarbeitet werden müssen, bevor das Endergebnis an den Benutzer zurückgegeben werden kann. Bei MQ wird nun zuerst der kritischste Teil verarbeitet, und dann wird die Nachricht verarbeitet An MQ gesendet und direkt an OK zurückgegeben, Benutzer. Lassen Sie uns die folgenden Schritte langsam im Hintergrund verarbeiten. Dies ist wirklich ein Artefakt, um die Benutzererfahrung zu verbessern.
Ein plötzlicher Anstieg der Anzahl der Anfragen für eine bestimmte Schnittstelle wird zwangsläufig großen Druck auf den Anwendungsserver und den Datenbankserver ausüben. Mit MQ müssen Sie sich jetzt keine Gedanken darüber machen, wie viele Anfragen eingehen. und sie werden langsam im Hintergrund verarbeitet.
RocketMQ ist in Java geschrieben. Es handelt sich um Alibabas Open-Source-Nachrichten-Middleware, die viele der Vorteile von Kafka nutzt. Kafka ist ebenfalls eine beliebte Nachrichten-Middleware, aber Kafka ist in Scala geschrieben, was für Java-Programmierer nicht förderlich ist, den Quellcode zu lesen, und es ist für Java-Programmierer auch nicht förderlich, benutzerdefinierte Entwicklungen durchzuführen. Freunde, die Kafka kennengelernt haben, wissen, dass es nicht einfach ist, Kafka gut zu nutzen. Relativ gesehen ist RocketMQ viel einfacher, und RocketMQ wurde von Alibaba gesegnet und hat den Test von N Double 11 erlebt. Es ist besser für inländische Internetunternehmen geeignet Daher wird es im Inland von vielen RocketMQ-Unternehmen verwendet.
Bilder von gitee.com/mirrors/roc...
Sie können sehen, dass RocketMQ vier Hauptkomponenten hat:
Der Produzent initiiert in regelmäßigen Abständen eine Abfrage von Topic-Routing-Informationen an NameServer.
Consumer initiiert in regelmäßigen Abständen eine Abfrage von Topic-Routing-Informationen an NameServer.
Tatsächlich wurde Zookeeper in der unteren Version von RocketMQ tatsächlich als Registrierungscenter verwendet, aber später wurde es auf den aktuellen NameServer geändert. Ich denke, der Hauptgrund ist:
ist in ProducerGroup und ConsumerGroup unterteilt. Wir legen mehr Wert darauf, dass ConsumerGroup mehrere Verbraucher enthält.
Im Cluster-Verbrauchsmodus verbrauchen Verbraucher unter einer Verbrauchergruppe gemeinsam ein Thema, und jeder Verbraucher wird N Warteschlangen zugewiesen, aber eine Warteschlange wird nur von einem Verbraucher konsumiert. Verschiedene Verbrauchergruppen können dasselbe Thema konsumieren wird von allen ConsumerGroups verbraucht, die dieses Thema abonniert haben.
Es gibt zwei Verbrauchsmodi: Clustering (Cluster-Verbrauch) und Broadcasting (Broadcast-Verbrauch).
Im Gegensatz zu anderen MQs, die beim Senden von Nachrichten den Cluster-Verbrauch oder den Broadcast-Verbrauch angeben, legt RocketMQ den Cluster-Verbrauch oder den Broadcast-Verbrauch auf der Verbraucherseite fest.
Der Standardwert ist der Clusterverbrauchsmodus. In diesem Modus konsumieren alle Verbraucher in der ConsumerGroup gemeinsam Nachrichten aus einem Thema. Jeder Verbraucher ist für den Konsum von Nachrichten aus N Warteschlangen verantwortlich (N kann auch 1 sein). (even ist 0, nicht der Warteschlange zugewiesen), aber eine Warteschlange wird nur von einem Verbraucher genutzt. Wenn ein Verbraucher stirbt, übernehmen andere Verbraucher der ConsumerGroup die Leitung und konsumieren weiter.
Im Cluster-Verbrauchsmodus wird der Verbrauchsfortschritt auf der Borker-Seite beibehalten und der Speicherpfad ist ${ROCKET_HOME}/store/config/ consumerOffset.json
,如下图所示:使用topicName@consumerGroupName
为Key,消费进度为Value,Value的形式是queueId:offset
Dies bedeutet, dass bei mehreren Verbrauchergruppen der Verbrauchsfortschritt jeder Verbrauchergruppe unterschiedlich ist und separat gespeichert werden muss.
Broadcast-Verbrauchsnachrichten werden an alle Verbraucher in der ConsumerGroup gesendet.
Im Broadcast-Verbrauchsmodus wird der Verbrauchsfortschritt auf der Verbraucherseite beibehalten.
Wir wissen, dass im Cluster-Verbrauchsmodus alle Verbraucher unter der Verbrauchergruppe gemeinsam Nachrichten aus einem Thema konsumieren und jeder Verbraucher für den Konsum von Nachrichten aus N Warteschlangen verantwortlich ist. Wie erfolgt also die Zuordnung? Dies beinhaltet den Algorithmus zum Laden der Verbrauchswarteschlange.
RocketMQ bietet zahlreiche Algorithmen zum Laden von Verbrauchswarteschlangen, von denen die beiden am häufigsten verwendeten Algorithmen AllocateMessageQueueAveragely und AllocateMessageQueueAveragelyByCircle sind. Werfen wir einen Blick auf die Unterschiede zwischen diesen beiden Algorithmen.
Angenommen, ein Thema hat jetzt 16 Warteschlangen, dargestellt durch q0~q15, und 3 Verbraucher, dargestellt durch c0-c2.
Die Ergebnisse der Verwendung von AllocateMessageQueueAveragely zur Nutzung des Warteschlangenladealgorithmus sind wie folgt:
Verwenden AllocatemessagequeueaveragelyByCircle zum Konsum von Warteschlangenlastalgorithmus Die Ergebnisse sind wie folgt:
Alle Verbraucher unter dem Verbraucherkonsum konsumiert Bei einer Topic-Nachricht ist jeder Verbraucher dafür verantwortlich, Nachrichten aus N Warteschlangen zu konsumieren, aber eine Warteschlange kann nicht gleichzeitig von N Verbrauchern konsumiert werden. Was bedeutet das?
Wenn Sie schlau sind, müssen Sie gedacht haben, dass, wenn ein Thema nur 4 Warteschlangen und 5 Verbraucher hat, kein Verbraucher einer Warteschlange zugewiesen wird. Daher bestimmt in RocketMQ direkt die Anzahl der Warteschlangen unter dem Thema Die maximale Anzahl an Verbrauchern bedeutet, dass Sie die Verbrauchsgeschwindigkeit nicht einfach durch das Hinzufügen weiterer Verbraucher erhöhen können.
Obwohl empfohlen wird, die Anzahl der Warteschlangen beim Erstellen eines Themas vollständig zu berücksichtigen, ist die tatsächliche Situation oft unbefriedigend. Selbst wenn sich die Anzahl der Warteschlangen nicht ändert, wird sich die Anzahl der Verbraucher definitiv ändern, z Online und offline legt beispielsweise ein Consumer auf oder ein neuer Consumer kommt hinzu. Die Erweiterung und Kontraktion der Warteschlange sowie die Erweiterung und Kontraktion des Verbrauchers führen zu einer Neuausrichtung, dh die Verbrauchswarteschlange wird an den Verbraucher neu verteilt.
In RocketMQ fragt der Verbraucher regelmäßig die Anzahl der Themenwarteschlangen ab. Wenn sich die Anzahl der Verbraucher ändert, wird eine Neuverteilung ausgelöst.
Die Neuausrichtung wird intern von RocketMQ implementiert und Programmierer müssen sich nicht darum kümmern.
Im Allgemeinen verfügt MQ über zwei Methoden zum Abrufen von Nachrichten:
Ob Pull oder Push, der Verbraucher interagiert immer mit dem Broker. Die Interaktionsmethoden umfassen im Allgemeinen kurze Verbindungen, lange Verbindungen und Abfragen.
Es scheint, dass RocketMQ sowohl Pull als auch Push unterstützt, aber tatsächlich wird Push auch mithilfe von Pull implementiert. Wie interagiert der Verbraucher also mit dem Broker?
Das ist die Genialität des RocketMQ-Designs. Es handelt sich weder um eine kurze Verbindung noch um eine lange Verbindung, noch um eine Abfrage, sondern um eine lange Abfrage.
Der Verbraucher initiiert eine Anforderung zum Abrufen von Nachrichten, die in zwei Situationen unterteilt ist:
RocketMQ unterstützt Transaktionsnachrichten. Nachdem der Produzent die Transaktionsnachricht an den Broker gesendet hat, speichert der Broker die Nachricht im Systemthema: RMQ_SYS_TRANS_HALF_TOPIC
, sodass der Verbraucher diese nicht konsumieren kann Nachricht. RMQ_SYS_TRANS_HALF_TOPIC
,这样Consumer就无法消费到这条消息了。
Broker会有一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC
的消息,向Producer发起回查,回查的状态有三种:提交、回滚、未知。
延迟消息是指息发到Broker后,不能立刻被Consumer消费,需要等待一定的时间才可以被消费到,RocketMQ只支持特定的延迟时间:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RMQ_SYS_TRANS_HALF_TOPIC
-Nachrichten zu verarbeiten und eine Überprüfung an den Produzenten zu initiieren. Es gibt drei Status der Überprüfung: eingereicht, zurückgesetzt und unbekannt.
Wenn der Status der Überprüfung „Übermittlung“ oder „Rollback“ lautet, wird die Übermittlung und das Rollback der Nachricht ausgelöst.
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
. Der Produzent sendet Nachrichten an Borker, um die Nachricht beizubehalten. RocketMQ unterstützt zwei Persistenzstrategien:
Synchronische Replikation, asynchrone Replikation
In der tatsächlichen Entwicklung wird im Allgemeinen die Synchronisierungsmethode verwendet. Wenn Sie die Leistung von RocketMQ verbessern möchten, ändern Sie normalerweise die Parameter auf der Borker-Seite, insbesondere die Disk-Brushing-Strategie und die Replikationsstrategie.
Wenn beim Senden einer Nachricht MessageQueueSelector verwendet wird, ist der Wiederholungsmechanismus des Nachrichtenversands ungültig.
Die Antwort auf das Senden einer Nachricht kann die folgenden vier sein:
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE, }复制代码
Außer der ersten sind die anderen Situationen problematisch. Um sicherzustellen, dass die Nachricht nicht verloren geht, müssen Sie den Producer-Parameter festlegen: RetryAnotherBrokerWhenNotStoreOK
auf true. RetryAnotherBrokerWhenNotStoreOK
为true。
如果消息发送失败了,重试的时候,还是发送给这个Borker,那么大概率发送还是失败的,RockteMQ设计精巧之处在于,重试的时候,会自动避开这个Borker,而选择其他Borker,但是目前为止,异步发送没有那么智能,只会在一个Borker上重试,所以强烈建议选择同步发送方式。
RocketMQ提供了两种故障规避机制。用参数SendLatencyFaultEnable
来控制。
延迟退避机制看起来很好用,但是一般来说Borker端繁忙,导致Borker不可用或者网络不可用只是一瞬间的事情,马上就可以恢复,如果开启了延迟退避机制,本来可用的Borker在一段时间内却被规避了,其他Borker更加繁忙,那可能情况更糟糕。
Consumer有两个参数,可以消费的并行度,即ConsumeThreadMin
、ConsumeThreadMax
,看起来给人的感觉是,如果Consumer端堆积消息比较少,消费线程数为ConsumeThreadMin
;如果Consumer端堆积消息比较多,就自动开启新的线程来消费,直到消费线程数为ConsumeThreadMax
。但是并不是这样,Consumer内部持有一个线程池,选用的是无界队列,也就是ConsumeThreadMax
参数是无效的,所以在实际开发中,ConsumeThreadMin
、ConsumeThreadMax
往往设置成一样。
如果查询不到消费进度的时候,Consumer从哪里开始消费,RocketMQ支持从最新消息、最早消息、指定时间戳这三种方式进行消费。
RocketMQ会为每个ConsumerGroup都设置一个Topic名称为%RETRY%+consumerGroup
的重试队列,用来保存需要给ConsumerGroup重试的消息,但是重试需要一定的延时时间,RocketMQ对于重试消息的处理是先保存至Topic名称为SCHEDULE_TOPIC_XXXX
的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至%RETRY%+consumerGroup
SendLatencyFaultEnable
. ConsumeThreadMin
, ConsumeThreadMax
, es scheint, dass, wenn auf der Verbraucherseite relativ wenige Nachrichten angesammelt werden, die Anzahl der Verbraucherthreads ConsumeThreadMin
ist; Auf der Verbraucherseite werden weniger Nachrichten gesammelt. Wenn viele Nachrichten vorhanden sind, wird automatisch ein neuer Thread für den Konsum geöffnet, bis die Anzahl der konsumierenden Threads ConsumeThreadMax
erreicht. Dies ist jedoch nicht der Fall. Der Verbraucher verfügt intern über einen Thread-Pool und verwendet eine unbegrenzte Warteschlange. Das heißt, der Parameter ConsumeThreadMax
ist ungültig, sodass in der tatsächlichen Entwicklung ConsumeThreadMin
gilt. ConsumeThreadMax wird oft auf den gleichen Wert gesetzt. Wenn der Verbrauchsfortschritt nicht abgefragt werden kann, wo beginnt der Verbraucher mit dem Konsum? RocketMQ unterstützt den Verbrauch ab der neuesten Nachricht, der frühesten Nachricht und dem angegebenen Zeitstempel.VerbrauchernachrichtenwiederholungRocketMQ richtet für jede Verbrauchergruppe, die Nachrichten speichert, eine Wiederholungswarteschlange mit dem Themennamen
%RETRY%+consumerGroup
ein von ConsumerGroup erneut versucht werden, aber für die Wiederholung ist eine bestimmte Verzögerungszeit erforderlich. RocketMQ verarbeitet Wiederholungsnachrichten zunächst in der Verzögerungswarteschlange mit dem ThemennamenSCHEDULE_TOPIC_XXXX
. Die im Hintergrund geplanten Aufgaben folgen der entsprechenden Zeitverzögerung Speichern Sie es dann erneut in der Wiederholungswarteschlange von%RETRY%+consumerGroup
. Was soll ich tun, wenn sich Nachrichten ansammeln und die Verbrauchskapazität nicht ausreicht? Die Verbesserung des Verbrauchsfortschritts ist der beste Weg. 🎜🎜Warteschlange hinzufügen und Verbraucher hinzufügen. 🎜🎜Der ursprüngliche Verbraucher fungiert als Brick-Mover und „verschiebt“ Nachrichten gemäß bestimmten Regeln in mehrere neue Themen und öffnet dann mehrere Verbrauchergruppen, um verschiedene Themen zu konsumieren. 🎜🎜Eröffnen Sie eine neue ConsumerGroup für den Konsum, das heißt, zwei ConsumerGroups konsumieren gleichzeitig ein Thema, aber Sie müssen auf die Beurteilung des Offsets achten. Beispielsweise konsumiert eine ConsumerGroup Nachrichten mit einer ungeraden Zahl und eine ConsumerGroup konsumiert Nachrichten mit einer geraden Nummer. 🎜🎜🎜Ich dachte ursprünglich, dass ich reibungslos schreiben könnte, wenn ich Alphabetisierungstext schreibe, aber ich habe es trotzdem überlegt. Da es sich um Alphabetisierungstext handelt, richtet er sich an Freunde, die nicht viel Kontakt mit RocketMQ hatten, RocketMQ aber schon Einfach, dass es unmöglich ist, es mit nur einer Person zu verwenden? Dieser Blog ermöglicht Freunden, die noch nicht viel Kontakt mit RocketMQ hatten, einen reibungslosen Einstieg. Als ich den Blog schrieb, habe ich darüber nachgedacht, ob das wichtig ist und ob das so sein muss sorgfältig beschrieben werden? Diese Sache kann ignoriert werden, kann sie nicht eingeführt werden usw. Es ist ersichtlich, dass in diesem Artikel grundsätzlich verschiedene Konzepte vorgestellt werden und die API-Ebene fast nicht berücksichtigt wird, da sie geschätzt wird, sobald die API beteiligt ist dass es nicht in zwei Wochen fertig sein wird. 🎜🎜Ende🎜🎜🎜🎜Verwandte kostenlose Lernempfehlungen: 🎜🎜🎜Java-Grundlagen-Tutorial🎜🎜🎜
Das obige ist der detaillierte Inhalt vonEndlich hier...RocketMQ Literacy-Kapitel. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!