How SpringBoot integrates RabbitMQ to implement delay queue
How to ensure that messages are not lost
rabbitmq message delivery path
Producer->Switch->Queue- >Consumer
In general, it is divided into three stages.
1. The producer ensures the reliability of message delivery.
2.mq internal messages are not lost.
3. Consumer consumption is successful.
What is message delivery reliability
To put it simply, the message is 100% sent to the message queue.
We can turn on confirmCallback
After the producer delivers the message, mq will give the producer an ack. Based on the ack, the producer can confirm whether the message is sent to mq.
Open confirmCallback
Modify the configuration file
#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法 spring: rabbitmq: publisher-confirm-type: correlated
Test code
@Test public void testConfirmCallback() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack="+ack); System.out.println("confirm==== cause="+cause); //根据ACK状态做对应的消息更新操作 TODO } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美"); Thread.sleep(10000); }
Use returnCallback to ensure that the message is successfully sent from the exchanger to the queue. Modify the configuration file
spring: rabbitmq: #开启returnCallback publisher-returns: true #交换机处理消息到路由失败,则会返回给生产者 template: mandatory: true
Test code
@Test void testReturnCallback() { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+ returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback"); }
When consumers consume messages, they need to manually confirm that the messages have been consumed through ack.
Modify the configuration file
spring: rabbitmq: listener: simple: acknowledge-mode: manual
Write test code
@RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+ message); System.out.println("body="+body); //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除 channel.basicAck(msgTag,false); // channel.basicNack(msgTag,false,true); }
deliveryTags is the message delivery sequence number. Every time a message is consumed or the message is re-delivered, the deliveryTag will be increased
ttlDead letter queue
What is a dead letter queue
A queue where messages that are not consumed in time are stored
What are the circumstances under which a message becomes a dead letter
Consumer rejects message(basic.reject/basic.nack) and does not requeue requeue=false
The message has not been consumed in the queue and has exceeded the expiration time of the queue or the message itselfTTL (time-to-live)
of the queue The message length reaches the limit
-
Result: After the message becomes a dead letter, if the queue is bound to a dead letter switch, the message will be rerouted to the dead letter queue by the dead letter switch
Dead letter queues are often used for delayed queue consumption.
Delay queue
The producer does not expect this message to be consumed immediately when it is delivered to mq, but waits for a period of time before consuming it.
springboot integrates rabbitmq to realize automatic shutdown of orders when timeout
package com.fandf.test.rabbit; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author fandongfeng * @date 2023/4/15 15:38 */ @Configuration public class RabbitMQConfig { /** * 订单交换机 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 订单队列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 订单路由key */ public static final String ORDER_QUEUE_ROUTING_KEY = "order.#"; /** * 死信交换机 */ public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange"; /** * 死信队列 routingKey */ public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key"; /** * 死信队列 */ public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue"; /** * 创建死信交换机 */ @Bean("orderDeadLetterExchange") public Exchange orderDeadLetterExchange() { return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false); } /** * 创建死信队列 */ @Bean("orderDeadLetterQueue") public Queue orderDeadLetterQueue() { return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build(); } /** * 绑定死信交换机和死信队列 */ @Bean("orderDeadLetterBinding") public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs(); } /** * 创建订单交换机 */ @Bean("orderExchange") public Exchange orderExchange() { return new TopicExchange(ORDER_EXCHANGE, true, false); } /** * 创建订单队列 */ @Bean("orderQueue") public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE); //消息过期后,进入到死信交换机的路由key args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY); //过期时间,单位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 绑定订单交换机和队列 */ @Bean("orderBinding") public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs(); } }
Consumer
package com.fandf.test.rabbit; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author fandongfeng * @date 2023/4/15 15:42 */ @Component @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public class OrderMQListener { @RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { System.out.println("收到消息:" + DateUtil.now()); long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message); System.out.println("body=" + body); channel.basicAck(msgTag, false); } }
Test class
@Test void testOrder() throws InterruptedException { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code=" + code); System.out.println("returned=" + returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟"); System.out.println("发送消息:" + DateUtil.now()); Thread.sleep(20000); }
Program output
Sent message: 2023-04-16 15:14:34
Received message: 2023-04-16 15:14:44
msgTag=1
message=(Body:'Test order delay' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding= UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue] )
body= Test order delay
The above is the detailed content of How SpringBoot integrates RabbitMQ to implement delay queue. For more information, please follow other related articles on the PHP Chinese website!

Start Spring using IntelliJIDEAUltimate version...

When using MyBatis-Plus or other ORM frameworks for database operations, it is often necessary to construct query conditions based on the attribute name of the entity class. If you manually every time...

Java...

How does the Redis caching solution realize the requirements of product ranking list? During the development process, we often need to deal with the requirements of rankings, such as displaying a...

Conversion of Java Objects and Arrays: In-depth discussion of the risks and correct methods of cast type conversion Many Java beginners will encounter the conversion of an object into an array...

Solutions to convert names to numbers to implement sorting In many application scenarios, users may need to sort in groups, especially in one...

Detailed explanation of the design of SKU and SPU tables on e-commerce platforms This article will discuss the database design issues of SKU and SPU in e-commerce platforms, especially how to deal with user-defined sales...

How to set the SpringBoot project default run configuration list in Idea using IntelliJ...


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

SublimeText3 English version
Recommended: Win version, supports code prompts!

VSCode Windows 64-bit Download
A free and powerful IDE editor launched by Microsoft

SAP NetWeaver Server Adapter for Eclipse
Integrate Eclipse with SAP NetWeaver application server.

SublimeText3 Linux new version
SublimeText3 Linux latest version

Dreamweaver CS6
Visual web development tools