メッセージ蓄積の生成シナリオ:
プロデューサによって生成されるメッセージの速度は、消費者の消費速度よりも高速です。解決策: コンシューマーの数または速度を増やします。
消費者がいない場合。解決策: デッドレターキュー、メッセージの有効期間を設定します。これは、メッセージに有効期間を設定するのと同じです。指定された時間内に消費がない場合、メッセージは自動的に期限切れになります。期限が切れると、クライアント コールバック監視メソッドが実行され、メッセージがデータベース テーブル レコードに保存されます。補償は後から実現します。
<dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>yml 構成
server: # 服务启动端口配置 port: 8081 servlet: # 应用访问路径 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####连接地址 host: www.kaicostudy.com ####端口号 port: 5672 ####账号 username: kaico ####密码 password: kaico ### 地址 virtual-host: /kaicoStudy ###模拟演示死信队列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###备胎交换机 order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.orderキュー構成クラス
@Configuration public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 声明死信交换机 * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 声明死信队列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机 * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 绑定死信队列到死信交换机 * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 声明订单队列,并且绑定死信队列 * * @return Queue */ @Bean public Queue orderQueue() { // 订单队列绑定我们的死信交换机 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 绑定订单队列到订单交换机 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }デッドレターキューコンシューマー
@Component public class OrderDlxConsumer { /** * 死信队列监听队列回调的方法 * @param msg */ @RabbitListener(queues = "kaico_order_dlx_queue") public void orderDlxConsumer(String msg) { System.out.println("死信队列消费订单消息" + msg); } }通常のキュー コンシューマ
@Component public class OrderConsumer { /** * 监听队列回调的方法 * * @param msg */ @RabbitListener(queues = "kaico_order_queue") public void orderConsumer(String msg) { System.out.println("正常订单消费者消息msg:" + msg); } }バックグラウンド キュー管理ページは次のとおりです。 デプロイ方法: デッド レター キューは同じディレクトリに存在できません。サーバーは通常のキューと同じなので分離する必要があります。サーバーストレージ。 遅延キュー 注文が 30 分間支払われない場合、システムが自動的にタイムアウトしてクローズする実装計画。 タスクのスケジューリングに基づくと、効率は非常に低くなります。 Redis の期限切れキーの実装に基づいて、キーの有効期限が切れるとメソッドがクライアントにコールバックされます。 ユーザーが注文すると、トークン (有効期間) が 30 分間生成され、redis に保存されます; 欠点: 非常に冗長であり、冗長フィールドがテーブルに保存されます。 MQ ベースの遅延キュー (最適なソリューション) RabbitMQ の状況。 原則: 注文を行うと、メッセージを mq に配信し、有効期間を 30 分に設定しますが、メッセージの有効期限が切れると (消費されずに)、クライアント上でメソッドを実行して、それを通知します。期限切れのメッセージが表示されるので、この時点で注文の支払いが完了しているかどうかを確認してください。 実装ロジック: 主にデッドレターキューを使用して実装します。 望ましいコード: 通常のコンシューマはメッセージを消費しないか、通常のコンシューマが存在しません。設定された時間が経過すると、デッド レター キューに入り、その後終了します。コンシューマの実装対応するビジネスロジック。 RabbitMQ メッセージの冪等の問題RabbitMQ メッセージの自動再試行メカニズムコンシューマー ビジネス ロジック コードで例外がスローされると、再試行が自動的に実装されます (デフォルトは無数の再試行です)。 Try) RabbitMQ の再試行回数の制限 (毎回 3 秒間隔で最大 5 回の再試行など) を実装する必要があります。再試行が複数回失敗した場合は、デッド レター キューに保存します。またはデータベース テーブルに保存し、後で労働報酬を記録します。何度も再試行が失敗すると、キューはメッセージを自動的に削除するためです。 メッセージの再試行原則: 再試行プロセス中に、aop を使用して消費リスニング メソッドをインターセプトします。このエラー ログは出力されません。複数回再試行しても失敗した場合、失敗の最大数に達した場合にのみエラー ログが出力されます。 複数回消費に失敗した場合: 1. メッセージを自動的に削除します (メッセージが失われる可能性があります) 解決策: If Ifエンリッチメントが複数回失敗すると、最終的にはデッド レター キューに保存されます。 はテーブル ログを使用して消費失敗エラー ログを記録し、後で手動でメッセージを自動的に補正します。 再試行メカニズムの合理的な選択コンシューマはメッセージを取得した後、サードパーティ インターフェイス (HTTP リクエスト) を呼び出しますが、サードパーティ インターフェイスの呼び出しに失敗しますか?もう一度試す必要がありますか? 回答: ネットワーク例外により呼び出しが失敗する場合があり、数回再試行する必要がある場合があります。 コンシューマーがメッセージを取得した後、コードの問題によりデータ例外がスローされます。再試行する必要がありますか? 回答: 再試行する必要はありません。コードが異常な場合は、コードリリースプロジェクトを再度修正する必要があります。 コンシューマーは手動 ack モードをオンにします最初のステップとして、springboot プロジェクト構成は ack モードをオンにする必要があります
acknowledge-mode: Manual第 2 ステップ、コンシューマー Java コード
int result = orderMapper.addOrder(orderEntity); if (result >= 0) { // 开启消息确认机制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
理由: コンシューマが自動再試行をオンにする可能性があり、再試行プロセスによってコンシューマのビジネス ロジック コードが繰り返し実行される可能性があります。現時点では、メッセージは消費されています。ビジネス エラーによりメッセージが再び消費されるため、
解決策が表示されます: メッセージ グローバル ID を使用して、ビジネスに従って決定します。コンシューマはこれを判断できます。ビジネス ID (グローバル固有 ID) に基づくメッセージ。メッセージは消費されました。
コンシューマ コード ロジック:
##RabbitMQ は分散トランザクションの問題を解決します分散トランザクション: 分散システムでは、複数の異なるトランザクションが存在するため、サービスコールインタフェース内で動作し、各トランザクションは相互に影響を与えません。分散トランザクションには問題があります。 分散トランザクションを解決するための中心的な考え方: データの最終的な整合性。 分散フィールドの名詞: 強い一貫性: 同期速度が非常に速いか、ロック メカニズムがダーティ リードを許可しないかのいずれか; 強い一貫性ソリューション: いずれかのデータベースA がデータをデータ B に非常に迅速に同期するか、データベース A の同期が完了する前にデータベース B がデータを読み取ることができません。 弱い整合性: 読み取りが許可されるデータは元のダーティ データであり、読み取られた結果の不整合が許可されます。 最終的な整合性: 分散システムでは、データがネットワークを通じて同期的に通信されるため、短いデータ遅延は許容されますが、最終的なデータは一貫性がなければなりません。 RabbitMQ に基づいて分散トランザクションを解決するというアイデアRabbitMQ に基づいて分散トランザクションを解決するというアイデア: (最終整合性ソリューションの採用)以上がJava RabbitMQメッセージキューの一般的な問題と解決策の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。