Maison >Java >javaDidacticiel >Comment RocketMQ implémente-t-il l'envoi et la réception de messages dans Springboot ?

Comment RocketMQ implémente-t-il l'envoi et la réception de messages dans Springboot ?

WBOY
WBOYavant
2023-05-18 17:19:061791parcourir

springboot+rockermq implémente l'envoi et la réception de messages simples

Il existe trois façons d'envoyer des messages ordinaires : l'envoi unidirectionnel, l'envoi synchrone et l'envoi asynchrone.

Présentons l'intégration springboot+rockermq pour réaliser l'envoi et la réception de messages ordinaires

  • Créez un projet Springboot et ajoutez une dépendance rockermq#🎜🎜 # #🎜🎜 ## r#r#🎜🎜 ## 🎜🎜 ## 🎜🎜#Configurer Rocketmq#🎜🎜 ## 🎜🎜 ## 🎜🎜 ## 🎜🎜 ## 端

    Serveur :#🎜🎜 # port : 8083
# Configurer rocketmq
    rocketmq:
  • serveur de noms : 127.0.0.1:9876

    #producteur

    producteur 🎜# #Quand le message atteint 4096 octets, le message sera compressé. Par défaut 4096
  • compress-message-body-threshold : 4096
#Limite maximale de messages, la valeur par défaut est 128K
max-message-size : 4194304

#Réessayer si l'envoi du message de synchronisation échoue Fois
retry-times-when-send-failed : 3
#S'il faut réessayer d'autres agents en cas d'échec de l'envoi interne, ce paramètre ne prendra effet que lorsqu'il y a plusieurs courtiers

retry-next-server : true

#
retry-times-when-send-async-failed: 3



# 🎜🎜#Créer un nouveau contrôleur à envoyer messages :


<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

SpringBoot nous fournit la classe de modèle RocketMQTemplate, que nous pouvons utiliser pour envoyer des messages sous diverses formes.

La méthode d'envoi précise le sujet sujet test-sujet.



Créez un nouveau consommateur de messages pour écouter RocketMQConsumerListener, écouter les messages et consommer des messages


package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 普通信息的三种方式:同步、异步、单向
 * @author qzz
 */
@RestController
public class RocketMQCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送普通消息
     * convertAndSend(String destination, Object payload) 发送字符串比较方便
     */
    @RequestMapping("/send")
    public void send(){
        rocketMQTemplate.convertAndSend("test-topic","test-message");
    }
    /**
     * 发送同步消息
     */
    @RequestMapping("/testSyncSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
        System.out.println(sendResult);
    }
    /**
     * 发送异步消息
     */
    @RequestMapping("/testASyncSend")
    public void testASyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        //参数三:回调
        rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送异常");
                throwable.printStackTrace();
            }
        });
    }
    /**
     * 发送单向消息
     */
    @RequestMapping("/testOneWay")
    public void testOneWay(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
    }
}

Classe de consommateur à implémenter l'interface RocketMQListener et à spécifier dynamiquement le type de message String.
    Ajoutez une @RocketMQMessageListener annotation à la classe, spécifiez le sujet sujet test-topic et le test du groupe de consommateurs
  • L'envoi et la réception de messages simples La construction est complet!

Démarrer le service et tester la consommation des messages

  • #🎜🎜 #

Test du message synchrone :

RocketMQListener接口,以及动态指定消息类型String。

类上要加上@RocketMQMessageListener注解

Test du message asynchrone :
  • # 🎜 🎜#

    Test message à sens unique :

Comment RocketMQ implémente-t-il lenvoi et la réception de messages dans Springboot ?

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer