Home >Java >javaTutorial >How to implement broadcast messages in RocketMQ in Springboot

How to implement broadcast messages in RocketMQ in Springboot

PHPz
PHPzforward
2023-05-11 20:13:161193browse

There are two main types of RocketMQ message modes: broadcast mode and cluster mode (load balancing mode)

Broadcast mode is that every consumer will consume messages;

Load balancing mode is that every consumer A consumption will only be consumed once by a certain consumer;

We generally use the load balancing mode in our business. Of course, some special scenarios require the use of broadcast mode, such as sending a message to an email, mobile phone, or in-site prompt. ;

We can set it through the messageModel attribute value of @RocketMQMessageListener, MessageModel.BROADCASTING is the broadcast mode, MessageModel.CLUSTERING is the default cluster load balancing mode

Let’s introduce springboot rockermq integration to implement broadcast messages

  • Create a Springboot project and add rockermq dependency

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • Configure rocketmq

# Port
server:
port: 8083

# Configure rocketmq
rocketmq:
name-server: 127.0.0.1:9876
#producer
producer:
#Producer group name, which must be unique in an application
group : group1
#The default timeout for message sending is 3000ms
send-message-timeout: 3000
#When the message reaches 4096 bytes, the message will be compressed. Default 4096
compress-message-body-threshold: 4096
#Maximum message limit, default is 128K
max-message-size: 4194304
#Number of retries for failed synchronization message sending
retry-times-when-send-failed: 3
#Whether to retry other agents when internal sending fails, this parameter will only take effect when there are multiple brokers
retry-next-server: true
# Number of retries for failed asynchronous message sending
retry-times-when-send-async-failed: 3

  • Production end: Create a new controller to send messages

The production end can send the message according to the normal sending logic

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
  • Create two consumers to consume the message

We first cluster load balancing test, add messageModel=MessageModel.CLUSTERING

Consumer 1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

Consumer 2: In the same consumerGroup and topic as consumer 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者2,消费消息:"+s);
    }
}
  • Start the service and test cluster mode consumption

Cluster mode test: two consumers share the message equally

How to implement broadcast messages in RocketMQ in Springboot

  • Change the messageModel attribute values ​​of the above two consumers to broadcast mode

Consumer 1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息1 广播模式,消费消息:"+s);
    }
}

Consumer 2: In the same consumerGroup and topic as Consumer 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息2 广播模式,消费消息:"+s);
    }
}
  • Restart the service and test the broadcast mode consumption

How to implement broadcast messages in RocketMQ in Springboot

#

The above is the detailed content of How to implement broadcast messages in RocketMQ in Springboot. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete