How to implement broadcast messages in RocketMQ in Spring Boot
RocketMQ has two main message consumption modes: broadcast mode and cluster mode (load-balancing mode).
In broadcast mode, every consumer consumes every message.
In cluster (load-balancing) mode, each message is consumed only once, by one of the consumers.
Most business scenarios use cluster mode. Some special scenarios call for broadcast mode, such as sending the same message to email, mobile phone, and in-site prompt channels.
The mode is set through the messageModel attribute of @RocketMQMessageListener: MessageModel.BROADCASTING is broadcast mode, and MessageModel.CLUSTERING is the default cluster (load-balancing) mode.
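The difference between the two modes can be illustrated with a toy in-memory model. This is only a sketch in plain Java, not RocketMQ code: the class `DeliveryModeDemo`, its `Mode` enum, and the `deliver` method are hypothetical names made up for this example, and the round-robin choice in the clustering branch is a simplification (a real broker distributes message queues among the consumers of a group rather than individual messages).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two delivery modes; not RocketMQ code. */
final class DeliveryModeDemo {

    /** Hypothetical stand-in for the two message models. */
    enum Mode { CLUSTERING, BROADCASTING }

    /**
     * Delivers messages 0..messageCount-1 to consumerCount consumers and
     * returns, per consumer, the list of message ids it received.
     */
    static List<List<Integer>> deliver(Mode mode, int consumerCount, int messageCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int msg = 0; msg < messageCount; msg++) {
            if (mode == Mode.BROADCASTING) {
                // Every consumer gets its own copy of the message.
                for (List<Integer> inbox : received) {
                    inbox.add(msg);
                }
            } else {
                // Exactly one consumer gets the message (round-robin for simplicity).
                received.get(msg % consumerCount).add(msg);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("CLUSTERING:   " + deliver(Mode.CLUSTERING, 2, 10));
        System.out.println("BROADCASTING: " + deliver(Mode.BROADCASTING, 2, 10));
    }
}
```

With two consumers and ten messages, the clustering branch splits the ten messages between the two inboxes, while the broadcasting branch puts all ten messages into both.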
The following walks through integrating RocketMQ with Spring Boot to implement broadcast messages.
Create a Spring Boot project and add the RocketMQ dependency
<!-- RocketMQ dependency -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
Configure RocketMQ
# Port
server:
  port: 8083

# Configure RocketMQ
rocketmq:
  name-server: 127.0.0.1:9876
  # Producer
  producer:
    # Producer group name, which must be unique in an application
    group: group1
    # Timeout for sending a message, default 3000 ms
    send-message-timeout: 3000
    # Message bodies that reach this size (in bytes) are compressed, default 4096
    compress-message-body-threshold: 4096
    # Maximum message size in bytes, default 4 MB (4194304)
    max-message-size: 4194304
    # Number of retries when a synchronous send fails
    retry-times-when-send-failed: 3
    # Whether to retry on another broker when a send fails internally; only takes effect when there are multiple brokers
    retry-next-server: true
    # Number of retries when an asynchronous send fails
    retry-times-when-send-async-failed: 3
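To make the retry settings more concrete, here is a simplified plain-Java model of a synchronous send with retries. It is a sketch under assumptions, not the RocketMQ client: `SendRetryDemo` and `sendWithRetry` are hypothetical names, and the model only counts attempts (one initial try plus the configured number of retries). A real client also handles timeouts and may switch brokers between attempts.

```java
import java.util.function.IntPredicate;

/** Simplified model of synchronous send retries; not the RocketMQ client itself. */
final class SendRetryDemo {

    /**
     * Calls sendAttempt up to (1 + retryTimes) times and returns the number of
     * attempts used, or -1 if every attempt failed.
     * sendAttempt receives the 1-based attempt number and returns true on success.
     */
    static int sendWithRetry(int retryTimes, IntPredicate sendAttempt) {
        int maxAttempts = 1 + retryTimes; // the first try plus the configured retries
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sendAttempt.test(attempt)) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Succeeds on the third attempt, within the 1 + 3 attempts allowed.
        System.out.println(sendWithRetry(3, attempt -> attempt == 3));
        // Never succeeds: gives up after 4 attempts.
        System.out.println(sendWithRetry(3, attempt -> false));
    }
}
```

Under this model, a value of 3 for retry-times-when-send-failed means a message is attempted at most four times before the send is reported as failed.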
Producer side: create a new controller to send messages
The producer needs no special handling for broadcast; it sends messages with the normal sending logic.
package com.example.springbootrocketdemo.controller;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Broadcast messages
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send broadcast messages
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend() {
        // Argument 1: topic; to add a tag, use the "topic:tag" form
        // Argument 2: message body
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.convertAndSend("test-topic-broad", "test-message" + i);
        }
    }
}
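The controller's comment mentions that a tag can be appended to the topic using the "topic:tag" form. A small helper for building such destination strings could look like the sketch below. The class `Destinations`, its `of` method, and the tag name `email` are hypothetical and only serve as an illustration.

```java
/** Hypothetical helper for building "topic:tag" destination strings. */
final class Destinations {

    private Destinations() {
    }

    /** Returns "topic:tag", or just the topic when no tag is given. */
    static String of(String topic, String tag) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        if (tag == null || tag.isEmpty()) {
            return topic;
        }
        return topic + ":" + tag;
    }

    public static void main(String[] args) {
        System.out.println(of("test-topic-broad", "email"));
        System.out.println(of("test-topic-broad", null));
    }
}
```

The resulting string would then be passed as the first argument of convertAndSend, for example `rocketMQTemplate.convertAndSend(Destinations.of("test-topic-broad", "email"), "test-message")`.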
Create two consumers to consume the messages
First, test cluster (load-balancing) mode by setting messageModel = MessageModel.CLUSTERING
Consumer 1:
package com.example.springbootrocketdemo.config;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Broadcast messages
 * Configures the RocketMQ listener
 * MessageModel.CLUSTERING: cluster mode
 * MessageModel.BROADCASTING: broadcast mode
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad", topic = "test-topic-broad", messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("Cluster mode, consumer 1 consumed message: " + s);
    }
}
Consumer 2 (same consumerGroup and topic as consumer 1):
package com.example.springbootrocketdemo.config;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Broadcast messages
 * Configures the RocketMQ listener
 * MessageModel.CLUSTERING: cluster mode
 * MessageModel.BROADCASTING: broadcast mode
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad", topic = "test-topic-broad", messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("Cluster mode, consumer 2 consumed message: " + s);
    }
}
Start the service and test cluster mode consumption
Cluster mode result: the two consumers share the messages between them, and each message is consumed only once.
Next, change the messageModel attribute of both consumers to broadcast mode (MessageModel.BROADCASTING)
Consumer 1:
package com.example.springbootrocketdemo.config;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Broadcast messages
 * Configures the RocketMQ listener
 * MessageModel.CLUSTERING: cluster mode
 * MessageModel.BROADCASTING: broadcast mode
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad", topic = "test-topic-broad", messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("Broadcast mode, consumer 1 consumed message: " + s);
    }
}
Consumer 2 (same consumerGroup and topic as consumer 1):
package com.example.springbootrocketdemo.config;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Broadcast messages
 * Configures the RocketMQ listener
 * MessageModel.CLUSTERING: cluster mode
 * MessageModel.BROADCASTING: broadcast mode
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad", topic = "test-topic-broad", messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("Broadcast mode, consumer 2 consumed message: " + s);
    }
}
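Restart the service and test broadcast mode consumption: since every consumer consumes every message in broadcast mode, each of the two consumers should print all 10 messages.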