Rumah  >  Artikel  >  Java  >  Bagaimana SpringBoot menyepadukan RabbitMQ untuk mengendalikan baris gilir surat mati dan baris gilir kelewatan

Bagaimana SpringBoot menyepadukan RabbitMQ untuk mengendalikan baris gilir surat mati dan baris gilir kelewatan

王林
王林ke hadapan
2023-05-15 15:28:06 | 854 semak imbas

Pengenalan

Pengenalan mesej RabbitMQ

Secara lalai, mesej RabbitMQ tidak akan tamat tempoh.

Apakah itu baris gilir surat mati? Apakah itu baris gilir kelewatan?

Baris gilir surat mati:

DLX, nama penuhnya Dead-Letter-Exchange, boleh dipanggil pertukaran surat mati; sesetengah orang memanggilnya peti surat mati. Apabila mesej dalam baris gilir menjadi surat mati, ia boleh dihantar semula ke pertukaran lain, iaitu DLX.

Situasi berikut akan menyebabkan mesej menjadi surat mati:

  • Mesej ditolak (Basic.Reject/Basic.Nack), dan parameter requeue ditetapkan kepada false;

  • Mesej tamat tempoh;

  • Baris gilir mencapai panjang maksimumnya.

Baris gilir kelewatan:

Baris gilir kelewatan digunakan untuk menyimpan mesej tertunda. Mesej tertunda ialah mesej yang, selepas dihantar, tidak sepatutnya diterima oleh pengguna (consumer) dengan segera; pengguna hanya boleh mendapatkan dan memproses mesej itu selepas menunggu tempoh masa tertentu.

URL Berkaitan

Penjelasan terperinci tentang penggunaan baris gilir surat mati dan baris gilir kelewatan dalam RabbitMQ

Kod contoh

Konfigurasi penghalaan

package com.example.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * Declares the demo's RabbitMQ topology (exchanges, queues and bindings) on the
 * broker through the injected {@link AmqpAdmin}.
 *
 * <p>Topology declared here:
 * <ul>
 *   <li>{@link #EXCHANGE_TOPIC_WELCOME} (topic) is bound to {@link #QUEUE_HELLO} with
 *       {@link #ROUTINGKEY_HELLOS}; its {@code alternate-exchange} argument points at
 *       {@link #EXCHANGE_FANOUT_UNROUTE}, which is bound to {@link #QUEUE_UNROUTE}.</li>
 *   <li>{@link #QUEUE_HELLO} carries {@code x-message-ttl}, {@code x-dead-letter-exchange}
 *       and {@code x-dead-letter-routing-key} arguments that point at
 *       {@link #EXCHANGE_TOPIC_DELAY}, which is bound to {@link #QUEUE_DELAY}.</li>
 *   <li>{@link #QUEUE_HI} is declared without any binding in this class.</li>
 * </ul>
 */
@Configuration
public class RabbitRouterConfig {
    // Exchange names.
    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";
    public static final String EXCHANGE_FANOUT_UNROUTE  = "Exchange@fanout.unroute";
    public static final String EXCHANGE_TOPIC_DELAY     = "Exchange@topic.delay";

    // Topic binding patterns. ROUTINGKEY_DELAY is also passed verbatim as the
    // dead-letter routing key of QUEUE_HELLO.
    public static final String ROUTINGKEY_HELLOS        = "hello.#";
    public static final String ROUTINGKEY_DELAY         = "delay.#";

    // Queue names.
    public static final String QUEUE_HELLO              = "Queue@hello";
    public static final String QUEUE_HI                 = "Queue@hi";
    public static final String QUEUE_UNROUTE            = "Queue@unroute";
    public static final String QUEUE_DELAY              = "Queue@delay";

    // Value of the x-message-ttl argument on QUEUE_HELLO (RabbitMQ reads it as milliseconds).
    public static final Integer TTL_QUEUE_MESSAGE       = 5000;

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Declares every exchange, queue and binding of the demo, in that order.
     *
     * @return a placeholder object; the bean exists only for the declarations
     *         performed while it is being created
     */
    @Bean
    Object initBindingTest() {
        declareExchanges();
        declareQueues();
        declareBindings();
        return new Object();
    }

    /** Declares the fan-out, delay and welcome exchanges, all durable and auto-delete. */
    private void declareExchanges() {
        Exchange unrouteExchange = ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE)
                .durable(true)
                .autoDelete()
                .build();
        amqpAdmin.declareExchange(unrouteExchange);

        Exchange delayExchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY)
                .durable(true)
                .autoDelete()
                .build();
        amqpAdmin.declareExchange(delayExchange);

        // The welcome exchange names the fan-out exchange as its alternate exchange.
        Exchange welcomeExchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)
                .durable(true)
                .autoDelete()
                .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)
                .build();
        amqpAdmin.declareExchange(welcomeExchange);
    }

    /** Declares the four durable queues; only QUEUE_HELLO receives extra arguments. */
    private void declareQueues() {
        Queue hiQueue = QueueBuilder.durable(QUEUE_HI).build();
        amqpAdmin.declareQueue(hiQueue);

        Queue helloQueue = QueueBuilder.durable(QUEUE_HELLO)
                .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)
                .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)
                .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)
                .build();
        amqpAdmin.declareQueue(helloQueue);

        Queue unrouteQueue = QueueBuilder.durable(QUEUE_UNROUTE).build();
        amqpAdmin.declareQueue(unrouteQueue);

        Queue delayQueue = QueueBuilder.durable(QUEUE_DELAY).build();
        amqpAdmin.declareQueue(delayQueue);
    }

    /** Binds the hello, unroute and delay queues to their exchanges. */
    private void declareBindings() {
        bindQueue(QUEUE_HELLO, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS);
        bindQueue(QUEUE_UNROUTE, EXCHANGE_FANOUT_UNROUTE, "");
        bindQueue(QUEUE_DELAY, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY);
    }

    /** Declares a queue-to-exchange binding with the given routing key and no arguments. */
    private void bindQueue(String queueName, String exchangeName, String routingKey) {
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,
                exchangeName, routingKey, null);
        amqpAdmin.declareBinding(binding);
    }
}

