RabbitMQ 메시지 소개
RabbitMQ 메시지는 기본적으로 시간 초과되지 않습니다.
배달 못한 편지 대기열이란 무엇인가요? 지연 대기열이란 무엇입니까?
데드 레터 큐:
DLX, 전체 이름은 Dead-Letter-Exchange이며 데드 레터 교환기라고 할 수 있으며 어떤 사람들은 데드 레터 메일박스라고 부릅니다. 메시지가 대기열에서 배달 못한 편지가 되면 다른 교환기로 재전송될 수 있습니다. 이 교환을 DLX에 바인딩된 대기열을 배달 못한 편지 대기열이라고 합니다.
다음 상황에서는 메시지가 배달 못한 편지가 됩니다.
메시지가 거부되고(Basic.Reject/Basic.Nack) requeue 매개변수가 false로 설정됩니다.
메시지가 만료됩니다.
큐가 최대 길이에 도달했습니다.
지연 대기열:
지연 대기열은 지연된 메시지를 저장하는 데 사용됩니다. 지연된 메시지: 메시지가 전송된 후 소비자는 소비자가 메시지를 즉시 받는 것을 원하지 않지만 소비자가 소비할 메시지를 받을 수 있을 때까지 특정 기간 동안 기다립니다.
관련 URL
RabbitMQ의 배달 못한 편지 대기열 및 지연 대기열 사용에 대한 자세한 설명
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRouterConfig { public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome"; public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute"; public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay"; public static final String ROUTINGKEY_HELLOS = "hello.#"; public static final String ROUTINGKEY_DELAY = "delay.#"; public static final String QUEUE_HELLO = "Queue@hello"; public static final String QUEUE_HI = "Queue@hi"; public static final String QUEUE_UNROUTE = "Queue@unroute"; public static final String QUEUE_DELAY = "Queue@delay"; public static final Integer TTL_QUEUE_MESSAGE = 5000; @Autowired AmqpAdmin amqpAdmin; @Bean Object initBindingTest() { amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME) .durable(true) .autoDelete() .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO) .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY) .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY) .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build()); amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null)); amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE, EXCHANGE_FANOUT_UNROUTE, "", null)); amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null)); return new Object(); } }
package com.example.controller; import com.example.config.RabbitRouterConfig; import com.example.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestController public class HelloController { @Autowired private Sender sender; @PostMapping("/hi") public void hi() { sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now()); } @PostMapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + LocalDateTime.now()); } @PostMapping("/hello2") public void hello2() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now()); } @PostMapping("/ae") public void aeTest() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now()); } }
package com.example.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String routingKey, String message) { this.rabbitTemplate.convertAndSend(routingKey, message); } public void send(String exchange, String routingKey, String message) { this.rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
package com.example.mq; import com.example.config.RabbitRouterConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receiver { @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) public void hi(String payload) { System.out.println ("Receiver(hi) : " + payload); } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) // public void hello(String hello) throws InterruptedException { // System.out.println ("Receiver(hello) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(hello):sleep over"); // } // // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE) // public void unroute(String hello) throws InterruptedException { // System.out.println ("Receiver(unroute) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(unroute):sleep over"); // } @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY) public void delay(String hello) throws InterruptedException { System.out.println ("Receiver(delay) : " + hello); Thread.sleep(5 * 1000); System.out.println("(delay):sleep over"); } }
server: # port: 9100 port: 9101 spring: application: # name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456 # virtualHost: / publisher-confirms: true publisher-returns: true # listener: # simple: # acknowledge-mode: manual # direct: # acknowledge-mode: manual
발신자와 수신자를 각각 시작합니다.
방문: http://localhost:9100/hello2
5초 후 출력:
Receiver(delay): hello2 message:2020-11-27T09:30:51.548
(delay): sleep over
위 내용은 SpringBoot가 RabbitMQ를 통합하여 배달 못한 편지 대기열과 지연 대기열을 처리하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!