search
HomeJavajavaTutorialHow SpringBoot integrates RabbitMQ to implement delay queue

    How to ensure that messages are not lost

    rabbitmq message delivery path

    Producer->Switch->Queue- >Consumer

    In general, it is divided into three stages.

    • 1. The producer ensures the reliability of message delivery.

    • 2.mq internal messages are not lost.

    • 3. Consumer consumption is successful.

    What is message delivery reliability

    To put it simply, the message is 100% sent to the message queue.

    We can turn on confirmCallback

    After the producer delivers the message, mq will give the producer an ack. Based on the ack, the producer can confirm whether the message is sent to mq.

    Open confirmCallback

    Modify the configuration file

    #NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法
    spring:
      rabbitmq:
        publisher-confirm-type: correlated

    Test code

    @Test  
    public void testConfirmCallback() throws InterruptedException {  
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  
        /**  
        *  
        * @param correlationData 配置  
        * @param ack 交换机是否收到消息,true是成功,false是失败  
        * @param cause 失败的原因  
        */  
        @Override  
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
            System.out.println("confirm=====>");  
            System.out.println("confirm==== ack="+ack);  
            System.out.println("confirm==== cause="+cause);  
            //根据ACK状态做对应的消息更新操作 TODO  
        }  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");  
        Thread.sleep(10000);  
    }

    Use returnCallback to ensure that the message is successfully sent from the exchanger to the queue. Modify the configuration file

    spring:
      rabbitmq:
        #开启returnCallback
        publisher-returns: true
        #交换机处理消息到路由失败,则会返回给生产者
        template:
          mandatory: true

    Test code

    @Test  
    void testReturnCallback() {  
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
            int code = returned.getReplyCode();  
            System.out.println("code="+code);  
            System.out.println("returned="+ returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback");  
    }

    When consumers consume messages, they need to manually confirm that the messages have been consumed through ack.

    Modify the configuration file

    spring:
      rabbitmq:
        listener:  
          simple:  
            acknowledge-mode: manual

    Write test code

    @RabbitHandler  
    public void consumer(String body, Message message, Channel channel) throws IOException {  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msgTag="+msgTag);  
        System.out.println("message="+ message);  
        System.out.println("body="+body);  
    
        //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除  
        channel.basicAck(msgTag,false);  
        // channel.basicNack(msgTag,false,true);  
      
    }

    deliveryTags is the message delivery sequence number. Every time a message is consumed or the message is re-delivered, the deliveryTag will be increased

    ttlDead letter queue

    What is a dead letter queue

    A queue where messages that are not consumed in time are stored

    What are the circumstances under which a message becomes a dead letter

    • Consumer rejects message(basic.reject/basic.nack) and does not requeue requeue=false

    • The message has not been consumed in the queue and has exceeded the expiration time of the queue or the message itselfTTL (time-to-live)

    • of the queue The message length reaches the limit

    • Result: After the message becomes a dead letter, if the queue is bound to a dead letter switch, the message will be rerouted to the dead letter queue by the dead letter switch

    Dead letter queues are often used for delayed queue consumption.

    Delay queue

    The producer does not expect this message to be consumed immediately when it is delivered to mq, but waits for a period of time before consuming it.

    springboot integrates rabbitmq to realize automatic shutdown of orders when timeout

    package com.fandf.test.rabbit;  
      
    import org.springframework.amqp.core.*;  
    import org.springframework.beans.factory.annotation.Qualifier;  
    import org.springframework.context.annotation.Bean;  
    import org.springframework.context.annotation.Configuration;  
      
    import java.util.HashMap;  
    import java.util.Map;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:38  
    */  
    @Configuration  
    public class RabbitMQConfig {  
      
        /**  
        * 订单交换机  
        */  
        public static final String ORDER_EXCHANGE = "order_exchange";  
        /**  
        * 订单队列  
        */  
        public static final String ORDER_QUEUE = "order_queue";  
        /**  
        * 订单路由key  
        */  
        public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  
    
        /**  
        * 死信交换机  
        */  
        public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";  
        /**  
        * 死信队列 routingKey  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  
    
        /**  
        * 死信队列  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  
    
    
        /**  
        * 创建死信交换机  
        */  
        @Bean("orderDeadLetterExchange")  
        public Exchange orderDeadLetterExchange() {  
            return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建死信队列  
        */  
        @Bean("orderDeadLetterQueue")  
        public Queue orderDeadLetterQueue() {  
            return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();  
        }  
    
        /**  
        * 绑定死信交换机和死信队列  
        */  
        @Bean("orderDeadLetterBinding")  
        public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();  
        }  
    
    
        /**  
        * 创建订单交换机  
        */  
        @Bean("orderExchange")  
        public Exchange orderExchange() {  
            return new TopicExchange(ORDER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建订单队列  
        */  
        @Bean("orderQueue")  
        public Queue orderQueue() {  
            Map<String, Object> args = new HashMap<>(3);  
            //消息过期后,进入到死信交换机  
            args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  
    
            //消息过期后,进入到死信交换机的路由key  
            args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  
    
            //过期时间,单位毫秒  
            args.put("x-message-ttl", 10000);  
    
            return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();  
        }  
    
        /**  
        * 绑定订单交换机和队列  
        */  
        @Bean("orderBinding")  
        public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();  
        }  
      
      
    }

    Consumer

    package com.fandf.test.rabbit;  
      
    import cn.hutool.core.date.DateUtil;  
    import com.rabbitmq.client.Channel;  
    import org.springframework.amqp.core.Message;  
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    import org.springframework.stereotype.Component;  
      
    import java.io.IOException;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:42  
    */  
    @Component  
    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  
    public class OrderMQListener {  
      
      
      
        @RabbitHandler  
        public void consumer(String body, Message message, Channel channel) throws IOException {  
            System.out.println("收到消息:" + DateUtil.now());  
            long msgTag = message.getMessageProperties().getDeliveryTag();  
            System.out.println("msgTag=" + msgTag);  
            System.out.println("message=" + message);  
            System.out.println("body=" + body);  
            channel.basicAck(msgTag, false);  
        }  
      
    }

    Test class

    @Test  
    void testOrder() throws InterruptedException {  
    //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
        int code = returned.getReplyCode();  
        System.out.println("code=" + code);  
        System.out.println("returned=" + returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");  
        System.out.println("发送消息:" + DateUtil.now());  
        Thread.sleep(20000);  
    }

    Program output

    Sent message: 2023-04-16 15:14:34
    Received message: 2023-04-16 15:14:44
    msgTag=1
    message=(Body:'Test order delay' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding= UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue] )
    body= Test order delay

    The above is the detailed content of How SpringBoot integrates RabbitMQ to implement delay queue. For more information, please follow other related articles on the PHP Chinese website!

    Statement
    This article is reproduced at:亿速云. If there is any infringement, please contact admin@php.cn delete
    How does IntelliJ IDEA identify the port number of a Spring Boot project without outputting a log?How does IntelliJ IDEA identify the port number of a Spring Boot project without outputting a log?Apr 19, 2025 pm 11:45 PM

    Start Spring using IntelliJIDEAUltimate version...

    How to elegantly obtain entity class variable names to build database query conditions?How to elegantly obtain entity class variable names to build database query conditions?Apr 19, 2025 pm 11:42 PM

    When using MyBatis-Plus or other ORM frameworks for database operations, it is often necessary to construct query conditions based on the attribute name of the entity class. If you manually every time...

    How to use the Redis cache solution to efficiently realize the requirements of product ranking list?How to use the Redis cache solution to efficiently realize the requirements of product ranking list?Apr 19, 2025 pm 11:36 PM

    How does the Redis caching solution realize the requirements of product ranking list? During the development process, we often need to deal with the requirements of rankings, such as displaying a...

    How to safely convert Java objects to arrays?How to safely convert Java objects to arrays?Apr 19, 2025 pm 11:33 PM

    Conversion of Java Objects and Arrays: In-depth discussion of the risks and correct methods of cast type conversion Many Java beginners will encounter the conversion of an object into an array...

    How do I convert names to numbers to implement sorting and maintain consistency in groups?How do I convert names to numbers to implement sorting and maintain consistency in groups?Apr 19, 2025 pm 11:30 PM

    Solutions to convert names to numbers to implement sorting In many application scenarios, users may need to sort in groups, especially in one...

    E-commerce platform SKU and SPU database design: How to take into account both user-defined attributes and attributeless products?E-commerce platform SKU and SPU database design: How to take into account both user-defined attributes and attributeless products?Apr 19, 2025 pm 11:27 PM

    Detailed explanation of the design of SKU and SPU tables on e-commerce platforms This article will discuss the database design issues of SKU and SPU in e-commerce platforms, especially how to deal with user-defined sales...

    How to set the default run configuration list of SpringBoot projects in Idea for team members to share?How to set the default run configuration list of SpringBoot projects in Idea for team members to share?Apr 19, 2025 pm 11:24 PM

    How to set the SpringBoot project default run configuration list in Idea using IntelliJ...

    See all articles

    Hot AI Tools

    Undresser.AI Undress

    Undresser.AI Undress

    AI-powered app for creating realistic nude photos

    AI Clothes Remover

    AI Clothes Remover

    Online AI tool for removing clothes from photos.

    Undress AI Tool

    Undress AI Tool

    Undress images for free

    Clothoff.io

    Clothoff.io

    AI clothes remover

    Video Face Swap

    Video Face Swap

    Swap faces in any video effortlessly with our completely free AI face swap tool!

    Hot Tools

    SublimeText3 English version

    SublimeText3 English version

    Recommended: Win version, supports code prompts!

    VSCode Windows 64-bit Download

    VSCode Windows 64-bit Download

    A free and powerful IDE editor launched by Microsoft

    SAP NetWeaver Server Adapter for Eclipse

    SAP NetWeaver Server Adapter for Eclipse

    Integrate Eclipse with SAP NetWeaver application server.

    SublimeText3 Linux new version

    SublimeText3 Linux new version

    SublimeText3 Linux latest version

    Dreamweaver CS6

    Dreamweaver CS6

    Visual web development tools