Maison >Java >javaDidacticiel >Comment implémenter des messages diffusés dans RocketMQ dans Springboot
Il existe deux modes de message principaux de RocketMQ : le mode de diffusion et le mode cluster (mode d'équilibrage de charge).
Le mode de diffusion signifie que chaque consommateur consommera les messages.
Le mode d'équilibrage de charge signifie que chaque consommation ne sera consommée qu'une seule fois par un certain consommateur ;
Nous utilisons généralement le mode d'équilibrage de charge dans notre entreprise. Bien sûr, certains scénarios spéciaux nécessitent l'utilisation du mode de diffusion, comme l'envoi d'un message à un e-mail, à un téléphone portable ou à une invite sur site
Nous pouvons le transmettre ; est le mode d'équilibrage de charge du cluster par défaut@RocketMQMessageListener
的messageModel
属性值来设置,MessageModel.BROADCASTING
是广播模式,MessageModel.CLUSTERING
<!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
# Portserveur :
# Configuration rocketmq
port : 8083rocketmq:
name-server : 127.0.0.1:9876
#Producteur
producteur:
#Nom du groupe producteur, qui doit être unique dans une application
groupe : groupe1
#Le délai d'attente par défaut pour l'envoi du message, c'est 3000 ms
send -message-timeout: 3000
#Lorsque le message atteint 4096 octets, le message sera compressé. La valeur par défaut est 4096
compress-message-body-threshold : 4096
#Limite maximale des messages, la valeur par défaut est 128K
max-message-size : 4194304
#Nombre de tentatives pour l'envoi d'un message de synchronisation ayant échoué
nombre de tentatives en cas d'échec de l'envoi : 3
#S'il faut réessayer d'autres agents en cas d'échec de l'envoi interne, ce paramètre ne prend effet que lorsqu'il y a plusieurs courtiers
retry-next-server: true
#Le nombre de tentatives lorsque l'envoi de message asynchrone échoue retry-times-when-send -async-failed : 3
package com.example.springbootrocketdemo.controller; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 广播消息 * @author qzz */ @RestController public class RocketMQBroadCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送广播消息 */ @RequestMapping("/testBroadSend") public void testSyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 for(int i=0;i<10;i++){ rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i); } } }
Consumer 1 :
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者1,消费消息:"+s); } }
Consumer 2 : Dans le même consumerGroup et le même sujet que Consumer 1
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者2,消费消息:"+s); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息1 广播模式,消费消息:"+s); } }
Consommateur 2 : Dans le même groupe de consommateurs et même sujet que le consommateur 1
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息2 广播模式,消费消息:"+s); } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!