Pengawal

package com.example.controller;
 
import com.example.config.RabbitRouterConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.time.LocalDateTime;
 
/**
 * HTTP endpoints that publish time-stamped test messages through {@link Sender}.
 * Every endpoint is mapped with {@code @PostMapping} and returns no body.
 */
@RestController
public class HelloController {
    @Autowired
    private Sender sender;

    /** Publishes a message using {@link RabbitRouterConfig#QUEUE_HI} as the routing key. */
    @PostMapping("/hi")
    public void hi() {
        String body = "hi1 message:" + LocalDateTime.now();
        sender.send(RabbitRouterConfig.QUEUE_HI, body);
    }

    /**
     * Publishes a message with routing key {@code hello.a} without naming an exchange.
     *
     * <p>NOTE(review): unlike {@link #hello2()}, this does not target
     * {@link RabbitRouterConfig#EXCHANGE_TOPIC_WELCOME}; confirm that is intended.
     */
    @PostMapping("/hello1")
    public void hello1() {
        String body = "hello1 message:" + LocalDateTime.now();
        sender.send("hello.a", body);
    }

    /**
     * Publishes a message to {@link RabbitRouterConfig#EXCHANGE_TOPIC_WELCOME}
     * with routing key {@code hello.b}.
     */
    @PostMapping("/hello2")
    public void hello2() {
        String body = "hello2 message:" + LocalDateTime.now();
        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", body);
    }

    /**
     * Publishes a message to {@link RabbitRouterConfig#EXCHANGE_TOPIC_WELCOME}
     * with routing key {@code nonono}, which does not start with {@code hello}.
     */
    @PostMapping("/ae")
    public void aeTest() {
        String body = "ae message:" + LocalDateTime.now();
        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", body);
    }
}

Penghantar

package com.example.mq;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
/**
 * Thin publishing facade over the injected {@link AmqpTemplate}.
 */
@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Converts and sends {@code message} with the given routing key, without naming
     * an exchange (delegates to the two-argument {@code convertAndSend}).
     *
     * @param routingKey routing key to publish with
     * @param message    text payload
     */
    public void send(String routingKey, String message) {
        rabbitTemplate.convertAndSend(routingKey, message);
    }

    /**
     * Converts and sends {@code message} to the named exchange with the given routing key.
     *
     * @param exchange   name of the exchange to publish to
     * @param routingKey routing key to publish with
     * @param message    text payload
     */
    public void send(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

Penerima

package com.example.mq;
 
import com.example.config.RabbitRouterConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * RabbitMQ listeners for the demo queues; each one prints what it receives.
 */
@Component
public class Receiver {
    /** How long the delay listener sleeps after printing a message, in milliseconds. */
    private static final long PAUSE_MILLIS = 5 * 1000;

    /** Prints every message received from {@link RabbitRouterConfig#QUEUE_HI}. */
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
    public void hi(String payload) {
        String line = "Receiver(hi) : "  + payload;
        System.out.println(line);
    }

    // The listeners for QUEUE_HELLO and QUEUE_UNROUTE below are disabled.
    // NOTE(review): presumably QUEUE_HELLO is left without a consumer so its messages
    // stay queued until the queue's TTL expires — confirm.
    //
    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
    // public void hello(String hello) throws InterruptedException {
    //     System.out.println ("Receiver(hello) : "  + hello);
    //     Thread.sleep(5 * 1000);
    //     System.out.println("(hello):sleep over");
    // }
    //
    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)
    // public void unroute(String hello) throws InterruptedException {
    //     System.out.println ("Receiver(unroute) : "  + hello);
    //     Thread.sleep(5 * 1000);
    //     System.out.println("(unroute):sleep over");
    // }

    /**
     * Prints every message received from {@link RabbitRouterConfig#QUEUE_DELAY},
     * sleeps for five seconds, then prints a completion line.
     *
     * @param hello the received message text
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)
    public void delay(String hello) throws InterruptedException {
        String line = "Receiver(delay) : "  + hello;
        System.out.println(line);
        Thread.sleep(PAUSE_MILLIS);
        System.out.println("(delay):sleep over");
    }
}

application.yml

# Spring Boot settings for the demo application.
# The active values configure the receiver instance (port 9101,
# demo-rabbitmq-receiver); the commented-out port/name values are the
# alternatives for the sender instance.
server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  # RabbitMQ broker connection.
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /
    # NOTE(review): newer Spring Boot versions replace publisher-confirms with
    # publisher-confirm-type — confirm against the Boot version in use.
    publisher-confirms: true
    publisher-returns: true
# Manual acknowledge mode for listener containers is left disabled.
#    listener:
#      simple:
#        acknowledge-mode: manual
#      direct:
#        acknowledge-mode: manual

Ujian Instance

Mulakan instance penghantar dan penerima secara berasingan (penghantar pada port 9100, penerima pada port 9101).

Hantar permintaan POST ke: http://localhost:9100/hello2

Output selepas lima saat:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548

(delay):sleep over

Di atas ialah kandungan terperinci tentang bagaimana SpringBoot menyepadukan RabbitMQ untuk mengendalikan baris gilir surat mati dan baris gilir kelewatan. Untuk maklumat lanjut, sila ikuti artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini diterbitkan semula daripada: yisu.com. Jika terdapat sebarang pelanggaran, sila hubungi admin@php.cn untuk memadamkannya.