Maison >Java >javaDidacticiel >Comment implémenter des messages diffusés dans RocketMQ dans Springboot

Comment implémenter des messages diffusés dans RocketMQ dans Springboot

PHPz
PHPzavant
2023-05-11 20:13:161187parcourir

Il existe deux modes de message principaux de RocketMQ : le mode de diffusion et le mode cluster (mode d'équilibrage de charge).

Le mode de diffusion signifie que chaque consommateur consommera les messages.

Le mode d'équilibrage de charge signifie que chaque consommation ne sera consommée qu'une seule fois par un certain consommateur ;

Nous utilisons généralement le mode d'équilibrage de charge dans notre entreprise. Bien sûr, certains scénarios spéciaux nécessitent l'utilisation du mode de diffusion, comme l'envoi d'un message à un e-mail, à un téléphone portable ou à une invite sur site

Nous pouvons le transmettre ; est le mode d'équilibrage de charge du cluster par défaut@RocketMQMessageListenermessageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING

Présentons l'intégration springboot + rockermq pour implémenter les messages de diffusion

  • Créez un projet Springboot et ajoutez une dépendance rockermq

  • <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
  • Configurer rocketmq

# Port

serveur :
port : 8083

# Configuration rocketmq

rocketmq:
name-server : 127.0.0.1:9876
#Producteur
producteur:
#Nom du groupe producteur, qui doit être unique dans une application
groupe : groupe1
#Le délai d'attente par défaut pour l'envoi du message, c'est 3000 ms
send -message-timeout: 3000
#Lorsque le message atteint 4096 octets, le message sera compressé. La valeur par défaut est 4096
compress-message-body-threshold : 4096
#Limite maximale des messages, la valeur par défaut est 128K
max-message-size : 4194304
#Nombre de tentatives pour l'envoi d'un message de synchronisation ayant échoué
nombre de tentatives en cas d'échec de l'envoi : 3
#S'il faut réessayer d'autres agents en cas d'échec de l'envoi interne, ce paramètre ne prend effet que lorsqu'il y a plusieurs courtiers
retry-next-server: true
#Le nombre de tentatives lorsque l'envoi de message asynchrone échoue retry-times-when-send -async-failed : 3

    Côté production : créer un nouveau contrôleur pour envoyer des messages
  • Le côté production peut envoyer des messages selon la logique d'envoi normale
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}

    Créer deux consommateurs pour consommer des messages
  • Nous effectuons d'abord un test d'équilibrage de charge du cluster, ajoutons messageModel=MessageModel.CLUSTERING

Consumer 1 :

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

Consumer 2 : Dans le même consumerGroup et le même sujet que Consumer 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者2,消费消息:"+s);
    }
}

    Démarrez le service et testez le mode cluster Consommation
  • Test en mode cluster : deux consommateurs partagent le message de manière égale

Comment implémenter des messages diffusés dans RocketMQ dans Springboot

    Modifiez la valeur de l'attribut messageModel des deux consommateurs ci-dessus en mode diffusion
  • Consommateur 1 :
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息1 广播模式,消费消息:"+s);
    }
}

Consommateur 2 : Dans le même groupe de consommateurs et même sujet que le consommateur 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息2 广播模式,消费消息:"+s);
    }
}

    Redémarrez le service et testez la consommation en mode diffusion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer