メッセージが失われないようにする方法
rabbitmq メッセージ配信パス
Producer->Switch-> Queue->Consumer
一般に、これは 3 つの段階に分かれています。
1. プロデューサは、メッセージ配信の信頼性を保証します。
2.mq 内部メッセージは失われません。
3. 消費者消費は成功しています。
メッセージ配信の信頼性とは
簡単に言うと、メッセージは 100% メッセージ キューに送信されます。
confirmCallback をオンにすることができます
プロデューサーがメッセージを配信した後、mq はプロデューサーに確認応答を返します。確認応答に基づいて、プロデューサーはメッセージが mq に送信されたかどうかを確認できます。
##confirmCallback を開く構成ファイルを変更します#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法 spring: rabbitmq: publisher-confirm-type: correlatedテスト コード
@Test public void testConfirmCallback() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack="+ack); System.out.println("confirm==== cause="+cause); //根据ACK状态做对应的消息更新操作 TODO } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美"); Thread.sleep(10000); }returnCallback を使用して、メッセージがエクスチェンジャからキューに正常に送信されたことを確認します。構成ファイルを変更します
spring: rabbitmq: #开启returnCallback publisher-returns: true #交换机处理消息到路由失败,则会返回给生产者 template: mandatory: trueテスト コード
@Test void testReturnCallback() { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+ returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback"); }コンシューマがメッセージを消費するとき、ack を通じてメッセージが消費されたことを手動で確認する必要があります。 構成ファイルを変更する
spring: rabbitmq: listener: simple: acknowledge-mode: manualテスト コードを作成する
@RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+ message); System.out.println("body="+body); //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除 channel.basicAck(msgTag,false); // channel.basicNack(msgTag,false,true); }deliveryTags はメッセージ配信シーケンス番号です。メッセージが消費されるか、メッセージが再配信されるたびに、 deliveryTag が増加しますttlデッドレターキュー
##デッドレターキューとは
##期限内に消費されないメッセージが保存されるキュー##何メッセージがデッドレターになる状況
消費者がメッセージを拒否
(basic.reject/basic.nack)- し、再度キューに入れない
- requeue= false
#メッセージはキュー内で消費されておらず、キューまたはメッセージ自体の有効期限を超えていますTTL (存続時間)
キューのメッセージの長さが制限に達しました
- 結果: メッセージがデッドレターになった後、キューがバインドされている場合デッド レター スイッチに送信すると、メッセージはデッド レター スイッチによってデッド レター キューに再ルーティングされます。
- デッド レター キューは、遅延キューの消費によく使用されます。 遅延キュー
package com.fandf.test.rabbit;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author fandongfeng
* @date 2023/4/15 15:38
*/
@Configuration
public class RabbitMQConfig {
/**
* 订单交换机
*/
public static final String ORDER_EXCHANGE = "order_exchange";
/**
* 订单队列
*/
public static final String ORDER_QUEUE = "order_queue";
/**
* 订单路由key
*/
public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
/**
* 死信交换机
*/
public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
/**
* 死信队列 routingKey
*/
public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
/**
* 死信队列
*/
public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";
/**
* 创建死信交换机
*/
@Bean("orderDeadLetterExchange")
public Exchange orderDeadLetterExchange() {
return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
}
/**
* 创建死信队列
*/
@Bean("orderDeadLetterQueue")
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
}
/**
* 绑定死信交换机和死信队列
*/
@Bean("orderDeadLetterBinding")
public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
}
/**
* 创建订单交换机
*/
@Bean("orderExchange")
public Exchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE, true, false);
}
/**
* 创建订单队列
*/
@Bean("orderQueue")
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
}
/**
* 绑定订单交换机和队列
*/
@Bean("orderBinding")
public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
}
}
Consumerpackage com.fandf.test.rabbit; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author fandongfeng * @date 2023/4/15 15:42 */ @Component @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public class OrderMQListener { @RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { System.out.println("收到消息:" + DateUtil.now()); long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message); System.out.println("body=" + body); channel.basicAck(msgTag, false); } }テストクラス
@Test
void testOrder() throws InterruptedException {
//为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
rabbitTemplate.setMandatory(true);
//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
rabbitTemplate.setReturnsCallback(returned -> {
int code = returned.getReplyCode();
System.out.println("code=" + code);
System.out.println("returned=" + returned);
});
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");
System.out.println("发送消息:" + DateUtil.now());
Thread.sleep(20000);
}
プログラム出力
メッセージ送信: 2023-04-16 15:14:34
メッセージ受信: 2023-04-16 15:14:44msgTag=1
message=(本文: 'テスト注文の遅延' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1,exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023、routing-keys=[order]、queue=order_queue}]、x-first-death-reason=expired、x-first-death-queue=order_queue}、contentType=text/plain、contentEncoding = UTF-8、contentLength=0、receiveddeliveryMode=PERSISTENT、priority=0、redelivered=false、receivedExchange=order_dead_letter_exchange、receivedRoutingKey=order_dead_letter_queue_routing_key、deliveryTag=1、consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ、consumerQueue=order_dead_letter_queue] )体= テスト注文の遅延
以上がSpringBoot が RabbitMQ を統合して遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

Dreamweaver Mac版
ビジュアル Web 開発ツール

メモ帳++7.3.1
使いやすく無料のコードエディター

mPDF
mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

Safe Exam Browser
Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。

SAP NetWeaver Server Adapter for Eclipse
Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。
