Maison >Java >javaDidacticiel >Comment SpringBoot intègre RabbitMQ pour implémenter la file d'attente différée

Comment SpringBoot intègre RabbitMQ pour implémenter la file d'attente différée

WBOY
WBOYavant
2023-05-16 20:31:10925parcourir

    Comment s'assurer que les messages ne sont pas perdus

    chemin de livraison des messages Rabbitmq

    Producteur->Commutateur->File d'attente->Consommateur

    En général, il est divisé en trois étapes.

    • 1. Le producteur assure la fiabilité de la livraison des messages.

    • 2.mq les messages internes ne sont pas perdus.

    • 3. La consommation des consommateurs est réussie.

    Qu'est-ce que la fiabilité de la livraison des messages

    Pour faire simple, le message est envoyé à 100% dans la file d'attente des messages.

    Nous pouvons activer confirmCallback

    Une fois que le producteur a transmis le message, mq donnera au producteur un accusé de réception. En fonction de l'accusé de réception, le producteur peut confirmer si le message est envoyé à mq.

    Activer confirmCallback

    Modifier le fichier de configuration.

    #NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法
    spring:
      rabbitmq:
        publisher-confirm-type: correlated

    Le code de test

    @Test  
    public void testConfirmCallback() throws InterruptedException {  
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  
        /**  
        *  
        * @param correlationData 配置  
        * @param ack 交换机是否收到消息,true是成功,false是失败  
        * @param cause 失败的原因  
        */  
        @Override  
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
            System.out.println("confirm=====>");  
            System.out.println("confirm==== ack="+ack);  
            System.out.println("confirm==== cause="+cause);  
            //根据ACK状态做对应的消息更新操作 TODO  
        }  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");  
        Thread.sleep(10000);  
    }

    garantit que le message est envoyé avec succès de l'échange à la file d'attente via returnCallback. Modifier le fichier de configuration

    spring:
      rabbitmq:
        #开启returnCallback
        publisher-returns: true
        #交换机处理消息到路由失败,则会返回给生产者
        template:
          mandatory: true

    Code de test

    @Test  
    void testReturnCallback() {  
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
            int code = returned.getReplyCode();  
            System.out.println("code="+code);  
            System.out.println("returned="+ returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback");  
    }

    Lorsque les consommateurs consomment des messages, ils doivent confirmer manuellement que les messages ont été consommés via ack.

    Modifiez le fichier de configuration

    spring:
      rabbitmq:
        listener:  
          simple:  
            acknowledge-mode: manual

    Écrivez le code de test

    @RabbitHandler  
    public void consumer(String body, Message message, Channel channel) throws IOException {  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msgTag="+msgTag);  
        System.out.println("message="+ message);  
        System.out.println("body="+body);  
    
        //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除  
        channel.basicAck(msgTag,false);  
        // channel.basicNack(msgTag,false,true);  
      
    }

    deliveryTags est le numéro de séquence de livraison du message Chaque fois qu'un message est consommé ou que le message est relivré, le deliveryTag sera augmenté

    ttl file d'attente des lettres mortes

    .

    Qu'est-ce qu'une file d'attente de lettres mortes ?

    n'a pas été La file d'attente dans laquelle les messages consommés en temps opportun sont stockés

    Dans quelles situations un message devient-il une lettre morte

    • Le consommateur rejette le message (basic.reject/ basic. nack) et ne remet pas en file d'attente requeue=false

    • Le message n'a pas été consommé dans la file d'attente et a dépassé le délai d'expiration de la file d'attente ou du message lui-mêmeTTL (time-to-live)

    • La longueur du message de la file d'attente a atteint la limite

    • Le résultat : le message devient une lettre morte Enfin, si la file d'attente est liée à un commutateur de lettre morte, le message sera redirigé vers la file d'attente des lettres mortes par le commutateur de lettres mortes

    La file d'attente de lettres mortes est souvent utilisée pour la consommation de file d'attente retardée.

    File d'attente différée

    Le producteur ne s'attend pas à ce que ce message soit consommé immédiatement lorsqu'il est livré à mq, mais attend un certain temps avant de le consommer.

    springboot intègre lapinmq pour réaliser la fermeture automatique des commandes en cas d'expiration du délai

    package com.fandf.test.rabbit;  
      
    import org.springframework.amqp.core.*;  
    import org.springframework.beans.factory.annotation.Qualifier;  
    import org.springframework.context.annotation.Bean;  
    import org.springframework.context.annotation.Configuration;  
      
    import java.util.HashMap;  
    import java.util.Map;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:38  
    */  
    @Configuration  
    public class RabbitMQConfig {  
      
        /**  
        * 订单交换机  
        */  
        public static final String ORDER_EXCHANGE = "order_exchange";  
        /**  
        * 订单队列  
        */  
        public static final String ORDER_QUEUE = "order_queue";  
        /**  
        * 订单路由key  
        */  
        public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  
    
        /**  
        * 死信交换机  
        */  
        public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";  
        /**  
        * 死信队列 routingKey  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  
    
        /**  
        * 死信队列  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  
    
    
        /**  
        * 创建死信交换机  
        */  
        @Bean("orderDeadLetterExchange")  
        public Exchange orderDeadLetterExchange() {  
            return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建死信队列  
        */  
        @Bean("orderDeadLetterQueue")  
        public Queue orderDeadLetterQueue() {  
            return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();  
        }  
    
        /**  
        * 绑定死信交换机和死信队列  
        */  
        @Bean("orderDeadLetterBinding")  
        public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();  
        }  
    
    
        /**  
        * 创建订单交换机  
        */  
        @Bean("orderExchange")  
        public Exchange orderExchange() {  
            return new TopicExchange(ORDER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建订单队列  
        */  
        @Bean("orderQueue")  
        public Queue orderQueue() {  
            Map<String, Object> args = new HashMap<>(3);  
            //消息过期后,进入到死信交换机  
            args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  
    
            //消息过期后,进入到死信交换机的路由key  
            args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  
    
            //过期时间,单位毫秒  
            args.put("x-message-ttl", 10000);  
    
            return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();  
        }  
    
        /**  
        * 绑定订单交换机和队列  
        */  
        @Bean("orderBinding")  
        public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();  
        }  
      
      
    }

    Consumer

    package com.fandf.test.rabbit;  
      
    import cn.hutool.core.date.DateUtil;  
    import com.rabbitmq.client.Channel;  
    import org.springframework.amqp.core.Message;  
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    import org.springframework.stereotype.Component;  
      
    import java.io.IOException;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:42  
    */  
    @Component  
    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  
    public class OrderMQListener {  
      
      
      
        @RabbitHandler  
        public void consumer(String body, Message message, Channel channel) throws IOException {  
            System.out.println("收到消息:" + DateUtil.now());  
            long msgTag = message.getMessageProperties().getDeliveryTag();  
            System.out.println("msgTag=" + msgTag);  
            System.out.println("message=" + message);  
            System.out.println("body=" + body);  
            channel.basicAck(msgTag, false);  
        }  
      
    }

    Classe de test

    @Test  
    void testOrder() throws InterruptedException {  
    //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
        int code = returned.getReplyCode();  
        System.out.println("code=" + code);  
        System.out.println("returned=" + returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");  
        System.out.println("发送消息:" + DateUtil.now());  
        Thread.sleep(20000);  
    }

    Sortie du programme

    Envoyer un message : 2023-04-16 15:14:34
    Recevoir le message 2023-0 4 - 16 15:14:44
    msgTag=1
    message=(Corps : 'Délai de commande de test' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x - death=[{reason=expired, count=1, change=order_exchange, time=Mon 16 avril 15:14:44 CST 2023, router-keys=[order], queue=order_queue}], x-first-death- raison = expiré, x-first-death-queue = order_queue}, contentType = texte/plain, contentEncoding = UTF-8, contentLength = 0, reçuDeliveryMode = PERSISTENT, priorité = 0, relivré = faux, reçuExchange = order_dead_letter_exchange, reçu RoutingKey = order_dead_letter_queue_routing_key , deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
    body=Test du délai de commande

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer