首頁 >Java >java教程 >怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸

怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸

王林
王林轉載
2023-05-29 22:34:491904瀏覽

    環境設定

    SpringBoot 整合 RabbitMQ 實作訊息的傳送。

    1.新增 maven 依賴

           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2.新增application.yml 設定檔

    spring:
      rabbitmq:
        host: 192.168.3.19
        port: 5672
        username: admin
        password: xxxx

    3.設定交換器、佇列以及綁定

        @Bean
        public DirectExchange myExchange() {
            DirectExchange directExchange = new DirectExchange("myExchange");
            return directExchange;
        }
    
        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue");
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
        }

    4.生產發送訊息

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
            System.out.println("【发送消息】" + message)
            return "【send message】" + message;
        }

    5.消費者接收訊息

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);

    6.呼叫生產端發送訊息 hello,控制台輸出:

    【傳送訊息】hello
    【接收訊息】hello 目前時間2022-05-12 10:21:14

    說明訊息已經成功接收。

    訊息遺失分析

    怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸

    一則訊息的從生產到消費,訊息遺失可能發生在以下階段:

    • #生產端遺失: 生產者無法傳送到 RabbitMQ

    • 儲存端遺失: RabbitMQ 儲存自身掛了

    • 消費端遺失:儲存因網路問題,無法傳送到消費端,或是消費掛了,無法傳送正常消費

    RabbitMQ 從生產端、儲存端、消費端都對可靠度傳輸做很好的支援。

    生產階段

    生產階段透過請求確認機制,來確保訊息的可靠傳輸。當發送訊息到 RabbitMQ 伺服器 之後,RabbitMQ 收到訊息之後,給發送回傳一個請求確認,表示RabbitMQ 伺服器已成功的接收到了訊息。

    設定application.yml

    spring:
      rabbitmq:
        # 消息确认机制 生产者 -> 交换机
        publisher-confirms: true
        # 消息返回机制  交换机 -> 队列
        publisher-returns: true

    設定

    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("【correlationData】:" + correlationData);
                    log.info("【ack】" + ack);
                    log.info("【cause】" + cause);
                    if (ack) {
                        log.info("【发送成功】");
                    } else {
                        log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                    }
                }
            });
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("【消息发送失败】");
                    log.info("【message】" + message);
                    log.info("【replyCode】" + replyCode);
                }
            });
    
            return rabbitTemplate;
        }
    }

    #訊息從 生產者 到 交換器, 有confirmCallback 確認模式。發送訊息成功後訊息會呼叫方法confirm(CorrelationData correlationData, boolean ack, String cause),根據 ack 判斷訊息是否成功傳送。

    訊息從 交換器 到 佇列,有returnCallback 退回模式。

    傳送訊息 product message 控制台輸出如下:

    【傳送訊息】product message
    【接收訊息】product message 目前時間2022-05 -12 11:27:56
    【correlationData】:null
    【ack】true
    【cause】null
    【傳送成功】

    ##生產端類比訊息遺失

    這裡有兩個方案:

    • 發送訊息後立刻關閉broke,後者把網路關閉,但是broker關閉之後控制台一直就會報錯,發送訊息也報500錯誤。

    • 發送不存在的交換器:

    • // myExchange 修改成 myExchangexxxxx
      rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
    結果:

    【correlationData】:null

    # 【ack】false
    【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/xxx', class-id =60, method-id=40)
    【傳送失敗】

    當傳送失敗可以對訊息進行重試

    交換器正確,傳送不存在的佇列:

    交換器接收到訊息,回傳成功通知,控制台輸出:

    【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]

    【ack 】true
    【cause】null
    【傳送成功】

    交換器沒有找到佇列,回傳失敗訊息:

    ##【訊息傳送失敗】
    【message】product message

    【replyCode】312

    #RabbitMQ

    開啟佇列持久化,建立的佇列和交換器

    預設配置是持久化

    的。先把佇列和交換器設定正確,修改消費監聽的佇列,使得訊息存放在佇列裡修改佇列的持久化,修改成非持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",false);
            return queue;
        }

    傳送訊息之後,訊息存放在佇列中,然後重啟 

    RabbitMQ

    ,訊息不存在了。 設定佇列持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",true);
            return queue;
        }

    重啟之後,佇列的訊息還存在。

    消費端

    消費端預設開始 

    ack

     自動確認模式,當佇列訊息被消費者接收,不管有沒有被消費端訊息,都會自動刪除佇列中的消息。所以為了確保消費端能成功消費訊息,將自動模式改成手動確認模式:修改application.yml 檔案

    spring:
      rabbitmq:
        # 手动消息确认
        listener:
          simple:
            acknowledge-mode: manual

    消費接收訊息之後需要手動確認:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);
            System.out.println(message.getMessageProperties().getDeliveryTag());
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    如果不新增:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    傳送兩個訊息

    訊息被接收後,沒有確認,重新放到佇列:

    怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸重啟項目,之後,佇列的訊息會傳送到消費者,但是沒有ack 確認,還是繼續放回佇列。

    加上 channel.basicAck 之後,再重啟項目

    怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸

    佇列訊息就被刪除了

    basicAck  方法最後一個參數 multiple 表示是刪除先前的佇列。

    multiple 設定為 true,把後面的佇列都清理掉了

    怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸

    以上是怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除