Implementing simple message sending and receiving with Spring Boot and RocketMQ
There are three ways to send ordinary messages: one-way sending, synchronous sending and asynchronous sending.
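To make the difference between the three modes concrete, here is a minimal sketch that uses only the JDK. The `SendModesSketch` class and its `deliver` method are hypothetical stand-ins for a broker, not RocketMQ APIs; the sketch only illustrates what the caller gets back in each mode.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; not part of RocketMQ.
final class SendModesSketch {

    // Pretends to deliver the message and returns an acknowledgement string.
    static String deliver(String topic, String body) {
        return "ACK[" + topic + "]: " + body;
    }

    // Synchronous: the caller blocks until the acknowledgement is available.
    static String syncSend(String topic, String body) {
        return deliver(topic, body);
    }

    // Asynchronous: the call returns at once; the callback receives the result later.
    static CompletableFuture<Void> asyncSend(String topic, String body, Consumer<String> onSuccess) {
        return CompletableFuture.supplyAsync(() -> deliver(topic, body)).thenAccept(onSuccess);
    }

    // One-way: fire and forget; the caller never learns the outcome.
    static void sendOneWay(String topic, String body) {
        CompletableFuture.runAsync(() -> deliver(topic, body));
    }

    public static void main(String[] args) {
        System.out.println(syncSend("test-topic", "sync"));
        // join() is used here only so the demo waits for the callback before exiting.
        asyncSend("test-topic", "async", System.out::println).join();
        sendOneWay("test-topic", "one-way");
    }
}
```

Synchronous sending suits messages whose delivery result matters to the caller, asynchronous sending suits latency-sensitive callers that still want the result, and one-way sending suits cases such as logging where an occasional lost message is acceptable.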
Below, we integrate RocketMQ with Spring Boot to send and receive ordinary messages.
Create a Spring Boot project and add the RocketMQ dependency:
<!-- RocketMQ dependency -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
Configure RocketMQ in application.yml:
# Port
server:
  port: 8083

# Configure RocketMQ
rocketmq:
  name-server: 127.0.0.1:9876
  # Producer
  producer:
    # Producer group name; it must be unique within an application
    group: group1
    # Timeout for sending a message, in milliseconds (default 3000)
    send-message-timeout: 3000
    # Message bodies larger than this many bytes are compressed (default 4096)
    compress-message-body-threshold: 4096
    # Maximum message size in bytes; the default is 4 MB (4194304 bytes)
    max-message-size: 4194304
    # Number of retries when a synchronous send fails
    retry-times-when-send-failed: 3
    # Whether to retry on another broker when a send fails internally; only takes effect when there are multiple brokers
    retry-next-server: true
    # Number of retries when an asynchronous send fails
    retry-times-when-send-async-failed: 3
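The retry settings can be read as "one initial attempt plus up to N retries". The following JDK-only sketch illustrates that reading; `RetrySketch` and `sendWithRetry` are hypothetical names, and the assumption that retries are counted in addition to the first attempt is mine rather than something stated in this article.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not RocketMQ code: illustrates "first attempt plus N retries".
final class RetrySketch {

    // Runs the attempt until it succeeds or the retry budget is spent.
    // Returns the number of attempts made, or -1 if every attempt failed.
    static int sendWithRetry(BooleanSupplier attempt, int retryTimes) {
        int maxAttempts = 1 + retryTimes; // assumption: retries come on top of the first try
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice and then succeeds.
        int attempts = sendWithRetry(() -> ++calls[0] >= 3, 3);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

With a retry count of 3, as configured above, a send under this reading would be attempted at most four times before the failure is reported to the caller.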
Create a new controller to send messages:
package com.example.springbootrocketdemo.controller;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Three ways to send ordinary messages: synchronous, asynchronous, and one-way.
 * @author qzz
 */
@RestController
public class RocketMQController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send an ordinary message.
     * convertAndSend(String destination, Object payload) is convenient for sending strings.
     */
    @RequestMapping("/send")
    public void send() {
        rocketMQTemplate.convertAndSend("test-topic", "test-message");
    }

    /**
     * Send a synchronous message.
     */
    @RequestMapping("/testSyncSend")
    public void testSyncSend() {
        // Argument 1: topic; to add a tag, write the destination as "topic:tag"
        // Argument 2: message content
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic", "sync message test");
        System.out.println(sendResult);
    }

    /**
     * Send an asynchronous message.
     */
    @RequestMapping("/testASyncSend")
    public void testASyncSend() {
        // Argument 1: topic; to add a tag, write the destination as "topic:tag"
        // Argument 2: message content
        // Argument 3: callback
        rocketMQTemplate.asyncSend("test-topic", "async message test", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Failed to send message");
                throwable.printStackTrace();
            }
        });
    }

    /**
     * Send a one-way message.
     */
    @RequestMapping("/testOneWay")
    public void testOneWay() {
        // Argument 1: topic; to add a tag, write the destination as "topic:tag"
        // Argument 2: message content
        rocketMQTemplate.sendOneWay("test-topic", "one-way message test");
    }
}
The rocketmq-spring-boot-starter provides the RocketMQTemplate class, which can be used to send messages in various forms.
Each send method in the controller specifies the topic test-topic as its destination.
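The comments in the controller note that a tag can be attached by writing the destination as "topic:tag". As an illustration of that convention, the following JDK-only sketch splits such a destination string into its two parts. `DestinationSketch` is a hypothetical helper written for this example, not part of the starter, and the tag name `tagA` is made up.

```java
// Hypothetical helper, not part of rocketmq-spring: splits a "topic:tag" destination string.
final class DestinationSketch {

    // Returns {topic, tag}; the tag is empty when the destination has no colon.
    static String[] parse(String destination) {
        // Limit 2 keeps any further colons inside the tag part.
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tag = parts.length > 1 ? parts[1] : "";
        return new String[] {topic, tag};
    }

    public static void main(String[] args) {
        String[] plain = parse("test-topic");
        String[] tagged = parse("test-topic:tagA");
        System.out.println(plain[0] + " / '" + plain[1] + "'");
        System.out.println(tagged[0] + " / '" + tagged[1] + "'");
    }
}
```

In the controller, this would correspond to passing a destination such as "test-topic:tagA" as the first argument of the send methods instead of "test-topic".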
Create a message consumer, RocketMQConsumerListener, that listens for messages and consumes them:
package com.example.springbootrocketdemo.config;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Consumes messages.
 * Configures the RocketMQ listener.
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test", topic = "test-topic")
public class RocketMQConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("Consumed message: " + s);
    }
}
The consumer class must implement the RocketMQListener interface; its generic type parameter specifies the message type, which is String here.
The @RocketMQMessageListener annotation must be added to the class, specifying the topic test-topic and the consumer group test.
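To show how an annotation and a generic listener interface can work together, here is a rough JDK-only sketch. Every type in it (`DispatcherSketch`, `ListenerSketch`, `MessageListenerSketch`, `PrintingListener`) is a hypothetical stand-in invented for this example. It is not how rocketmq-spring is implemented; it only illustrates the idea of reading the topic from the annotation and handing each message to a typed `onMessage` method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical dispatcher: routes each message to the listener registered for its topic.
final class DispatcherSketch {
    private final Map<String, MessageListenerSketch<String>> byTopic = new HashMap<>();

    // Reads the topic from the annotation on the listener's class.
    void register(MessageListenerSketch<String> listener) {
        ListenerSketch meta = listener.getClass().getAnnotation(ListenerSketch.class);
        if (meta == null) {
            throw new IllegalArgumentException("listener is missing @ListenerSketch");
        }
        byTopic.put(meta.topic(), listener);
    }

    // Returns true if a listener consumed the message, false if no listener matches the topic.
    boolean dispatch(String topic, String body) {
        MessageListenerSketch<String> listener = byTopic.get(topic);
        if (listener == null) {
            return false;
        }
        listener.onMessage(body);
        return true;
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.register(new PrintingListener());
        dispatcher.dispatch("test-topic", "test-message");
    }
}

// Hypothetical stand-in for @RocketMQMessageListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ListenerSketch {
    String topic();
    String consumerGroup();
}

// Hypothetical stand-in for RocketMQListener<T>; T is the message type.
interface MessageListenerSketch<T> {
    void onMessage(T message);
}

@ListenerSketch(topic = "test-topic", consumerGroup = "test")
final class PrintingListener implements MessageListenerSketch<String> {
    final List<String> received = new ArrayList<>();

    @Override
    public void onMessage(String message) {
        received.add(message);
        System.out.println("Consumed message: " + message);
    }
}
```

The point of the pattern is that the listener class carries no connection code of its own: the annotation declares what to subscribe to, and the type parameter declares what the payload should be converted to.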
Simple message sending and receiving is completed!
Start the service and test message consumption
Test synchronous message:
Test asynchronous message:
Test one-way message: