Fungsi MQ termasuk penyahgandingan, tak segerak, dsb.
Biasanya pengeluar hanya bertanggungjawab untuk menghasilkan mesej dan tidak peduli siapa yang menerima mesej atau hasil penggunaan pengguna hanya bertanggungjawab untuk menerima mesej tertentu untuk pemprosesan perniagaan dan tidak peduli dari mana mesej itu datang daripada dan membalas pemprosesan perniagaan pada Syarat tahap pertama. Tetapi terdapat perniagaan istimewa dalam projek kami Sebagai pengeluar mesej, kami perlu menerima hasil tindak balas pengguna selepas menghasilkan mesej (secara terang-terangan, ia serupa dengan penggunaan respons permintaan panggilan segerak Selepas). penyelidikan, mod Balasan MQ (model balasan langsung) telah dicipta untuk model perniagaan ini.
Pergantungan:
Saya hanya menyenaraikan yang teras di sini RabbitMq memerlukan kebergantungan
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
konfigurasi:
Tiada konfigurasi khas lain, kerana balasan hanyalah kaedah interaksi rabbitmq
spring: rabbitmq: host: 10.50.40.116 port: 5673 username: admin password: admin
package com.leilei.demo; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @create 2022-09-19 21:44 * @desc mq配置 **/ @Configuration public class RabbitMqConfig { @Bean public Queue bizQueue() { return new Queue("bizQueue"); } @Bean public Queue replyQueue() { return new Queue("replyQueue"); } @Bean FanoutExchange bizExchange() { return new FanoutExchange("bizExchange"); } }
Kelas perniagaan:
@Data @NoArgsConstructor @AllArgsConstructor public class Vehicle implements Serializable { private Integer id; private String name; }
Perkara yang perlu dilakukan oleh penamat pengeluaran mesej: perlu menghasilkan mesej , Terima respons penggunaan mesej
(1) Hasilkan mesej
1. Hasilkan mesej, bergantung pada pemilihan senario perniagaan sama ada untuk menjana secara global ID Mesej tersuai unik
2 Nyatakan baris gilir untuk respons selepas penggunaan mesej (Balas)
/** * 生产消息 * * @param * @return void * @author lei * @date 2022-09-19 21:59:18 */ public void replySend() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setReplyTo("replyQueue"); //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID String correlationId = UUID.randomUUID().toString(); // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理 messageProperties.setCorrelationId(correlationId); Vehicle vehicle = new Vehicle(1, "川A0001"); Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties); rabbitTemplate.convertAndSend("bizExchange","",message); System.out.println("生产者发送消息,自定义消息ID为:" + correlationId); }
(2) Terima. Balasan balas
Selepas menggunakan mesej, pengguna akan menghantar hasil pemprosesan ke baris gilir Dengan membaca baris gilir ini, kita boleh mendapatkan hasil respons mesej yang sepadan untuk pemprosesan perniagaan
/** * 接收消息响应 * * @param message * @return void * @author lei * @date 2022-09-19 21:59:27 */ @RabbitListener(queues = "replyQueue") public void replyResponse(Message message) { String s = new String(message.getBody()); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("收到客户端响应消息ID:" + correlationId); //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作 System.out.println("收到客户端响应消息:" + s); }<.>(4) Pengguna mesej Pengguna mesej perlu melakukan perkara berikut: terima mesej dan kemudian laksanakan pemprosesan perniagaan dan balas mesej (1) Kaedah 1: hantar kepada anotasi + nilai pulangan kaedah Secara amnya, kaedah mendengar pengguna mq kami tidak memerlukan nilai pulangan Jika kami menggunakan anotasi sendTo di sini, kami perlu mentakrifkan mesej untuk dibalas sebagai nilai pulangan anotasi menentukan baris gilir mana yang hendak dibalas
Isi penting:
<.>
/** * 方式1 SendTo指定响应队列 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues ="bizQueue") @SendTo("replyQueue") public String handleEmailMessage(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客户端响应消息:"+msg+"处理完成!"; } catch (Exception e) { log.error("处理业务消息失败",e); } return null; }
/** * 方式2 message消息获取内部reply rabbitmq手动发送 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues = "bizQueue") public void handleEmailMessage2(Message message) { try { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}", msg); String replyTo = message.getMessageProperties().getReplyTo(); System.out.println("接收到的reply:" + replyTo); rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> { x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId()); return x; }); } catch (Exception e) { log.error("处理业务消息失败",e); } }
Mesej penggunaan dan respons:
Maklum balas diterima:
Pautan:
Atas ialah kandungan terperinci Bagaimana untuk melaksanakan springboot rabbitmq reply message mod direct reply. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!