chemin de livraison des messages Rabbitmq
Producteur->Commutateur->File d'attente->Consommateur
En général, il est divisé en trois étapes.
1. Le producteur assure la fiabilité de la livraison des messages.
2.mq les messages internes ne sont pas perdus.
3. La consommation des consommateurs est réussie.
Pour faire simple, le message est envoyé à 100% dans la file d'attente des messages.
Nous pouvons activer confirmCallback
Une fois que le producteur a transmis le message, mq donnera au producteur un accusé de réception. En fonction de l'accusé de réception, le producteur peut confirmer si le message est envoyé à mq.
Activer confirmCallback
Modifier le fichier de configuration.
#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法 spring: rabbitmq: publisher-confirm-type: correlated
Le code de test
@Test public void testConfirmCallback() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack="+ack); System.out.println("confirm==== cause="+cause); //根据ACK状态做对应的消息更新操作 TODO } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美"); Thread.sleep(10000); }
garantit que le message est envoyé avec succès de l'échange à la file d'attente via returnCallback. Modifier le fichier de configuration
spring: rabbitmq: #开启returnCallback publisher-returns: true #交换机处理消息到路由失败,则会返回给生产者 template: mandatory: true
Code de test
@Test void testReturnCallback() { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+ returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback"); }
Lorsque les consommateurs consomment des messages, ils doivent confirmer manuellement que les messages ont été consommés via ack.
Modifiez le fichier de configuration
spring: rabbitmq: listener: simple: acknowledge-mode: manual
Écrivez le code de test
@RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+ message); System.out.println("body="+body); //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除 channel.basicAck(msgTag,false); // channel.basicNack(msgTag,false,true); }
deliveryTags est le numéro de séquence de livraison du message Chaque fois qu'un message est consommé ou que le message est relivré, le deliveryTag sera augmenté
n'a pas été La file d'attente dans laquelle les messages consommés en temps opportun sont stockés
Le consommateur rejette le message (basic.reject/ basic. nack) et ne remet pas en file d'attente requeue=false
Le message n'a pas été consommé dans la file d'attente et a dépassé le délai d'expiration de la file d'attente ou du message lui-mêmeTTL (time-to-live)
La longueur du message de la file d'attente a atteint la limite
Le résultat : le message devient une lettre morte Enfin, si la file d'attente est liée à un commutateur de lettre morte, le message sera redirigé vers la file d'attente des lettres mortes par le commutateur de lettres mortes
La file d'attente de lettres mortes est souvent utilisée pour la consommation de file d'attente retardée.
Le producteur ne s'attend pas à ce que ce message soit consommé immédiatement lorsqu'il est livré à mq, mais attend un certain temps avant de le consommer.
package com.fandf.test.rabbit; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author fandongfeng * @date 2023/4/15 15:38 */ @Configuration public class RabbitMQConfig { /** * 订单交换机 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 订单队列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 订单路由key */ public static final String ORDER_QUEUE_ROUTING_KEY = "order.#"; /** * 死信交换机 */ public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange"; /** * 死信队列 routingKey */ public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key"; /** * 死信队列 */ public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue"; /** * 创建死信交换机 */ @Bean("orderDeadLetterExchange") public Exchange orderDeadLetterExchange() { return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false); } /** * 创建死信队列 */ @Bean("orderDeadLetterQueue") public Queue orderDeadLetterQueue() { return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build(); } /** * 绑定死信交换机和死信队列 */ @Bean("orderDeadLetterBinding") public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs(); } /** * 创建订单交换机 */ @Bean("orderExchange") public Exchange orderExchange() { return new TopicExchange(ORDER_EXCHANGE, true, false); } /** * 创建订单队列 */ @Bean("orderQueue") public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE); //消息过期后,进入到死信交换机的路由key args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY); //过期时间,单位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 绑定订单交换机和队列 */ @Bean("orderBinding") public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs(); } }
Consumer
package com.fandf.test.rabbit; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author fandongfeng * @date 2023/4/15 15:42 */ @Component @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public class OrderMQListener { @RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { System.out.println("收到消息:" + DateUtil.now()); long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message); System.out.println("body=" + body); channel.basicAck(msgTag, false); } }
Classe de test
@Test void testOrder() throws InterruptedException { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code=" + code); System.out.println("returned=" + returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟"); System.out.println("发送消息:" + DateUtil.now()); Thread.sleep(20000); }
Sortie du programme
Envoyer un message : 2023-04-16 15:14:34
Recevoir le message 2023-0 4 - 16 15:14:44
msgTag=1
message=(Corps : 'Délai de commande de test' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x - death=[{reason=expired, count=1, change=order_exchange, time=Mon 16 avril 15:14:44 CST 2023, router-keys=[order], queue=order_queue}], x-first-death- raison = expiré, x-first-death-queue = order_queue}, contentType = texte/plain, contentEncoding = UTF-8, contentLength = 0, reçuDeliveryMode = PERSISTENT, priorité = 0, relivré = faux, reçuExchange = order_dead_letter_exchange, reçu RoutingKey = order_dead_letter_queue_routing_key , deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=Test du délai de commande
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!