ホームページ  >  記事  >  Java  >  Springboot の RocketMQ でブロードキャスト メッセージを実装する方法

Springboot の RocketMQ でブロードキャスト メッセージを実装する方法

PHPz
PHPz転載
2023-05-11 20:13:161117ブラウズ

RocketMQ メッセージ モードには主に 2 つのタイプがあります: ブロードキャスト モードとクラスター モード (負荷分散モード)

ブロードキャスト モードでは、すべてのコンシューマがメッセージを消費します。

負荷分散モードでは、すべてのコンシューマ 消費は特定のコンシューマによって 1 回だけ消費されます;

私たちは通常、ビジネスで負荷分散モードを使用します。もちろん、一部の特殊なシナリオでは、メッセージを送信するなど、ブロードキャスト モードの使用が必要になります。電子メール、携帯電話、またはサイト内プロンプト。;

これは、@RocketMQMessageListenerMessageModel.BROADCASTING の messageModel 属性値を通じて設定できます。 はブロードキャスト モードです。 MessageModel.CLUSTERING はデフォルトのクラスター負荷分散モードです。

ブロードキャスト メッセージを実装するために springboot rockermq 統合を導入しましょう

  • #Springboot プロジェクトを作成し、rockermq の依存関係を追加します

  • <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
  • Configure rocketmq

Port
サーバー:
ポート: 8083

# rocketmq の構成
rocketmq:
ネームサーバー: 127.0.0.1:9876
#プロデューサー
プロデューサー:
#プロデューサー グループ名前 (アプリケーション内で一意である必要があります)
group : group1
#メッセージ送信のデフォルトのタイムアウトは 3000ms です
send-message-timeout: 3000
#メッセージが 4096 バイトに達すると、メッセージは圧縮されます。デフォルト 4096
compress-message-body-threshold: 4096
#最大メッセージ制限、デフォルトは 128K
max-message-size: 4194304
#失敗した同期メッセージ送信の再試行回数
retry-times-when-send-failed: 3
#内部送信が失敗したときに他のエージェントを再試行するかどうか。このパラメータは複数のブローカーがある場合にのみ有効です
retry-next-server: true
#失敗した非同期メッセージ送信の再試行回数
retry-times-when-send-async-failed: 3

  • 生産終了: メッセージを送信する新しいコントローラーの作成

本番側は通常の送信ロジックに従ってメッセージを送信できます

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
  • #メッセージを消費する 2 つのコンシューマを作成します

最初にクラスター負荷分散テストを行い、messageModel=MessageModel.CLUSTERING

コンシューマー 1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

コンシューマー 2: コンシューマー 1 と同じ ConsumerGroup およびトピックに追加します

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者2,消费消息:"+s);
    }
}

    #サービスを開始してクラスター モードの消費をテストします
  • ##クラスター モードのテスト: 2 つのコンシューマーがメッセージを均等に共有します

#上記 2 つのコンシューマの messageModel 属性値をブロードキャスト モードに変更します

Springboot の RocketMQ でブロードキャスト メッセージを実装する方法

##コンシューマ 1:
    package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
    public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("广播消息1 广播模式,消费消息:"+s);
        }
    }
  • コンシューマ 2 : Consumer 1 と同じ ConsumerGroup およびトピック内で

    package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
    public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("广播消息2 广播模式,消费消息:"+s);
        }
    }

  • #サービスを再起動し、ブロードキャスト モードの消費をテストします

以上がSpringboot の RocketMQ でブロードキャスト メッセージを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。