Heim  >  Artikel  >  Java  >  So implementieren Sie Broadcast-Nachrichten in RocketMQ in Springboot

So implementieren Sie Broadcast-Nachrichten in RocketMQ in Springboot

PHPz
PHPznach vorne
2023-05-11 20:13:161119Durchsuche

Es gibt zwei Haupttypen von RocketMQ-Nachrichtenmodi: Broadcast-Modus und Cluster-Modus (Lastausgleichsmodus)

Der Broadcast-Modus bedeutet, dass jeder Verbraucher Nachrichten konsumiert; Der Lastausgleichsmodus bedeutet, dass jeder Verbrauch nur einmal von einem bestimmten Verbraucher verbraucht wird.

Wir verwenden in unserem Unternehmen im Allgemeinen den Lastausgleichsmodus. Natürlich erfordern einige spezielle Szenarien die Verwendung des Broadcast-Modus , wie zum Beispiel das Senden einer Nachricht. Gehen Sie zu Ihrer E-Mail-Adresse, Ihrem Mobiltelefon und Ihrer Site-Eingabeaufforderung Führen Sie die Springboot+RockerMQ-Integration ein, um Broadcast-Nachrichten zu implementieren 🎜# 🎜#

# Port@RocketMQMessageListenermessageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERINGServer:

Port: 8083

# Rocketmq konfigurieren
    rocketmq:
  • Nameserver: 127.0.0.1:9876

    #producer

    Producer:
  • #Der Name der Produzentengruppe muss innerhalb einer Anwendung eindeutig sein
Gruppe: Gruppe1 # 🎜🎜# #Das Standard-Timeout für das Senden von Nachrichten beträgt 3000 ms
    send-message-timeout: 3000
  • #Wenn die Nachricht 4096 Bytes erreicht, wird die Nachricht komprimiert. Standard 4096

    compress-message-body-threshold: 4096

    #Maximales Nachrichtenlimit, Standard ist 128K
  • max-message-size: 4194304
#Wiederholen, wenn das Senden der Synchronisierungsnachricht fehlschlägt
retry-times-when-send-failed: 3

#Ob andere Agenten erneut versucht werden sollen, wenn das interne Senden fehlschlägt, dieser Parameter ist nur wirksam, wenn mehrere Broker vorhanden sind
retry-next-server: true
#

retry-times-when-send-async-failed: 3




# 🎜🎜#Produktionsseite: Erstellen Sie einen neuen Controller um Nachrichten zu senden



Die Produktionsseite kann Nachrichten gemäß der normalen Sendelogik senden

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

#🎜 🎜#
Erstellen Sie zwei Verbraucher sollen Nachrichten konsumieren



Wir testen zuerst den Lastausgleich im Cluster und fügen messageModel=MessageModel.CLUSTERING
#🎜 🎜#Verbraucher 1:
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
# hinzu 🎜🎜#Verbraucher 2: In derselben Verbrauchergruppe und demselben Thema
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

Starten Sie den Dienst und testen Sie den Cluster-Modus-Verbrauch
  • # 🎜🎜#Cluster-Modus-Test: Zwei Verbraucher teilen die Nachricht gleichermaßen über zwei Verbraucher in den Broadcast-Modus

    package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
    public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("集群模式 消费者2,消费消息:"+s);
        }
    }

Starten Sie den Dienst neu und testen Sie den Broadcast-Modus-Verbrauch

#🎜🎜 #

Das obige ist der detaillierte Inhalt vonSo implementieren Sie Broadcast-Nachrichten in RocketMQ in Springboot. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen