Heim >Java >javaLernprogramm >Wie SpringBoot RabbitMQ integriert, um Warteschlangen für unzustellbare Nachrichten und Verzögerungswarteschlangen zu verarbeiten
RabbitMQ-Nachrichteneinführung
RabbitMQ-Nachrichten haben standardmäßig keine Zeitüberschreitung.
Was ist eine Warteschlange für unzustellbare Nachrichten? Was ist eine Verzögerungswarteschlange?
Dead-Letter-Queue:
DLX, der vollständige Name ist Dead-Letter-Exchange, es kann als Dead-Letter-Exchange bezeichnet werden, und manche Leute nennen es auch ein Briefkasten für tote Briefe. Wenn eine Nachricht in einer Warteschlange zu einem toten Brief wird, kann sie erneut an eine andere Vermittlungsstelle gesendet werden. Die an DLX gebundene Warteschlange wird als tote Briefwarteschlange bezeichnet.
Die folgenden Situationen führen dazu, dass die Nachricht zu einem unzustellbaren Brief wird:
Die Nachricht wird abgelehnt (Basic.Reject/Basic.Nack) , und Setzen Sie das Requeue-Argument auf false;
Verzögerungswarteschlange:
Detaillierte Erläuterung der Verwendung von Warteschlangen für unzustellbare Nachrichten und Verzögerungswarteschlangen in RabbitMQ
Beispielcode #🎜 🎜#
Routing-Konfigurationpackage com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRouterConfig { public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome"; public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute"; public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay"; public static final String ROUTINGKEY_HELLOS = "hello.#"; public static final String ROUTINGKEY_DELAY = "delay.#"; public static final String QUEUE_HELLO = "Queue@hello"; public static final String QUEUE_HI = "Queue@hi"; public static final String QUEUE_UNROUTE = "Queue@unroute"; public static final String QUEUE_DELAY = "Queue@delay"; public static final Integer TTL_QUEUE_MESSAGE = 5000; @Autowired AmqpAdmin amqpAdmin; @Bean Object initBindingTest() { amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME) .durable(true) .autoDelete() .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO) .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY) .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY) .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build()); amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null)); amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE, EXCHANGE_FANOUT_UNROUTE, "", null)); amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null)); return new Object(); } }Controller
package com.example.controller; import com.example.config.RabbitRouterConfig; import com.example.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestController public class HelloController { @Autowired private Sender sender; @PostMapping("/hi") public void hi() { sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now()); } @PostMapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + LocalDateTime.now()); } @PostMapping("/hello2") public void hello2() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now()); } @PostMapping("/ae") public void aeTest() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now()); } }Sender
package com.example.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String routingKey, String message) { this.rabbitTemplate.convertAndSend(routingKey, message); } public void send(String exchange, String routingKey, String message) { this.rabbitTemplate.convertAndSend(exchange, routingKey, message); } }Empfänger
package com.example.mq; import com.example.config.RabbitRouterConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receiver { @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) public void hi(String payload) { System.out.println ("Receiver(hi) : " + payload); } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) // public void hello(String hello) throws InterruptedException { // System.out.println ("Receiver(hello) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(hello):sleep over"); // } // // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE) // public void unroute(String hello) throws InterruptedException { // System.out.println ("Receiver(unroute) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(unroute):sleep over"); // } @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY) public void delay(String hello) throws InterruptedException { System.out.println ("Receiver(delay) : " + hello); Thread.sleep(5 * 1000); System.out.println("(delay):sleep over"); } }# 🎜🎜#Anwendung .yml
server: # port: 9100 port: 9101 spring: application: # name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456 # virtualHost: / publisher-confirms: true publisher-returns: true # listener: # simple: # acknowledge-mode: manual # direct: # acknowledge-mode: manual
Das obige ist der detaillierte Inhalt vonWie SpringBoot RabbitMQ integriert, um Warteschlangen für unzustellbare Nachrichten und Verzögerungswarteschlangen zu verarbeiten. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!