ホームページ  >  記事  >  Java  >  SpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法

SpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法

王林
王林転載
2023-05-29 22:34:491883ブラウズ

    環境構成

    SpringBoot Integration RabbitMQ はメッセージの送信を実現します。

    1. maven 依存関係を追加します

           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2. application.yml 設定ファイルを追加します

    spring:
      rabbitmq:
        host: 192.168.3.19
        port: 5672
        username: admin
        password: xxxx

    3. スイッチ、キュー、バインディングを設定します

        @Bean
        public DirectExchange myExchange() {
            DirectExchange directExchange = new DirectExchange("myExchange");
            return directExchange;
        }
    
        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue");
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
        }

    4. プロダクションはメッセージを送信します

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
            System.out.println("【发送消息】" + message)
            return "【send message】" + message;
        }

    5. コンシューマはメッセージを受信します

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);

    6. プロダクション側を呼び出してメッセージを送信します hello、コンソール出力:

    #[メッセージ送信] hello

    [メッセージ受信] hello 現在時刻 2022-05-12 10:21:14

    メッセージが正常に受信されたことを示します。

    メッセージ損失分析

    SpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法

    メッセージの生成から消費まで、次の段階でメッセージ損失が発生する可能性があります:

      #プロダクション側で失われた: プロデューサーは
    • RabbitMQ

    • ストレージ側で失われた:
    • RabbitMQ

      ストレージ自体はdown

      #消費者側で紛失: ネットワークの問題により、ストレージを消費者側に送信できないか、消費がハングして通常の消費を送信できません
    • RabbitMQ
    プロダクション側、ストレージ側、コンシューマー側からの信頼性の高い送信を適切にサポートします。

    実稼働フェーズ

    実稼働フェーズでは、

    リクエスト確認メカニズム

    を使用して、メッセージの信頼性の高い送信を保証します。 RabbitMQ サーバーにメッセージを送信した後、RabbitMQ はメッセージを受信し、RabbitMQ サーバーがメッセージを正常に受信したことを示す要求確認を送信者に返します。

    Configuration application.yml

    spring:
      rabbitmq:
        # 消息确认机制 生产者 -> 交换机
        publisher-confirms: true
        # 消息返回机制  交换机 -> 队列
        publisher-returns: true

    Configuration

    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("【correlationData】:" + correlationData);
                    log.info("【ack】" + ack);
                    log.info("【cause】" + cause);
                    if (ack) {
                        log.info("【发送成功】");
                    } else {
                        log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                    }
                }
            });
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("【消息发送失败】");
                    log.info("【message】" + message);
                    log.info("【replyCode】" + replyCode);
                }
            });
    
            return rabbitTemplate;
        }
    }

    Producer

    から

    switch へのメッセージ、confirmCallback# があります## モードを確認します。メッセージが正常に送信された後、メッセージはメソッド confirm(CorrelationData correlationData, boolean ack, String Cause) を呼び出し、ack に基づいてメッセージが正常に送信されたかどうかを判断します。 switch から queue へのメッセージには

    returnCallback

    リターン モードがあります。 メッセージの送信 製品メッセージ コンソール出力は次のとおりです:

    [メッセージの送信]製品メッセージ

    [メッセージの受信]製品メッセージ 現在時刻 2022 -05 -12 11:27:56[correlationData]:null[ack]true

    [cause]null
    [送信成功]



    生産終了シミュレーション メッセージが失われます

    ここには 2 つの解決策があります:

    メッセージを送信した直後にブローカーを閉じます。後者はネットワークをシャットダウンしますが、ブローカーが閉じられた後です。 、コンソールは常にエラーを報告し、メッセージを送信します。また、500 エラーも報告します。

      存在しないスイッチを送信:
    • // myExchange 修改成 myExchangexxxxx
      rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
    • 結果:
    • [correlationData]:null
    • 【ack】false
    【原因】チャネル エラー; プロトコル メソッド: #method(reply-code=404, Reply-text=NOT_FOUND - 仮想ホスト '/' に交換 'myExchangexxxxx' がありません、クラス ID =60、method-id=40)

    [送信に失敗しました]

    送信が失敗した場合は、メッセージを再試行できます

    スイッチは正しく、キューは送信に失敗しました。存在しないというメッセージが送信されます:

    スイッチはメッセージを受信し、成功通知を返します。コンソール出力:

    [correlationData]:CorrelationData [id=7d468b47-b422-4523-b2a2] -06b14aef073c]

    [ack ]true

    [cause]null

    [送信成功]

    スイッチはキューを見つけられず、障害情報を返しました:


    [メッセージの送信に失敗しました]

    [メッセージ]製品メッセージ
    [返信コード]312

    RabbitMQ


    キューの永続性を有効にし、キューとスイッチを作成します
    デフォルト設定は永続性です

    。まず、キューとスイッチを正しく設定し、
    メッセージがキュー

    に格納されるように消費監視用のキューを変更します。

    キューの永続性を非永続性に変更します:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",false);
            return queue;
        }
    メッセージの送信後、メッセージはキューに保存され、メッセージの RabbitMQ が再起動されます。もはや存在しない。

    キューの永続性を設定する:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",true);
            return queue;
        }

    再起動後も、キュー内のメッセージはまだ存在します。

    コンシューマ エンド
    コンシューマ エンドはデフォルトで開始されます

    ack

    自動確認モードキュー メッセージがコンシューマによって受信されると、メッセージは関係なくキューから自動的に削除されます消費者側からのメッセージがあるかどうかのニュース。したがって、コンシューマーがメッセージを正常に消費できることを確認するには、自動モードを手動確認モードに変更します。

    application.yml ファイルを変更します

    spring:
      rabbitmq:
        # 手动消息确认
        listener:
          simple:
            acknowledge-mode: manual

    メッセージを消費して受信した後、手動による確認が必要です: <pre class="brush:java;">channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);</pre><pre class="brush:java;"> @RabbitListener(queuesToDeclare = @Queue(&quot;myQueue&quot;)) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat(&quot;yyyy-MM-dd HH:mm:ss&quot;); Date date = new Date(); String time = sdf.format(date); System.out.println(&quot;【接收信息】&quot; + msg + &quot; 当前时间&quot; + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }</pre>追加しない場合:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    2 つのメッセージを送信します

    メッセージを受信した後、確認は行われず、キューに戻されます:

    プロジェクトを再起動します。その後、キュー内のメッセージはコンシューマに送信されますが、ACK 確認がなければ、キューに戻され続けます。 。

    channel.basicAck を追加した後、プロジェクトを再起動します

    SpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法

    キュー メッセージは削除されます

    basicAck メソッド multiple の最後のパラメータは、前のキューを削除することを意味します。

    multipletrue に設定され、後続のキューはすべてクリアされます

    SpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法

    以上がSpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。