ホームページ >Java >&#&チュートリアル >SpringBoot が RabbitMQ を統合して遅延キューを実装する方法
rabbitmq メッセージ配信パス
Producer->Switch-> Queue->Consumer
一般に、これは 3 つの段階に分かれています。
1. プロデューサは、メッセージ配信の信頼性を保証します。
2.mq 内部メッセージは失われません。
3. 消費者消費は成功しています。
簡単に言うと、メッセージは 100% メッセージ キューに送信されます。
confirmCallback をオンにすることができます
プロデューサーがメッセージを配信した後、mq はプロデューサーに確認応答を返します。確認応答に基づいて、プロデューサーはメッセージが mq に送信されたかどうかを確認できます。
##confirmCallback を開く構成ファイルを変更します#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法 spring: rabbitmq: publisher-confirm-type: correlatedテスト コード
@Test public void testConfirmCallback() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack="+ack); System.out.println("confirm==== cause="+cause); //根据ACK状态做对应的消息更新操作 TODO } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美"); Thread.sleep(10000); }returnCallback を使用して、メッセージがエクスチェンジャからキューに正常に送信されたことを確認します。構成ファイルを変更します
spring: rabbitmq: #开启returnCallback publisher-returns: true #交换机处理消息到路由失败,则会返回给生产者 template: mandatory: trueテスト コード
@Test void testReturnCallback() { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+ returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback"); }コンシューマがメッセージを消費するとき、ack を通じてメッセージが消費されたことを手動で確認する必要があります。 構成ファイルを変更する
spring: rabbitmq: listener: simple: acknowledge-mode: manualテスト コードを作成する
@RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+ message); System.out.println("body="+body); //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除 channel.basicAck(msgTag,false); // channel.basicNack(msgTag,false,true); }deliveryTags はメッセージ配信シーケンス番号です。メッセージが消費されるか、メッセージが再配信されるたびに、 deliveryTag が増加しますttlデッドレターキュー
#メッセージはキュー内で消費されておらず、キューまたはメッセージ自体の有効期限を超えていますTTL (存続時間)
キューのメッセージの長さが制限に達しました
package com.fandf.test.rabbit; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author fandongfeng * @date 2023/4/15 15:38 */ @Configuration public class RabbitMQConfig { /** * 订单交换机 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 订单队列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 订单路由key */ public static final String ORDER_QUEUE_ROUTING_KEY = "order.#"; /** * 死信交换机 */ public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange"; /** * 死信队列 routingKey */ public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key"; /** * 死信队列 */ public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue"; /** * 创建死信交换机 */ @Bean("orderDeadLetterExchange") public Exchange orderDeadLetterExchange() { return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false); } /** * 创建死信队列 */ @Bean("orderDeadLetterQueue") public Queue orderDeadLetterQueue() { return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build(); } /** * 绑定死信交换机和死信队列 */ @Bean("orderDeadLetterBinding") public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs(); } /** * 创建订单交换机 */ @Bean("orderExchange") public Exchange orderExchange() { return new TopicExchange(ORDER_EXCHANGE, true, false); } /** * 创建订单队列 */ @Bean("orderQueue") public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE); //消息过期后,进入到死信交换机的路由key args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY); //过期时间,单位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 绑定订单交换机和队列 */ @Bean("orderBinding") public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs(); } }
package com.fandf.test.rabbit; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author fandongfeng * @date 2023/4/15 15:42 */ @Component @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public class OrderMQListener { @RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { System.out.println("收到消息:" + DateUtil.now()); long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message); System.out.println("body=" + body); channel.basicAck(msgTag, false); } }テストクラス
@Test void testOrder() throws InterruptedException { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code=" + code); System.out.println("returned=" + returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟"); System.out.println("发送消息:" + DateUtil.now()); Thread.sleep(20000); }
メッセージ送信: 2023-04-16 15:14:34
メッセージ受信: 2023-04-16 15:14:44msgTag=1
message=(本文: 'テスト注文の遅延' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1,exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023、routing-keys=[order]、queue=order_queue}]、x-first-death-reason=expired、x-first-death-queue=order_queue}、contentType=text/plain、contentEncoding = UTF-8、contentLength=0、receiveddeliveryMode=PERSISTENT、priority=0、redelivered=false、receivedExchange=order_dead_letter_exchange、receivedRoutingKey=order_dead_letter_queue_routing_key、deliveryTag=1、consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ、consumerQueue=order_dead_letter_queue] )体= テスト注文の遅延
以上がSpringBoot が RabbitMQ を統合して遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。