ホームページ >Java >&#&チュートリアル >Springboot の RocketMQ でブロードキャスト メッセージを実装する方法
RocketMQ メッセージ モードには主に 2 つのタイプがあります: ブロードキャスト モードとクラスター モード (負荷分散モード)
ブロードキャスト モードでは、すべてのコンシューマがメッセージを消費します。
負荷分散モードでは、すべてのコンシューマ 消費は特定のコンシューマによって 1 回だけ消費されます;
私たちは通常、ビジネスで負荷分散モードを使用します。もちろん、一部の特殊なシナリオでは、メッセージを送信するなど、ブロードキャスト モードの使用が必要になります。電子メール、携帯電話、またはサイト内プロンプト。;
これは、@RocketMQMessageListener
、MessageModel.BROADCASTING の
messageModel 属性値を通じて設定できます。
はブロードキャスト モードです。 MessageModel.CLUSTERING
はデフォルトのクラスター負荷分散モードです。
ブロードキャスト メッセージを実装するために springboot rockermq 統合を導入しましょう
<!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
Port
サーバー:
ポート: 8083# rocketmq の構成
rocketmq:
ネームサーバー: 127.0.0.1:9876
#プロデューサー
プロデューサー:
#プロデューサー グループ名前 (アプリケーション内で一意である必要があります)
group : group1
#メッセージ送信のデフォルトのタイムアウトは 3000ms です
send-message-timeout: 3000
#メッセージが 4096 バイトに達すると、メッセージは圧縮されます。デフォルト 4096
compress-message-body-threshold: 4096
#最大メッセージ制限、デフォルトは 128K
max-message-size: 4194304
#失敗した同期メッセージ送信の再試行回数
retry-times-when-send-failed: 3
#内部送信が失敗したときに他のエージェントを再試行するかどうか。このパラメータは複数のブローカーがある場合にのみ有効です
retry-next-server: true
#失敗した非同期メッセージ送信の再試行回数
retry-times-when-send-async-failed: 3
生産終了: メッセージを送信する新しいコントローラーの作成
本番側は通常の送信ロジックに従ってメッセージを送信できます
package com.example.springbootrocketdemo.controller; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 广播消息 * @author qzz */ @RestController public class RocketMQBroadCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送广播消息 */ @RequestMapping("/testBroadSend") public void testSyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 for(int i=0;i<10;i++){ rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i); } } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者1,消费消息:"+s); } }コンシューマー 2: コンシューマー 1 と同じ ConsumerGroup およびトピックに追加します
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者2,消费消息:"+s); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息1 广播模式,消费消息:"+s); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息2 广播模式,消费消息:"+s); } }
以上がSpringboot の RocketMQ でブロードキャスト メッセージを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。