Home  >  Article  >  Java  >  How does RocketMQ implement message sending and receiving in Springboot?

How does RocketMQ implement message sending and receiving in Springboot?

WBOY
WBOYforward
2023-05-18 17:19:061735browse

springboot rockermq implements simple message sending and receiving

There are three ways to send ordinary messages: one-way sending, synchronous sending and asynchronous sending.

Let’s introduce springboot rockermq integration to realize the sending and receiving of ordinary messages

  • Create a Springboot project and add rockermq dependency

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • Configure rocketmq

# Port
server:
port: 8083

# Configure rocketmq
rocketmq:
name-server: 127.0.0.1:9876
#producer
producer:
#Producer group name, it must be unique in an application
group: group1
#The default timeout for message sending is 3000ms
send-message-timeout: 3000
#When the message reaches 4096 bytes, the message will be compressed. Default 4096
compress-message-body-threshold: 4096
#Maximum message limit, default is 128K
max-message-size: 4194304
#Number of retries for failed synchronization message sending
retry-times-when-send-failed: 3
#Whether to retry other agents when internal sending fails, this parameter will only take effect when there are multiple brokers
retry-next-server: true
# Number of retries for failed asynchronous message sending
retry-times-when-send-async-failed: 3

  • Create a new controller to send messages:

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 普通信息的三种方式:同步、异步、单向
 * @author qzz
 */
@RestController
public class RocketMQCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送普通消息
     * convertAndSend(String destination, Object payload) 发送字符串比较方便
     */
    @RequestMapping("/send")
    public void send(){
        rocketMQTemplate.convertAndSend("test-topic","test-message");
    }
    /**
     * 发送同步消息
     */
    @RequestMapping("/testSyncSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
        System.out.println(sendResult);
    }
    /**
     * 发送异步消息
     */
    @RequestMapping("/testASyncSend")
    public void testASyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        //参数三:回调
        rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送异常");
                throwable.printStackTrace();
            }
        });
    }
    /**
     * 发送单向消息
     */
    @RequestMapping("/testOneWay")
    public void testOneWay(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
    }
}

SpringBoot provides us with the RocketMQTemplate template class, which we can use to send messages in various forms.

The sending method specifies the Topic topic test-topic.

  • Create a new message consumer to listen to RocketMQConsumerListener, listen to messages, and consume messages

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 消费消息
 * 配置RocketMQ监听
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test",topic = "test-topic")
public class RocketMQConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费消息:"+s);
    }
}

The consumer class must implement the RocketMQListener interface , and dynamically specify the message type String.

The @RocketMQMessageListener annotation should be added to the class, specify the topic topic test-topic, and the consumer group test

Simple message sending and receiving is completed!

  • Start the service and test message consumption

How does RocketMQ implement message sending and receiving in Springboot?

How does RocketMQ implement message sending and receiving in Springboot?

Test synchronization Message:

How does RocketMQ implement message sending and receiving in Springboot?

Test asynchronous message:

How does RocketMQ implement message sending and receiving in Springboot?

## Test one-way message:

How does RocketMQ implement message sending and receiving in Springboot?

The above is the detailed content of How does RocketMQ implement message sending and receiving in Springboot?. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete