首頁  >  文章  >  Java  >  springboot rabbitmq reply訊息直接回覆模式怎麼實現

springboot rabbitmq reply訊息直接回覆模式怎麼實現

王林
王林轉載
2023-05-11 11:58:141438瀏覽

一、使用場景

MQ的作用包括了解耦、非同步等。

通常生產者只負責生產訊息,而不關心訊息誰去獲取,或消費結果如何;消費者只負責接收指定的訊息進行業務處理而不關心訊息從哪裡來一級回覆業務處理情況。但我們專案中有特殊的業務存在,我們作為消息生產者在生產消息後需要接收消費者的響應結果(說白了就是類似同步調用請求響應的MQ使用),經過研究,MQ的Reply模式(直接回复模式)就是為此種業務模式而產生。

二、Reply實戰

(1)依賴與YML配置

依賴:

##我這裡只列出最核心的rabbitMq所需依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

無其餘特殊配置,因為reply就是rabbitmq的一種互動方式而已

spring:
  rabbitmq:
    host: 10.50.40.116
    port: 5673
    username: admin
    password: admin

(2 )RabbitMq bean設定

package com.leilei.demo;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author lei
 * @create 2022-09-19 21:44
 * @desc mq配置
 **/
@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue bizQueue() {
        return new Queue("bizQueue");
    }
    @Bean
    public Queue replyQueue() {
        return new Queue("replyQueue");
    }
    @Bean
    FanoutExchange bizExchange() {
        return new FanoutExchange("bizExchange");
    }
}

業務類別:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle implements Serializable {
    private Integer id;
    private String name;
}

(3)訊息生產端

訊息生產端需要做的事情:有生產訊息、接受訊息消費回應

(1)生產訊息

  • #1、生產訊息,看業務場景選擇是否產生全域唯一自訂的訊息ID

  • 2、指定訊息消費後回應的佇列(Reply)

  •     /**
         * 生产消息
         *
         * @param
         * @return void
         * @author lei
         * @date 2022-09-19 21:59:18
         */
        public void replySend() {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setReplyTo("replyQueue");
            //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID
            String correlationId = UUID.randomUUID().toString();
            // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理
            messageProperties.setCorrelationId(correlationId);
            Vehicle vehicle = new Vehicle(1, "川A0001");
            Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);
            rabbitTemplate.convertAndSend("bizExchange","",message);
            System.out.println("生产者发送消息,自定义消息ID为:" + correlationId);
        }

(2)接受Reply回應

消費者消費訊息後會將處理結果傳送到一個佇列,我們讀取這裡佇列就可以拿到對應訊息的回應結果進行業務處理了

    /**
     * 接收消息响应
     *
     * @param message
     * @return void
     * @author lei
     * @date 2022-09-19 21:59:27
     */
    @RabbitListener(queues = "replyQueue")
    public void replyResponse(Message message) {
        String s = new String(message.getBody());
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("收到客户端响应消息ID:" + correlationId);
        //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作
        System.out.println("收到客户端响应消息:" + s);
    }

(4)訊息消費端

訊息消費端需要做的事有:接受訊息然後進行業務處理、回應訊息

(1)方法一:sendTo註解方法傳回值
一般來說,我們mq消費者監聽方法不需要回傳值,我們這裡使用sendTo註解,則需要將要回應的訊息定義為回傳值,sendTo註解中指定要回應到哪個佇列

重點:

  • 1、sendTo註解指定要對應的佇列(注意和生產端保持一致)

  • 2、方法定義的回傳值內容就是要回應的訊息,最後會傳送到sendTo註解指定要對應的佇列

  • 3、這個方法的缺點是消費端的主關性很高,因為sendTo指定的目標佇列可以自己瞎寫,導致生產者端無法正確收到訊息回應,但我相信一般專案中也不會這麼幹

  •     /**
         * 方式1   SendTo指定响应队列
         *
         * @param message
         * @return String
         * @author lei
         * @date 2022-09-19 16:17:52
         */
        @RabbitListener(queues ="bizQueue")
        @SendTo("replyQueue")
        public String handleEmailMessage(Message message) {
            try {
                String msg=new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("---consumer接收到消息----{}",msg);
                return "客户端响应消息:"+msg+"处理完成!";
            } catch (Exception e) {
                log.error("处理业务消息失败",e);
            }
            return null;
        }
(2)方法二:讀取生產端的訊息使用範本發送
與普通的消費者方法一樣,只需要RabbitListener註解監聽業務佇列;但還需要根據訊息取得ReplyTo位址,然後自己消費者方法內部手動傳送訊息

  • 1、優點,更強烈的感受到訊息請求回應的互動性,流程看起來更清晰

  • ##2、缺點,程式碼不雅
  •     /**
         * 方式2  message消息获取内部reply rabbitmq手动发送
         *
         * @param message
         * @return String
         * @author lei
         * @date 2022-09-19 16:17:52
         */
        @RabbitListener(queues = "bizQueue")
        public void handleEmailMessage2(Message message) {
            try {
                String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("---consumer接收到消息----{}", msg);
                String replyTo = message.getMessageProperties().getReplyTo();
                System.out.println("接收到的reply:" + replyTo);
                rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> {
                    x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
                    return x;
                });
            } catch (Exception e) {
                log.error("处理业务消息失败",e);
            }
        }
  • (3)方法三:方法傳回值
這種方式與1其實是一致的,但我經過測試,因為生產者訊息指定了ReplyTo的地址,消費者端無需自己再次手動指定,即生產消息到哪裡,是否響應以及響應消息發送到哪裡全由生產端自己空,消費者只需要處理自身業務以及返回結果
   /**
     * 方式三  方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理)
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 23:17:47
     */
    @RabbitListener(queues ="bizQueue")
    public String handleEmailMessage3(Message message) {
        try {
            String msg=new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}",msg);
            return "客户端响应消息:"+msg+"处理完成!";
        }
        catch (Exception e) {
            log.error("处理业务消息失败",e);
        }
        return null;
    }

(4)測試

生產訊息:

springboot rabbitmq reply消息直接回复模式怎么实现

消費訊息與回應:

springboot rabbitmq reply消息直接回复模式怎么实现

springboot rabbitmq reply消息直接回复模式怎么实现

springboot rabbitmq reply消息直接回复模式怎么实现

############## ###############收到的回應:#####################連結:####### ##########

以上是springboot rabbitmq reply訊息直接回覆模式怎麼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除