ホームページ  >  記事  >  Java  >  SpringBoot が RabbitMQ を統合して遅延キューを実装する方法

SpringBoot が RabbitMQ を統合して遅延キューを実装する方法

WBOY
WBOY転載
2023-05-16 20:31:10862ブラウズ

    メッセージが失われないようにする方法

    rabbitmq メッセージ配信パス

    Producer->Switch-> Queue->Consumer

    一般に、これは 3 つの段階に分かれています。

    • 1. プロデューサは、メッセージ配信の信頼性を保証します。

    • 2.mq 内部メッセージは失われません。

    • 3. 消費者消費は成功しています。

    メッセージ配信の信頼性とは

    簡単に言うと、メッセージは 100% メッセージ キューに送信されます。

    confirmCallback をオンにすることができます

    プロデューサーがメッセージを配信した後、mq はプロデューサーに確認応答を返します。確認応答に基づいて、プロデューサーはメッセージが mq に送信されたかどうかを確認できます。

    ##confirmCallback を開く

    構成ファイルを変更します

    #NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法
    spring:
      rabbitmq:
        publisher-confirm-type: correlated

    テスト コード

    @Test  
    public void testConfirmCallback() throws InterruptedException {  
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  
        /**  
        *  
        * @param correlationData 配置  
        * @param ack 交换机是否收到消息,true是成功,false是失败  
        * @param cause 失败的原因  
        */  
        @Override  
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
            System.out.println("confirm=====>");  
            System.out.println("confirm==== ack="+ack);  
            System.out.println("confirm==== cause="+cause);  
            //根据ACK状态做对应的消息更新操作 TODO  
        }  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");  
        Thread.sleep(10000);  
    }

    returnCallback を使用して、メッセージがエクスチェンジャからキューに正常に送信されたことを確認します。構成ファイルを変更します

    spring:
      rabbitmq:
        #开启returnCallback
        publisher-returns: true
        #交换机处理消息到路由失败,则会返回给生产者
        template:
          mandatory: true

    テスト コード

    @Test  
    void testReturnCallback() {  
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
            int code = returned.getReplyCode();  
            System.out.println("code="+code);  
            System.out.println("returned="+ returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback");  
    }

    コンシューマがメッセージを消費するとき、ack を通じてメッセージが消費されたことを手動で確認する必要があります。

    構成ファイルを変更する

    spring:
      rabbitmq:
        listener:  
          simple:  
            acknowledge-mode: manual

    テスト コードを作成する

    @RabbitHandler  
    public void consumer(String body, Message message, Channel channel) throws IOException {  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msgTag="+msgTag);  
        System.out.println("message="+ message);  
        System.out.println("body="+body);  
    
        //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除  
        channel.basicAck(msgTag,false);  
        // channel.basicNack(msgTag,false,true);  
      
    }

    deliveryTags はメッセージ配信シーケンス番号です。メッセージが消費されるか、メッセージが再配信されるたびに、 deliveryTag が増加します

    ttlデッドレターキュー

    ##デッドレターキューとは

    ##期限内に消費されないメッセージが保存されるキュー

    ##何メッセージがデッドレターになる状況

    消費者がメッセージを拒否

    (basic.reject/basic.nack)
      し、再度キューに入れない
    • requeue= false

      #メッセージはキュー内で消費されておらず、キューまたはメッセージ自体の有効期限を超えていますTTL (存続時間)

    • キューのメッセージの長さが制限に達しました

    • 結果: メッセージがデッドレターになった後、キューがバインドされている場合デッド レター スイッチに送信すると、メッセージはデッド レター スイッチによってデッド レター キューに再ルーティングされます。

    • デッド レター キューは、遅延キューの消費によく使用されます。

      遅延キュー
    プロデューサーは、このメッセージが mq に配信されたときにすぐに消費されることを期待しませんが、消費する前に一定期間待機します。

    springboot は、rabbitmq を統合して、タイムアウト時の注文の自動シャットダウンを実現します

    package com.fandf.test.rabbit;  
      
    import org.springframework.amqp.core.*;  
    import org.springframework.beans.factory.annotation.Qualifier;  
    import org.springframework.context.annotation.Bean;  
    import org.springframework.context.annotation.Configuration;  
      
    import java.util.HashMap;  
    import java.util.Map;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:38  
    */  
    @Configuration  
    public class RabbitMQConfig {  
      
        /**  
        * 订单交换机  
        */  
        public static final String ORDER_EXCHANGE = "order_exchange";  
        /**  
        * 订单队列  
        */  
        public static final String ORDER_QUEUE = "order_queue";  
        /**  
        * 订单路由key  
        */  
        public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  
    
        /**  
        * 死信交换机  
        */  
        public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";  
        /**  
        * 死信队列 routingKey  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  
    
        /**  
        * 死信队列  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  
    
    
        /**  
        * 创建死信交换机  
        */  
        @Bean("orderDeadLetterExchange")  
        public Exchange orderDeadLetterExchange() {  
            return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建死信队列  
        */  
        @Bean("orderDeadLetterQueue")  
        public Queue orderDeadLetterQueue() {  
            return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();  
        }  
    
        /**  
        * 绑定死信交换机和死信队列  
        */  
        @Bean("orderDeadLetterBinding")  
        public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();  
        }  
    
    
        /**  
        * 创建订单交换机  
        */  
        @Bean("orderExchange")  
        public Exchange orderExchange() {  
            return new TopicExchange(ORDER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建订单队列  
        */  
        @Bean("orderQueue")  
        public Queue orderQueue() {  
            Map<String, Object> args = new HashMap<>(3);  
            //消息过期后,进入到死信交换机  
            args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  
    
            //消息过期后,进入到死信交换机的路由key  
            args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  
    
            //过期时间,单位毫秒  
            args.put("x-message-ttl", 10000);  
    
            return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();  
        }  
    
        /**  
        * 绑定订单交换机和队列  
        */  
        @Bean("orderBinding")  
        public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();  
        }  
      
      
    }

    Consumer

    package com.fandf.test.rabbit;  
      
    import cn.hutool.core.date.DateUtil;  
    import com.rabbitmq.client.Channel;  
    import org.springframework.amqp.core.Message;  
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    import org.springframework.stereotype.Component;  
      
    import java.io.IOException;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:42  
    */  
    @Component  
    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  
    public class OrderMQListener {  
      
      
      
        @RabbitHandler  
        public void consumer(String body, Message message, Channel channel) throws IOException {  
            System.out.println("收到消息:" + DateUtil.now());  
            long msgTag = message.getMessageProperties().getDeliveryTag();  
            System.out.println("msgTag=" + msgTag);  
            System.out.println("message=" + message);  
            System.out.println("body=" + body);  
            channel.basicAck(msgTag, false);  
        }  
      
    }

    テストクラス

    @Test  
    void testOrder() throws InterruptedException {  
    //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
        int code = returned.getReplyCode();  
        System.out.println("code=" + code);  
        System.out.println("returned=" + returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");  
        System.out.println("发送消息:" + DateUtil.now());  
        Thread.sleep(20000);  
    }

    プログラム出力

    メッセージ送信: 2023-04-16 15:14:34

    メッセージ受信: 2023-04-16 15:14:44

    msgTag=1

    message=(本文: 'テスト注文の遅延' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1,exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023、routing-keys=[order]、queue=order_queue}]、x-first-death-reason=expired、x-first-death-queue=order_queue}、contentType=text/plain、contentEncoding = UTF-8、contentLength=0、receiveddeliveryMode=PERSISTENT、priority=0、redelivered=false、receivedExchange=order_dead_letter_exchange、receivedRoutingKey=order_dead_letter_queue_routing_key、deliveryTag=1、consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ、consumerQueue=order_dead_letter_queue] )
    体= テスト注文の遅延


    以上がSpringBoot が RabbitMQ を統合して遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。