SpringBoot
Integrasi RabbitMQ
untuk merealisasikan penghantaran mesej.
1. Tambahkan maven
pergantungan
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2 Tambah fail konfigurasi application.yml
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx
3 Konfigurasikan suis, baris gilir dan pengikatan
rreee4. Penerbit menghantar mesej
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; } @Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }
5 Pengguna menerima mesej
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【发送消息】" + message) return "【send message】" + message; }
6 Panggil penamat pengeluaran untuk menghantar mesej hello
, dan output konsol:
[Hantar mesej] hello
[Terima mesej] hello Masa semasa 2022-05-12 10:21:14
Menunjukkan bahawa mesej telah berjaya diterima.
Dari pengeluaran hingga penggunaan mesej, kehilangan mesej mungkin berlaku dalam peringkat berikut:
Hilang di bahagian pengeluaran: Pengeluar tidak boleh menghantar ke RabbitMQ
Hilang di bahagian storan: RabbitMQ
Storan itu sendiri tidak berfungsi
Hilang pengguna: Disebabkan masalah rangkaian, storan tidak boleh dihantar ke hujung pengguna, atau penggunaan ditutup dan penggunaan biasa tidak boleh dihantar
RabbitMQ
Dari hujung pengeluaran, hujung storan dan hujung pengguna Menyediakan sokongan yang baik untuk penghantaran yang boleh dipercayai.
Fasa pengeluaran menggunakan mekanisme pengesahan permintaan untuk memastikan penghantaran mesej yang boleh dipercayai. Selepas menghantar mesej kepada pelayan RabbitMQ, RabbitMQ menerima mesej dan mengembalikan pengesahan permintaan kepada pengirim, menunjukkan bahawa pelayan RabbitMQ telah berjaya menerima mesej tersebut.
Aplikasi konfigurasi.yml
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time);
Konfigurasi
spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制 交换机 -> 队列 publisher-returns: true
Mesej daripada Pengeluar kepada Tukar, dengan mod pengesahan confirmCallback
. Selepas mesej berjaya dihantar, mesej akan memanggil kaedah confirm(CorrelationData correlationData, boolean ack, String cause)
dan menentukan sama ada mesej berjaya dihantar berdasarkan ack
.
Mesej pergi daripada tukar kepada baris gilir, dengan returnCallback
mod undur.
Hantar mesej product message
Output konsol adalah seperti berikut:
【Hantar mesej】mesej produk
【Terima mesej】mesej produk Masa semasa 2022-05-12 11 :27 :56
[correlationData]:null
[ack]true
[cause]null
[hantar berjaya]
Terdapat dua penyelesaian di sini:
Tutup broker serta-merta selepas menghantar mesej yang terakhir menutup rangkaian, tetapi selepas broker ditutup, konsol akan sentiasa melaporkan ralat, dan ralat 500 akan dilaporkan semasa menghantar mesej.
Menghantar suis yang tidak wujud:
@Configuration @Slf4j public class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }
Keputusan:
[correlationData]:null
【ack】false
【sebab】kaedah protokol saluran: #method(reply-code=404, reply-text=NOT_FOUND - tiada pertukaran 'myExchangexxxxx' dalam vhost '/', class-id =60, method-id=40)
[Gagal menghantar]
Apabila penghantaran gagal, anda boleh mencuba semula mesej
Suis adalah betul dan baris gilir yang melakukannya not exist dihantar:
Suis menerima mesej dan mengembalikan pemberitahuan kejayaan:
[correlationData]:CorrelationData [id=7d468b47-b422-4523-b2a2. -06b14aef073c]
[ack 】true
[cause]null
[Berjaya dihantar]
Suis tidak menemui baris gilir dan mengembalikan maklumat kegagalan:
[Penghantaran mesej gagal]
[mesej]mesej produk
[replyCode]312
Dayakan kegigihan baris gilir, buat baris gilir dan suisKonfigurasi lalai ialah kegigihan daripada. Mula-mula, tetapkan baris gilir dan tukar dengan betul, dan ubah suai baris gilir untuk pemantauan penggunaan supaya mesej disimpan dalam baris gilir .
Ubah suai kegigihan baris gilir kepada tidak berterusan:
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
Selepas menghantar mesej, mesej disimpan dalam baris gilir, dan kemudian dimulakan semula RabbitMQ
, mesej itu tidak wujud lagi.
Tetapkan kegigihan baris gilir:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }
Selepas dimulakan semula, mesej dalam baris gilir masih wujud.
Pihak pengguna memulakan mod pengesahan automatik secara lalai ack
Apabila mesej baris gilir diterima oleh pengguna, mesej dalam baris gilir akan dipadamkan secara automatik tanpa mengira sama ada terdapat ialah sebarang mesej daripada pihak pengguna. Oleh itu, untuk memastikan pengguna berjaya menggunakan mesej tersebut, tukar mod automatik kepada mod pengesahan manual:
Ubah suai fail application.yml
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }
Selepas menggunakan dan menerima mesej, pengesahan manual diperlukan:
spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manualrrree
Jika tidak ditambah:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
Hantar dua mesej
Selepas mesej diterima, tiada pengesahan dan dimasukkan semula ke dalam baris gilir:
Mulakan semula projek Selepas itu, mesej dalam baris gilir akan dihantar kepada pengguna, tetapi tanpa pengesahan ack, mereka akan terus dimasukkan semula ke dalam baris gilir.
Selepas menambah channel.basicAck
, mulakan semula projek
Mesej baris gilir akan dipadamkan
basicAck
Parameter terakhir kaedah multiple
Menunjukkan bahawa baris gilir sebelumnya dipadamkan.
multiple
ditetapkan kepada true
dan baris gilir di belakang dikosongkan
Atas ialah kandungan terperinci Cara menggunakan SpringBoot+RabbitMQ untuk mencapai penghantaran mesej yang boleh dipercayai. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!