Heim  >  Artikel  >  Java  >  So verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichen

So verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichen

王林
王林nach vorne
2023-05-29 22:34:491830Durchsuche

    Umgebungskonfiguration

    SpringBoot Integration RabbitMQ zur Implementierung von Messaging schicken. SpringBoot 整合 RabbitMQ 实现消息的发送。

    1.添加 maven 依赖

           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2.添加 application.yml 配置文件

    spring:
      rabbitmq:
        host: 192.168.3.19
        port: 5672
        username: admin
        password: xxxx

    3.配置交换机、队列以及绑定

        @Bean
        public DirectExchange myExchange() {
            DirectExchange directExchange = new DirectExchange("myExchange");
            return directExchange;
        }
    
        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue");
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
        }

    4.生产发送消息

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
            System.out.println("【发送消息】" + message)
            return "【send message】" + message;
        }

    5.消费者接收消息

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);

    6.调用生产端发送消息 hello,控制台输出:

    【发送消息】hello
    【接收信息】hello 当前时间2022-05-12 10:21:14

    说明消息已经被成功接收。

    消息丢失分析

    So verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichen

    一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

    • 生产端丢失: 生产者无法传输到 RabbitMQ

    • 存储端丢失: RabbitMQ 存储自身挂了

    • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

    RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。

    生产阶段

    生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

    配置application.yml

    spring:
      rabbitmq:
        # 消息确认机制 生产者 -> 交换机
        publisher-confirms: true
        # 消息返回机制  交换机 -> 队列
        publisher-returns: true

    配置

    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("【correlationData】:" + correlationData);
                    log.info("【ack】" + ack);
                    log.info("【cause】" + cause);
                    if (ack) {
                        log.info("【发送成功】");
                    } else {
                        log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                    }
                }
            });
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("【消息发送失败】");
                    log.info("【message】" + message);
                    log.info("【replyCode】" + replyCode);
                }
            });
    
            return rabbitTemplate;
        }
    }

    消息从 生产者 到 交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据 ack 判断消息是否发送成功。

    消息从 交换机 到 队列,有returnCallback 退回模式。

    发送消息 product message 控制台输出如下:

    【发送消息】product message
    【接收信息】product message 当前时间2022-05-12 11:27:56
    【correlationData】:null
    【ack】true
    【cause】null
    【发送成功】

    生产端模拟消息丢失

    这里有两个方案:

    • 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。

    • 发送不存在的交换机:

    // myExchange 修改成 myExchangexxxxx
    rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

    结果:

    【correlationData】:null
    【ack】false
    【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
    【发送失败】

    当发送失败可以对消息进行重试

    交换机正确,发送不存在的队列:

    交换机接收到消息,返回成功通知,控制台输出:

    【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
    【ack】true
    【cause】null
    【发送成功】

    交换机没有找到队列,返回失败信息:

    【消息发送失败】
    【message】product message
    【replyCode】312

    RabbitMQ

    开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

    修改队列的持久化,修改成非持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",false);
            return queue;
        }

    发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。
    设置队列持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",true);
            return queue;
        }

    重启之后,队列的消息还存在。

    消费端

    消费端默认开始 ack

    1. maven-Abhängigkeit hinzufügen

    spring:
      rabbitmq:
        # 手动消息确认
        listener:
          simple:
            acknowledge-mode: manual

    2. Application.yml-Konfigurationsdatei hinzufügen

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    3 Schalter, Warteschlange und Bindung

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);
            System.out.println(message.getMessageProperties().getDeliveryTag());
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    4. Die Produktion sendet Nachrichten

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    5. Verbraucher erhalten Nachrichten

    rrreee

    6 Nachrichten hello, Konsolenausgabe:

    So verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichen[Nachricht senden]hello
    [Nachricht empfangen]hello Aktuelle Zeit 2022-05-12 10:21:14

    Zeigt an, dass die Nachricht erfolgreich empfangen wurde.

    #🎜🎜#Nachrichtenverlustanalyse#🎜🎜##🎜🎜#„So#🎜🎜##🎜🎜# Von der Produktion bis zum Verbrauch einer Nachricht kann es in den folgenden Phasen zu Nachrichtenverlusten kommen: #🎜🎜#
    • #🎜🎜#Auf der Produktionsseite verloren: Der Produzent kann nicht an RabbitMQ#🎜🎜#
    • #🎜🎜#Auf der Produktionsseite verloren gehen Speicherseite: RabbitMQ Der Speicher selbst ist ausgefallen#🎜🎜#
    • #🎜🎜#Auf Verbraucherseite verloren: Der Speicher kann aufgrund von Netzwerkproblemen nicht an den Verbraucher gesendet werden. oder der Verbrauch ist gesunken und der normale Verbrauch kann nicht gesendet werden# 🎜🎜#
    • #🎜🎜##🎜🎜#RabbitMQ bietet gute Unterstützung für eine zuverlässige Übertragung von der Produktionsseite, der Speicherseite usw Verbraucherseite. #🎜🎜##🎜🎜#Produktionsphase#🎜🎜##🎜🎜#Die Produktionsphase nutzt den Anfragebestätigungsmechanismus, um eine zuverlässige Übertragung von Nachrichten sicherzustellen. Nach dem Senden einer Nachricht an den RabbitMQ-Server empfängt RabbitMQ die Nachricht und sendet eine Anforderungsbestätigung an den Absender zurück, die angibt, dass der RabbitMQ-Server die Nachricht erfolgreich empfangen hat. #🎜🎜##🎜🎜#Configuration application.yml#🎜🎜#rrreee#🎜🎜#Configuration#🎜🎜#rrreee#🎜🎜#Nachrichten vom Produzenten an Switch , es gibt den Bestätigungsmodus confirmCallback. Nachdem die Nachricht erfolgreich gesendet wurde, ruft die Nachricht die Methode confirm(CorrelationData KorrelationData, boolean ack, String Cause) auf und bestimmt anhand von ack, ob die Nachricht erfolgreich gesendet wurde . #🎜🎜##🎜🎜#Nachrichten von switch an queue haben den returnCallback-Rückgabemodus. #🎜🎜##🎜🎜#Nachricht senden Produktnachricht Die Konsolenausgabe lautet wie folgt: #🎜🎜#
      #🎜🎜#【Nachricht senden】Produktnachricht
      【Nachricht empfangen 】Produktnachricht Aktuelle Zeit 2022-05-12 11:27:56
      [correlationData]:null
      [ack]true
      [cause]null
      [erfolgreich gesendet]#🎜 🎜#

      Simulierter Nachrichtenverlust auf der Produktionsseite

      #🎜🎜#Es gibt zwei Lösungen: #🎜🎜#
      • #🎜 🎜#Senden Schließen Sie den Broker sofort nach der Nachricht, wodurch das Netzwerk heruntergefahren wird. Nach dem Schließen des Brokers meldet die Konsole jedoch immer einen Fehler und beim Senden einer Nachricht wird ein 500-Fehler gemeldet. #🎜🎜#
      • #🎜🎜#Nicht vorhandenen Schalter senden: #🎜🎜#
      • #🎜🎜#rrreee#🎜🎜#Ergebnis: #🎜🎜#
        #🎜🎜 #[correlationData]:null
        [ack]false
        [cause]Kanalfehler; Protokollmethode: #method(reply-code=404, Reply-text=NOT_FOUND – kein Austausch ' myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
        [Senden fehlgeschlagen]#🎜🎜#
        #🎜🎜#Sie können die Nachricht erneut versuchen, wenn das Senden fehlschlägt# 🎜 🎜##🎜🎜#Der Switch ist korrekt und die nicht vorhandene Warteschlange wird gesendet: #🎜🎜##🎜🎜#Der Switch empfängt die Nachricht und gibt eine Erfolgsbenachrichtigung zurück: #🎜🎜#
        # 🎜🎜#[correlationData 】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
        [ack]true
        [cause]null
        [Erfolgreich gesendet]#🎜🎜##🎜 🎜#Der Switch hat die Warteschlange nicht gefunden und Fehlerinformationen zurückgegeben: #🎜🎜#
        #🎜🎜#[Nachrichtenversand fehlgeschlagen]
        [message]Produktnachricht
        [replyCode ]312#🎜🎜# blockquote>#🎜🎜#RabbitMQ#🎜🎜##🎜🎜#Die erstellten Warteschlangen und Switches sind standardmäßig als persistent konfiguriert. Stellen Sie zunächst die Warteschlange und den Schalter richtig ein und ändern Sie die Warteschlange für die Verbrauchsüberwachung so, dass Nachrichten in der Warteschlange gespeichert werden. #🎜🎜##🎜🎜#Ändern Sie die Persistenz der Warteschlange in Nichtpersistenz: #🎜🎜#rrreee#🎜🎜#Nach dem Senden der Nachricht wird die Nachricht in der Warteschlange gespeichert und dann RabbitMQ neu gestartet code>, Die Nachricht existiert nicht mehr. <br>Warteschlangenpersistenz festlegen: #🎜🎜#rrreee#🎜🎜# Nach dem Neustart sind die Nachrichten in der Warteschlange weiterhin vorhanden. #🎜🎜##🎜🎜#Verbraucherseite#🎜🎜##🎜🎜#Die Verbraucherseite startet standardmäßig den automatischen Bestätigungsmodus <code>ack, wenn die Warteschlangennachricht vom Verbraucher empfangen wird, unabhängig davon, ob Liegt eine Nachricht von der Verbraucherseite vor, werden Nachrichten in der Warteschlange automatisch gelöscht. Um sicherzustellen, dass der Verbraucher die Nachricht erfolgreich konsumieren kann, ändern Sie daher den automatischen Modus in den manuellen Bestätigungsmodus: #🎜🎜##🎜🎜#Ändern Sie die application.yml-Datei #🎜🎜#rrreee#🎜🎜#Nach dem Konsumieren Nachricht, manuelle Bestätigung ist erforderlich: # 🎜🎜#rrreeerrreee#🎜🎜#Wenn Sie nicht hinzufügen: #🎜🎜#rrreee#🎜🎜#Zwei Nachrichten senden#🎜🎜##🎜🎜#Nachdem die Nachricht empfangen wurde, dort Es erfolgt keine Bestätigung und es wird wieder in die Warteschlange gestellt: #🎜🎜 ##🎜🎜##🎜🎜##🎜🎜##🎜🎜# Nach dem Neustart des Projekts werden die Warteschlangennachrichten an den Verbraucher gesendet, jedoch ohne Bestätigung Nach Bestätigung werden sie weiterhin wieder in die Warteschlange gestellt. #🎜🎜#

        Nachdem Sie channel.basicAck hinzugefügt haben, starten Sie das Projekt neuchannel.basicAck 之后,再重启项目

        So verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichen

        队列消息就被删除了

        basicAck 方法最后一个参数 multiple 表示是删除之前的队列。

        multiple 设置成 true

        „So

        So verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichenDie Warteschlangennachricht wird gelöscht

        🎜basicAck Der letzte Parameter der Methode multiple bedeutet, dass die Warteschlange vor dem Löschen. 🎜🎜multiple wird auf true gesetzt und alle nachfolgenden Warteschlangen werden gelöscht🎜🎜🎜🎜

    Das obige ist der detaillierte Inhalt vonSo verwenden Sie SpringBoot + RabbitMQ, um eine zuverlässige Nachrichtenübertragung zu erreichen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen