La colonne
Tutoriel de base Java présente les connaissances sur RocketMQ en détail aujourd'hui.
Cela fait longtemps que je n'ai pas blogué. Même si je peux trouver d'innombrables raisons pour ne pas bloguer, en dernière analyse, je suis toujours « paresseux ». Aujourd'hui, j'ai finalement pris une pilule pour guérir un cancer paresseux et j'ai décidé d'écrire un blog. Que dois-je présenter ? Après mûre réflexion, j'aimerais présenter RocketMQ. Après tout, j'ai écrit plus de 30 blogs, mais je n'ai pas encore écrit un bon blog sur MQ. Ce blog est relativement basique et n'implique pas d'analyse du code source, juste de l'alphabétisation.
Je pense que d'un certain point de vue, les microservices ont favorisé le développement vigoureux de MQ À l'origine, un système avait N plusieurs modules, tous les modules sont fortement couplés entre eux. Désormais, avec les microservices, un module est un système et les systèmes doivent absolument interagir. Il existe trois méthodes d'interaction courantes, l'une est RPC, l'autre HTTP et l'autre MQ.
À l'origine, une entreprise était divisée en N étapes, qui devaient être traitées étape par étape avant que le résultat final puisse être rendu à l'utilisateur. Désormais avec MQ, la partie la plus critique. est traité en premier, puis envoie un message à MQ et renvoie directement OK à l'utilisateur. Quant aux étapes suivantes, traitons-les lentement en arrière-plan. C'est vraiment un artefact pour améliorer l'expérience utilisateur.
Une augmentation soudaine du nombre de requêtes pour une certaine interface mettra inévitablement beaucoup de pression sur le serveur d'applications et le serveur de base de données. Désormais, avec MQ, vous n'en avez plus. de vous soucier d'un certain nombre de demandes. Elles seront traitées lentement en arrière-plan.
RocketMQ est écrit en Java. Il s'agit du middleware de messages open source d'Alibaba et absorbe de nombreux avantages de Kafka. Kafka est également un middleware de messages populaire, mais Kafka est écrit en Scala, ce qui ne permet pas aux programmeurs Java de lire le code source, et il n'est pas non plus propice aux programmeurs Java pour effectuer un développement personnalisé. Les amis qui ont été exposés à Kafka savent qu'il n'est pas facile d'utiliser Kafka. Relativement parlant, RocketMQ est beaucoup plus simple, et RocketMQ est béni par Alibaba et a expérimenté le test de N Double 11. Il est plus adapté aux sociétés Internet nationales. , il est donc utilisé au niveau national. Il existe de nombreuses sociétés RocketMQ.
Photos de gitee.com/mirrors/roc…
Vous pouvez voir que RocketMQ a quatre composants principaux :
Le producteur lance une requête d'informations de routage de sujet vers NameServer à intervalles réguliers.
Le consommateur lance une requête d'informations de routage de sujet vers NameServer à intervalles réguliers.
En fait, dans la version inférieure de RocketMQ, Zookeeper était effectivement utilisé comme centre d'enregistrement, mais il a ensuite été remplacé par l'actuel NameServer I. je suppose que la raison principale est :
est divisé en ProducerGroup et ConsumerGroup. Nous accordons plus d'attention à ConsumerGroup.ConsumerGroup contient plusieurs consommateurs.
En mode de consommation de cluster, les consommateurs d'un ConsumerGroup consomment un sujet ensemble, et chaque consommateur sera affecté à N files d'attente, mais une file d'attente ne sera consommée que par un seul consommateur. Différents groupes de consommateurs peuvent consommer le même sujet A et. un message sera consommé par tous les ConsumerGroups abonnés à ce sujet.
Il existe deux modes de consommation : Clustering (consommation du cluster) et Broadcasting (consommation de diffusion).
Contrairement à d'autres MQ, qui spécifient la consommation de cluster ou la consommation de diffusion lors de l'envoi de messages, RocketMQ définit s'il s'agit d'une consommation de cluster ou d'une consommation de diffusion du côté consommateur.
La valeur par défaut est le mode de consommation du cluster Dans ce mode, tous les consommateurs du ConsumerGroup consomment conjointement les messages d'un sujet, et chaque consommateur est responsable de la consommation des messages de N. (N peut également être 1, voire 0, ce qui n'est pas attribué à la file d'attente), mais une file d'attente ne sera consommée que par un seul consommateur. Si un consommateur décède, d'autres consommateurs du ConsumerGroup prendront le relais et continueront à consommer.
En mode de consommation du cluster, la progression de la consommation est maintenue côté Borker, et le chemin de stockage est ${ROCKET_HOME}/store/config/ consumerOffset.json
, comme le montre la figure ci-dessous : Utilisez topicName@consumerGroupName
comme clé, la progression de la consommation comme Value, et la forme de Value est queueId:offset
, indiquant que s'il existe plusieurs ConsumerGroups, la progression de la consommation de chaque ConsumerGroup est différente et doit être stockée séparément.
Les messages de consommation de diffusion seront envoyés à tous les consommateurs du ConsumerGroup.
En mode consommation diffusion, la progression de la consommation est maintenue côté Consommateur.
Nous savons qu'en mode de consommation de cluster, tous les consommateurs du ConsumerGroup consomment conjointement un message de sujet, chaque consommateur est responsable de la consommation des messages de N files d'attente, alors comment est-il alloué ? Cela implique l'algorithme de chargement de la file d'attente de consommation.
RocketMQ fournit de nombreux algorithmes de chargement de file d'attente grand public, parmi lesquels les deux algorithmes les plus couramment utilisés sont AllocateMessageQueueAveragely et AllocateMessageQueueAveragelyByCircle. Jetons un coup d'œil aux différences entre ces deux algorithmes.
Supposons qu'un sujet ait désormais 16 files d'attente, représentées par q0~q15, et 3 consommateurs, représentés par c0-c2.
Les résultats de l'utilisation d'AllocateMessageQueueAveragely pour consommer l'algorithme de chargement de file d'attente sont les suivants :
Le résultat de l'utilisation d'AllocateMessageQueueAveragelyByCircle pour consommer l'algorithme de chargement de file d'attente est le suivant :
Tous les consommateurs sous ConsumerGroup consomment ensemble les messages d'un sujet , et chaque consommateur est responsable de la consommation de N messages de file d'attente, mais une file d'attente ne peut pas être consommée par N consommateurs en même temps. Qu'est-ce que cela signifie ?
Si vous êtes intelligent, vous avez dû penser que si un sujet n'a que 4 files d'attente et 5 consommateurs, alors un consommateur ne sera affecté à aucune file d'attente, donc dans RocketMQ, le nombre de files d'attente sous le sujet est directement Il détermine le nombre maximum de consommateurs, ce qui signifie que vous ne pouvez pas augmenter la vitesse de consommation simplement en ajoutant plus de consommateurs.
Bien qu'il soit recommandé de prendre pleinement en compte le nombre de files d'attente lors de la création d'un sujet, la situation réelle est souvent insatisfaisante. Même si le nombre de files d'attente ne change pas, le nombre de consommateurs le fera. restent les mêmes. Des changements se produiront, par exemple lorsqu'un consommateur se connecte ou se déconnecte, qu'un consommateur raccroche ou qu'un nouveau consommateur est ajouté. L'expansion et la contraction de la file d'attente ainsi que l'expansion et la contraction du Consommateur entraîneront un rééquilibrage, c'est-à-dire que la file d'attente de consommation sera redistribuée au Consommateur.
Dans RocketMQ, le consommateur interrogera périodiquement le nombre de files d'attente de sujets. Si le nombre de consommateurs change, un rééquilibrage sera déclenché.
Le rééquilibrage est implémenté en interne par RocketMQ et les programmeurs n'ont pas besoin de s'en soucier.
De manière générale, MQ dispose de deux méthodes pour obtenir des messages :
Qu'il s'agisse de Pull ou de Push, le Consommateur interagira toujours avec le Courtier. Il existe généralement trois modes d'interaction : la connexion courte, la connexion longue et le sondage.
Il semble que RocketMQ prenne en charge à la fois Pull et Push, mais en fait Push est également implémenté à l'aide de Pull. Alors, comment le consommateur interagit-il avec le courtier ?
C'est l'ingéniosité de la conception de RocketMQ. Ce n'est ni une connexion courte, ni une connexion longue, ni une interrogation, mais une interrogation longue.
Le consommateur lance une demande d'extraction de messages, qui est divisée en deux situations :
RocketMQ prend en charge les messages de transaction. Une fois que le producteur a envoyé le message de transaction au courtier, le courtier stockera le message dans le sujet du système : RMQ_SYS_TRANS_HALF_TOPIC
, de sorte que le consommateur ne peut pas la consommer.
Le courtier aura une tâche planifiée pour consommer les RMQ_SYS_TRANS_HALF_TOPIC
messages et lancer une révision vers le producteur. Il existe trois statuts de révision : soumis, annulé et inconnu.
Un message retardé signifie qu'une fois les informations envoyées au courtier, elles ne peuvent pas être consommées immédiatement par le consommateur. Il doit attendre un certain temps avant. il peut être consommé. RocketMQ ne prend en charge que des délais spécifiques : 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
.
RocketMQ prend en charge deux formes de consommation : la consommation simultanée et la consommation séquentielle. S'il est consommé de manière séquentielle, vous devez vous assurer que les messages triés se trouvent dans la même file d'attente. Comment choisir une file d'attente à envoyer ? RocketMQ dispose de plusieurs surcharges pour l'envoi de messages, dont l'une prend en charge la sélection de file d'attente.
Le producteur envoie le message à Borker, qui doit conserver le message. RocketMQ prend en charge deux stratégies de persistance :
Pour la fiabilité et la disponibilité de MQ, dans l'environnement de production, les nœuds suiveurs sont généralement déployés, et les nœuds suiveurs copieront les données du maître. prend en charge les deux. Maintenir la stratégie de réplication :
Le fait que « l'écriture » soit écrite sur PageCache ou sur le disque dur dépend de la configuration de Follower Broker.
RocketMQ propose trois méthodes d'envoi de messages :
Dans le développement réel, la méthode synchrone est généralement utilisée. Si vous souhaitez améliorer les performances de RocketMQ, vous modifiez généralement les paramètres côté Borker, notamment la stratégie de brushing de disque et la stratégie de réplication.
Lors de l'envoi d'un message, si MessageQueueSelector est utilisé, le mécanisme de nouvelle tentative d'envoi du message sera invalide.
La réponse à l'envoi d'un message peut être les quatre suivantes :
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE, }复制代码
Sauf la première, les autres situations sont problématiques. Afin de garantir que le message ne soit pas perdu, le Producteur. le paramètre doit être défini : RetryAnotherBrokerWhenNotStoreOK
est vrai.
Si le message ne parvient pas à être envoyé et est toujours envoyé au Borker lors d'une nouvelle tentative, l'envoi échouera toujours avec une forte probabilité. La conception ingénieuse de RockteMQ est la suivante. le temps de nouvelle tentative, il évitera automatiquement ce Borker et choisira d'autres Borkers. Cependant, jusqu'à présent, l'envoi asynchrone n'est pas si intelligent et ne réessayera que sur un seul Borker, il est donc fortement recommandé de choisir la méthode d'envoi synchrone.
RocketMQ fournit deux mécanismes d'évitement des pannes. Utilisez le paramètre SendLatencyFaultEnable
pour contrôler.
Le mécanisme d'interruption différée semble être très utile, mais d'une manière générale, l'extrémité du Borker est occupée, ce qui entraîne l'indisponibilité du Borker ou l'indisponibilité du réseau. Cela ne prend qu'un instant et peut l'être. restauré immédiatement. Si l'interruption différée est activée, le Borker qui était initialement disponible a été contourné pendant un certain temps et les autres Borkers étaient plus occupés, ce qui peut aggraver la situation.
Le consommateur a deux paramètres, le degré de parallélisme qui peut être consommé, à savoir ConsumeThreadMin
, ConsumeThreadMax
, ce qui semble pour donner Le sentiment humain est que s'il y a relativement peu de messages accumulés du côté consommateur, le nombre de fils de discussion consommateur est ConsumeThreadMin
; s'il y a plus de messages accumulés du côté consommateur, de nouveaux fils seront automatiquement ouverts pour la consommation jusqu'à ce que le le nombre de threads consommateurs est ConsumeThreadMax
. Mais ce n'est pas le cas. Le consommateur détient un pool de threads en interne et utilise une file d'attente illimitée. Autrement dit, le paramètre ConsumeThreadMax
n'est pas valide, donc dans le développement réel, ConsumeThreadMin
et ConsumeThreadMax
sont souvent définis sur la même valeur. .
Si la progression de la consommation ne peut pas être interrogée, où le consommateur commence-t-il à consommer ? RocketMQ prend en charge la consommation de trois manières : le dernier message, le premier message et l'horodatage spécifié.
RocketMQ configurera une file d'attente de nouvelles tentatives avec un nom de sujet %RETRY%+consumerGroup
pour chaque ConsumerGroup afin d'enregistrer les messages qui doivent être réessayés pour le ConsumerGroup, mais réessayez Un certain délai est requis. RocketMQ traite les messages de nouvelle tentative en les enregistrant d'abord dans la file d'attente de retard avec le nom du sujet SCHEDULE_TOPIC_XXXX
. Les tâches planifiées en arrière-plan sont retardées en fonction de l'heure correspondante, puis réenregistrées dans la file d'attente de nouvelle tentative de %RETRY%+consumerGroup
.
Je pensais que je serais capable d'écrire facilement si j'écrivais un texte d'alphabétisation, mais j'y ai quand même trop réfléchi. Parce que c'est un texte d'alphabétisation, il s'adresse à des amis qui n'ont pas eu beaucoup de contacts avec. RocketMQ, mais RocketMQ est-il si bon ? C'est simple. Il est impossible d'utiliser un blog pour permettre à des amis qui n'ont pas eu beaucoup de contacts avec RocketMQ de démarrer en douceur. Alors, en écrivant le blog, j'ai pensé : est-ce important et doit-il être décrit avec soin ?Cette chose peut être ignorée ou non. Introduction, etc. Vous pouvez voir que cet article introduit essentiellement divers concepts et n'implique presque pas le niveau de l'API, car une fois l'API impliquée, on estime que. il ne sera pas terminé dans deux semaines.
Recommandations d'apprentissage gratuites associées : Tutoriel de base Java
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!