Maison >Java >javaDidacticiel >Analyse des problèmes courants et solutions de la file d'attente de messages Java RabbitMQ
Le scénario de génération d'accumulation de messages :
La vitesse des messages générés par les producteurs est supérieure à la vitesse de consommation par les consommateurs. Solution : Augmenter le nombre ou la vitesse des consommateurs.
Quand il n'y a pas de consommateurs à consommer. Solution : file d'attente des lettres mortes, définition de la période de validité des messages. Cela équivaut à définir une période de validité pour nos messages. S'il n'y a pas de consommation dans le délai spécifié, elle expirera automatiquement. À son expiration, la méthode de surveillance du rappel client sera exécutée pour stocker le message dans l'enregistrement de la table de la base de données. l'indemnisation sera réalisée ultérieurement.
1 Le producteur utilise le mécanisme de confirmation de message pour garantir que le message peut être transmis à MQ à 100 % avec succès.
2. Le serveur MQ doit conserver le message sur le disque dur
3. Le consommateur utilise le mécanisme d'acquittement manuel pour confirmer que la consommation du message est réussie
Que dois-je faire si la capacité du serveur MQ est pleine ?
Utilisez la file d'attente des lettres mortes pour stocker les messages dans la base de données et compenser la consommation ultérieure.
La file d'attente des lettres mortes RabbitMQ est communément appelée file d'attente de la roue de secours ; une fois que le middleware de messages a rejeté le message pour une raison quelconque, il peut également être transféré dans la file d'attente des lettres mortes pour le stockage. commutateurs et clés de routage, etc.
Contexte :
Le message délivré à MQ a expiré
La file d'attente a atteint la longueur maximale (le conteneur de file d'attente est plein) Le producteur a refusé de recevoir le message
Le consommateur n'a pas réussi à consommer plusieurs les messages, seront transférés et stockés dans la file d'attente des lettres mortes
Cas de code :
dépendance maven
<dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
configuration yml
server: # 服务启动端口配置 port: 8081 servlet: # 应用访问路径 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####连接地址 host: www.kaicostudy.com ####端口号 port: 5672 ####账号 username: kaico ####密码 password: kaico ### 地址 virtual-host: /kaicoStudy ###模拟演示死信队列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###备胎交换机 order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.order
classe de configuration de la file d'attente
@Configuration public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 声明死信交换机 * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 声明死信队列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机 * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 绑定死信队列到死信交换机 * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 声明订单队列,并且绑定死信队列 * * @return Queue */ @Bean public Queue orderQueue() { // 订单队列绑定我们的死信交换机 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 绑定订单队列到订单交换机 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
consommateur de file d'attente de lettres incorrectes
@Component public class OrderDlxConsumer { /** * 死信队列监听队列回调的方法 * @param msg */ @RabbitListener(queues = "kaico_order_dlx_queue") public void orderDlxConsumer(String msg) { System.out.println("死信队列消费订单消息" + msg); } }
consommateur de file d'attente ordinaire
@Component public class OrderConsumer { /** * 监听队列回调的方法 * * @param msg */ @RabbitListener(queues = "kaico_order_queue") public void orderConsumer(String msg) { System.out.println("正常订单消费者消息msg:" + msg); } }
La page de gestion de la file d'attente en arrière-plan est la suivante :
Méthode de déploiement : La file d'attente de lettres mortes ne peut pas exister sur le même serveur que la file d'attente normale et doit être stockée sur des serveurs distincts.
Plan de mise en œuvre pour que le système expire et s'arrête automatiquement si la commande n'est pas payée pendant 30 minutes.
Mise en œuvre basée sur la planification des tâches, l'efficacité est très faible.
Implémenté en fonction de la clé redis expirée. Lorsque la clé expire, une méthode sera rappelée au client.
Lorsque l'utilisateur passe une commande, un token (durée de validité) est généré pendant 30 minutes et stocké dans notre redis Inconvénients : Très redondant, un champ redondant sera stocké dans la table ;
Situation RabbitMQ en file d'attente basée sur MQ (meilleure solution).
Principe : Lorsque nous passons une commande, nous envoyons un message à mq et fixons la période de validité à 30 minutes. Mais lorsque le message expire (sans être consommé), nous exécutons une méthode sur notre client pour nous indiquer que le message a été utilisé. expiré. Vérifiez à ce moment si la commande a été payée.
Logique d'implémentation :
Principalement implémenté à l'aide de files d'attente de lettres mortes.
Le code que vous voulez : les consommateurs normaux ne consomment pas de messages, ou il n'y a pas de consommateurs normaux. Ils entrent dans la file d'attente des lettres mortes après le temps défini, puis les consommateurs de lettres mortes implémentent la logique métier correspondante.
Lorsqu'une exception est levée dans le code logique métier du consommateur, la nouvelle tentative est automatiquement implémentée (la valeur par défaut est d'innombrables tentatives)
Il devrait y avoir une limite sur le nombre de RabbitMQ tentatives, par exemple, un maximum de 5 tentatives, avec un intervalle de 3 secondes à chaque fois ; si la nouvelle tentative échoue plusieurs fois, elle sera stockée dans la file d'attente des lettres mortes ou stockée dans une table de base de données pour enregistrer une compensation manuelle ultérieure. Car après le nombre de tentatives infructueuses, la file d'attente supprimera automatiquement le message.
Principe de nouvelle tentative de message : pendant le processus de nouvelle tentative, utilisez aop pour intercepter notre méthode de surveillance de la consommation, et ce journal d'erreurs ne sera pas imprimé. En cas d'échec après plusieurs tentatives, le journal des erreurs ne sera imprimé que lorsque le nombre maximum d'échecs est atteint.
Si la consommation échoue plusieurs fois :
1. Supprimez automatiquement le message (le message peut être perdu)
Solution :
Si l'enrichissement échoue plusieurs fois, il sera éventuellement stocké dans la file d'attente des lettres mortes ; journalisation de table pour enregistrer les journaux d’erreurs d’échec de consommation, puis compenser manuellement les messages.
Choix raisonnable du mécanisme de nouvelle tentative
Réponse : Parfois, l'appel échoue en raison d'une exception réseau et il peut être nécessaire de le réessayer plusieurs fois.
Une fois que le consommateur a obtenu le message, une exception de données est levée en raison de problèmes de code. Doit-il être réessayé ?
Réponse : Il n'est pas nécessaire de réessayer. Si le code est anormal, le projet de version de code doit être à nouveau modifié.
Le consommateur active le mode de réception manuel
La deuxième étape, le code Java du consommateur
int result = orderMapper.addOrder(orderEntity); if (result >= 0) { // 开启消息确认机制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
Comment RabbitMQ résout-il le problème du pouvoir du message ?
Raison : le consommateur peut activer la nouvelle tentative automatique, et le processus de nouvelle tentative peut entraîner l'exécution répétée du code logique métier du consommateur. À ce moment-là, le message a été consommé. Étant donné que l'erreur métier a provoqué la consommation du message, la solution apparaîtra à ce moment-là : utilisez l'identifiant global du message pour déterminer en fonction de l'identifiant métier (global unique). id), le consommateur peut juger que le message a été consommé.
Logique du code consommateur :
RabbitMQ résout les problèmes de transactions distribuées
Résoudre l'idée centrale des transactions distribuées : la cohérence finale des données.
Nom dans le domaine distribué :
Cohérence forte : Soit la vitesse de synchronisation est très rapide, soit le mécanisme de verrouillage n'autorise pas les lectures sales ;
Solution de cohérence forte : Soit la base de données A synchronise les données avec les données B très rapidement, soit la base de données B ; ne peut pas lire les données avant que la synchronisation de la base de données A ne soit terminée.
Faible cohérence : les données dont la lecture est autorisée sont les données originales sales, et les résultats lus peuvent être incohérents.
Cohérence éventuelle : dans notre système distribué, étant donné que les données sont communiquées de manière synchrone via le réseau, de courts délais de données sont autorisés, mais les données finales doivent être cohérentes.
L'idée de résoudre les transactions distribuées basées sur RabbitMQ
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!