MQ の機能には、デカップリング、非同期などが含まれます。
通常、プロデューサーはメッセージを作成することのみを担当し、誰がメッセージを取得するか、または消費結果が何であるかは気にしません。コンシューマーは、ビジネス処理のために指定されたメッセージを受信することのみを担当し、メッセージがどこに送信されるかは気にしません。 from、第 1 レベルの応答ビジネス処理条件。しかし、私たちのプロジェクトには特別なビジネスがあり、メッセージプロデューサーとして、メッセージを生成した後にコンシューマーの応答結果を受け取る必要があります (率直に言うと、これは同期呼び出し要求応答を MQ で使用するのと似ています)。調査によると、MQ の返信モード (直接返信モデル) は、このビジネス モデル用に作成されました。
Dependency:
ここではコアのみをリストします。 RabbitMq に必要な依存関係
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
構成:
応答は Rabbitmq
spring: rabbitmq: host: 10.50.40.116 port: 5673 username: admin password: admin
package com.leilei.demo; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @create 2022-09-19 21:44 * @desc mq配置 **/ @Configuration public class RabbitMqConfig { @Bean public Queue bizQueue() { return new Queue("bizQueue"); } @Bean public Queue replyQueue() { return new Queue("replyQueue"); } @Bean FanoutExchange bizExchange() { return new FanoutExchange("bizExchange"); } }
ビジネスクラス:
@Data @NoArgsConstructor @AllArgsConstructor public class Vehicle implements Serializable { private Integer id; private String name; }
メッセージ生成側で行うべきこと: メッセージを生成する、メッセージ消費応答の受け入れ
(1) メッセージの生成
1. ビジネス シナリオに応じて、メッセージを生成し、グローバルに一意のカスタム メッセージ メッセージ ID
2. メッセージ消費後の応答のキューを指定します (返信)
/** * 生产消息 * * @param * @return void * @author lei * @date 2022-09-19 21:59:18 */ public void replySend() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setReplyTo("replyQueue"); //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID String correlationId = UUID.randomUUID().toString(); // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理 messageProperties.setCorrelationId(correlationId); Vehicle vehicle = new Vehicle(1, "川A0001"); Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties); rabbitTemplate.convertAndSend("bizExchange","",message); System.out.println("生产者发送消息,自定义消息ID为:" + correlationId); }
(2) Accept Reply 応答
コンシューマーがメッセージを消費した後、処理結果がキューに送信されます。ここでキューを読み取ることで、業務処理用の該当メッセージの応答結果を取得できます
/** * 接收消息响应 * * @param message * @return void * @author lei * @date 2022-09-19 21:59:27 */ @RabbitListener(queues = "replyQueue") public void replyResponse(Message message) { String s = new String(message.getBody()); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("收到客户端响应消息ID:" + correlationId); //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作 System.out.println("收到客户端响应消息:" + s); }
メッセージ利用者側が行うべきことは、メッセージを受け入れ、業務処理を実行し、メッセージに応答することです
通常、mq コンシューマ リスニング メソッドは値を返す必要はありませんが、ここで sendTo アノテーションを使用する場合は、戻り値として応答するメッセージを定義する必要があります。 sendTo アノテーションは、どのキューに応答するかを指定します。
重要なポイント:
1. sendTo アノテーションは、対応するキューを指定します (注意してください)。運用終了と一致します)
2. メソッド定義 戻り値の内容は応答するメッセージであり、最終的に sendTo アノテーションで指定された対応するキューに送信されます。
3. この方法の欠点は、sendTo で指定されたターゲット キューが盲目的に書き込まれ、プロデューサーがメッセージ応答の受信に失敗する可能性があるため、コンシューマ側が非常に集中していることです。正しくは正しいですが、これは一般的なプロジェクトでは行われないと思います
/** * 方式1 SendTo指定响应队列 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues ="bizQueue") @SendTo("replyQueue") public String handleEmailMessage(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客户端响应消息:"+msg+"处理完成!"; } catch (Exception e) { log.error("处理业务消息失败",e); } return null; }
/** * 方式2 message消息获取内部reply rabbitmq手动发送 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues = "bizQueue") public void handleEmailMessage2(Message message) { try { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}", msg); String replyTo = message.getMessageProperties().getReplyTo(); System.out.println("接收到的reply:" + replyTo); rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> { x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId()); return x; }); } catch (Exception e) { log.error("处理业务消息失败",e); } }
/** * 方式三 方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理) * * @param message * @return String * @author lei * @date 2022-09-19 23:17:47 */ @RabbitListener(queues ="bizQueue") public String handleEmailMessage3(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客户端响应消息:"+msg+"处理完成!"; } catch (Exception e) { log.error("处理业务消息失败",e); } return null; }
(4) テスト
生産メッセージ:
#消費メッセージと応答:
受信した応答:
##リンク:
以上がspringboot Rabbitmq 応答メッセージの直接応答モードを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。