Heim >Java >javaLernprogramm >Wie implementiert RocketMQ das Senden und Empfangen von Nachrichten in Springboot?

Wie implementiert RocketMQ das Senden und Empfangen von Nachrichten in Springboot?

WBOY
WBOYnach vorne
2023-05-18 17:19:061770Durchsuche

springboot+rockermq realisiert das einfache Senden und Empfangen von Nachrichten

Es gibt drei Möglichkeiten, normale Nachrichten zu senden: Einwegversand, synchrones Senden und asynchrones Senden.

Im Folgenden wird die Integration von Springboot + RockerMQ vorgestellt, um das Senden und Empfangen normaler Nachrichten zu realisieren.

  • Erstellen Sie ein Springboot-Projekt und fügen Sie RockerMQ-Abhängigkeiten hinzu

    Server :
  • Port: 8083
    # Rocketmq konfigurieren
  • rocketmq:

    Nameserver: 127.0.0.1:9876

    #Produzent
  • Produzent:
#Produzentengruppenname, der in einer Anwendung eindeutig sein muss
Gruppe: Gruppe1

#Timeout für Nachrichtenversand Die Standardzeit beträgt 3000 ms
send-message-timeout: 3000
#Wenn die Nachricht 4096 Bytes erreicht, wird die Nachricht komprimiert. Standard 4096

compress-message-body-threshold: 4096

#Maximales Nachrichtenlimit, Standard ist 128K
max-message-size: 4194304
#Anzahl der Wiederholungsversuche für fehlgeschlagenes Senden der Synchronisierungsnachricht
retry-times-when-send-failed: 3
#Ob andere Agenten erneut versucht werden sollen, wenn das interne Senden fehlschlägt. Dieser Parameter ist nur wirksam, wenn mehrere Broker vorhanden sind. async-failed: 3




Erstellen Sie einen neuen Controller zum Senden von Nachrichten:


<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

SpringBoot stellt uns die Vorlagenklasse RocketMQTemplate zur Verfügung, mit der wir Nachrichten in verschiedenen Formen senden können.

Die Sendemethode gibt das Thema Thema Test-Thema an.


Erstellen Sie einen neuen Nachrichtenkonsumenten, um RocketMQConsumerListener abzuhören, auf Nachrichten zu warten und Nachrichten zu konsumieren
  • package com.example.springbootrocketdemo.controller;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * 普通信息的三种方式:同步、异步、单向
     * @author qzz
     */
    @RestController
    public class RocketMQCOntroller {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        /**
         * 发送普通消息
         * convertAndSend(String destination, Object payload) 发送字符串比较方便
         */
        @RequestMapping("/send")
        public void send(){
            rocketMQTemplate.convertAndSend("test-topic","test-message");
        }
        /**
         * 发送同步消息
         */
        @RequestMapping("/testSyncSend")
        public void testSyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
            System.out.println(sendResult);
        }
        /**
         * 发送异步消息
         */
        @RequestMapping("/testASyncSend")
        public void testASyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            //参数三:回调
            rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("消息发送异常");
                    throwable.printStackTrace();
                }
            });
        }
        /**
         * 发送单向消息
         */
        @RequestMapping("/testOneWay")
        public void testOneWay(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
        }
    }

    Die Konsumentenklasse muss die RocketMQListener-Schnittstelle implementieren und den Nachrichtentyp String dynamisch angeben.

  • Fügen Sie der Klasse die @RocketMQMessageListener-Annotation hinzu, geben Sie das Thema Topic Test-Topic und den Verbrauchergruppentest an

Das einfache Senden und Empfangen von Nachrichten ist abgeschlossen!

    Starten Sie den Dienst und testen Sie den Nachrichtenverbrauch Nachrichten:

Das obige ist der detaillierte Inhalt vonWie implementiert RocketMQ das Senden und Empfangen von Nachrichten in Springboot?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen