ホームページ  >  記事  >  Java  >  Java RabbitMQメッセージキューの一般的な問題と解決策の分析

Java RabbitMQメッセージキューの一般的な問題と解決策の分析

王林
王林転載
2023-04-23 09:49:062186ブラウズ

メッセージの蓄積

メッセージ蓄積の生成シナリオ:

  • プロデューサによって生成されるメッセージの速度は、消費者の消費速度よりも高速です。解決策: コンシューマーの数または速度を増やします。

  • 消費者がいない場合。解決策: デッドレターキュー、メッセージの有効期間を設定します。これは、メッセージに有効期間を設定するのと同じです。指定された時間内に消費がない場合、メッセージは自動的に期限切れになります。期限が切れると、クライアント コールバック監視メソッドが実行され、メッセージがデータベース テーブル レコードに保存されます。補償は後から実現します。

メッセージが失われないようにする

1. プロデューサは、メッセージ確認メカニズムを使用して、メッセージが MQ に正常に配信されることを確認します。

2. MQ サーバーはメッセージをハードディスクに保存する必要があります

3. コンシューマは手動 ACK メカニズムを使用して、メッセージの消費が成功したことを確認します

何をすべきかMQ サーバーの容量がいっぱいの場合はどうしますか?

デッドレター キューを使用してメッセージをデータベースに保存し、後で消費された分を補います。

デッドレター キュー

RabbitMQ デッドレター キューは一般にスペア タイヤ キューとして知られており、メッセージ ミドルウェアが何らかの理由でメッセージを拒否した後、保管のためにメッセージをデッドレター キューに転送できます。 、デッドレターキュー スイッチやルーティングキーなども存在する可能性があります。

生成の背景:
  • MQ に配信され、MQ に保存されたメッセージの有効期限が切れました
  • キューが最大値に達しました長さ (キュー コンテナがすでにいっぱいです) プロデューサーはメッセージの受信を拒否します
  • コンシューマーが複数のメッセージを消費できなかった場合、メッセージはデッド レター キューに転送されます

コード例:

maven 依存関係

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml 構成

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

キュー構成クラス

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

デッドレターキューコンシューマー

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

通常のキュー コンシューマ

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

バックグラウンド キュー管理ページは次のとおりです。

Java RabbitMQメッセージキューの一般的な問題と解決策の分析

デプロイ方法: デッド レター キューは同じディレクトリに存在できません。サーバーは通常のキューと同じなので分離する必要があります。サーバーストレージ。

遅延キュー

注文が 30 分間支払われない場合、システムが自動的にタイムアウトしてクローズする実装計画。

タスクのスケジューリングに基づくと、効率は非常に低くなります。

Redis の期限切れキーの実装に基づいて、キーの有効期限が切れるとメソッドがクライアントにコールバックされます。

ユーザーが注文すると、トークン (有効期間) が 30 分間生成され、redis に保存されます; 欠点: 非常に冗長であり、冗長フィールドがテーブルに保存されます。

MQ ベースの遅延キュー (最適なソリューション) RabbitMQ の状況。

原則: 注文を行うと、メッセージを mq に配信し、有効期間を 30 分に設定しますが、メッセージの有効期限が切れると (消費されずに)、クライアント上でメソッドを実行して、それを通知します。期限切れのメッセージが表示されるので、この時点で注文の支払いが完了しているかどうかを確認してください。

実装ロジック:

主にデッドレターキューを使用して実装します。

Java RabbitMQメッセージキューの一般的な問題と解決策の分析

望ましいコード: 通常のコンシューマはメッセージを消費しないか、通常のコンシューマが存在しません。設定された時間が経過すると、デッド レター キューに入り、その後終了します。コンシューマの実装対応するビジネスロジック。

RabbitMQ メッセージの冪等の問題

RabbitMQ メッセージの自動再試行メカニズム

コンシューマー ビジネス ロジック コードで例外がスローされると、再試行が自動的に実装されます (デフォルトは無数の再試行です)。 Try)

RabbitMQ の再試行回数の制限 (毎回 3 秒間隔で最大 5 回の再試行など) を実装する必要があります。再試行が複数回失敗した場合は、デッド レター キューに保存します。またはデータベース テーブルに保存し、後で労働報酬を記録します。何度も再試行が失敗すると、キューはメッセージを自動的に削除するためです。

メッセージの再試行原則: 再試行プロセス中に、aop を使用して消費リスニング メソッドをインターセプトします。このエラー ログは出力されません。複数回再試行しても失敗した場合、失敗の最大数に達した場合にのみエラー ログが出力されます。

複数回消費に失敗した場合:

1. メッセージを自動的に削除します (メッセージが失われる可能性があります)

解決策:

If Ifエンリッチメントが複数回失敗すると、最終的にはデッド レター キューに保存されます。

はテーブル ログを使用して消費失敗エラー ログを記録し、後で手動でメッセージを自動的に補正します。

再試行メカニズムの合理的な選択

コンシューマはメッセージを取得した後、サードパーティ インターフェイス (HTTP リクエスト) を呼び出しますが、サードパーティ インターフェイスの呼び出しに失敗しますか?もう一度試す必要がありますか?

回答: ネットワーク例外により呼び出しが失敗する場合があり、数回再試行する必要がある場合があります。

コンシューマーがメッセージを取得した後、コードの問題によりデータ例外がスローされます。再試行する必要がありますか?

回答: 再試行する必要はありません。コードが異常な場合は、コードリリースプロジェクトを再度修正する必要があります。

コンシューマーは手動 ack モードをオンにします

最初のステップとして、springboot プロジェクト構成は ack モードをオンにする必要があります

acknowledge-mode: Manual

第 2 ステップ、コンシューマー Java コード

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ はメッセージ冪等性の問題をどのように解決するか

メッセージ冪等性とは何ですか? MQ コンシューマはどのように冪等性を確保しますか? ###

理由: コンシューマが自動再試行をオンにする可能性があり、再試行プロセスによってコンシューマのビジネス ロジック コードが繰り返し実行される可能性があります。現時点では、メッセージは消費されています。ビジネス エラーによりメッセージが再び消費されるため、

解決策が表示されます: メッセージ グローバル ID を使用して、ビジネスに従って決定します。コンシューマはこれを判断できます。ビジネス ID (グローバル固有 ID) に基づくメッセージ。メッセージは消費されました。

コンシューマ コード ロジック:

Java RabbitMQメッセージキューの一般的な問題と解決策の分析

##RabbitMQ は分散トランザクションの問題を解決します

分散トランザクション: 分散システムでは、複数の異なるトランザクションが存在するため、サービスコールインタフェース内で動作し、各トランザクションは相互に影響を与えません。分散トランザクションには問題があります。

分散トランザクションを解決するための中心的な考え方: データの最終的な整合性。

分散フィールドの名詞:

強い一貫性: 同期速度が非常に速いか、ロック メカニズムがダーティ リードを許可しないかのいずれか;

強い一貫性ソリューション: いずれかのデータベースA がデータをデータ B に非常に迅速に同期するか、データベース A の同期が完了する前にデータベース B がデータを読み取ることができません。

弱い整合性: 読み取りが許可されるデータは元のダーティ データであり、読み取られた結果の不整合が許可されます。

最終的な整合性: 分散システムでは、データがネットワークを通じて同期的に通信されるため、短いデータ遅延は許容されますが、最終的なデータは一貫性がなければなりません。

RabbitMQ に基づいて分散トランザクションを解決するというアイデア

RabbitMQ に基づいて分散トランザクションを解決するというアイデア: (最終整合性ソリューションの採用)

    ##確認プロデューサー メッセージは MQ (メッセージ確認メカニズム) に配信される必要があります。配信が失敗した場合は、再試行を続けてください。
  • コンシューマーは手動確認を使用してメッセージを確認し、消費を達成します冪等性の問題に注意してください。消費が失敗すると、mq は自動的に消費者の再試行を支援します。
  • プロデューサの最初のトランザクションが最初に実行されることを確認します。実行が失敗した場合は、補足キューを使用します (プロデューサ自身のトランザクションを補足して、プロデューサの最初のトランザクションが確実に実行されるようにします [データは最終的に一貫した]セックス])。
  • 解決策マップ: 中心となるのは、mq を使用して他のシステムにメッセージを送信し、データを変更して戻すことです。

以上がJava RabbitMQメッセージキューの一般的な問題と解決策の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。