RabbitMQ-Verzögerungswarteschlange bedeutet, dass eine Nachricht, nachdem sie an die Warteschlange gesendet wurde, nicht sofort vom Verbraucher konsumiert wird, sondern eine gewisse Zeit wartet, bevor sie vom Verbraucher konsumiert wird . Diese Art von Warteschlange wird normalerweise verwendet, um geplante Aufgaben auszuführen. Wenn beispielsweise eine Bestellung abläuft und nicht bezahlt wird, storniert das System die Bestellung und gibt den belegten Bestand frei.
Es gibt viele Möglichkeiten, Verzögerungswarteschlangen in RabbitMQ zu implementieren. Die gebräuchlichste davon ist die Verwendung von Plug-Ins oder die Implementierung über den DLX-Mechanismus (Dead Letter Exchange).
RabbitMQ stellt das Rabbitmq_delayed_message_exchange-Plug-In bereit, mit dem verzögerte Warteschlangen implementiert werden können. Das Prinzip dieses Plug-Ins besteht darin, die Nachricht beim Senden der Nachricht an eine bestimmte Börse zu senden. Anschließend leitet die Börse die Nachricht entsprechend der Verzögerungszeit in der Nachricht an die angegebene Warteschlange weiter und realisiert so die Funktion von Verzögerungswarteschlange.
Um dieses Plug-in zu verwenden, müssen Sie zuerst das Plug-in installieren, dann einen Exchange erstellen, den Typ des Exchange auf x-delayed-message festlegen und dann den Exchange an die Warteschlange binden.
Die TTL einer Nachricht ist die Überlebenszeit der Nachricht. RabbitMQ kann TTL für Warteschlangen bzw. Nachrichten festlegen. Die Warteschlangeneinstellung ist die Aufbewahrungszeit der Warteschlange ohne angeschlossene Verbraucher. Sie können auch separate Einstellungen für jede einzelne Nachricht festlegen. Nach dieser Zeit betrachten wir die Nachricht als tot und nennen sie einen toten Brief. Wenn die Warteschlange festgelegt ist und die Nachricht festgelegt ist, wird der kleinere Wert verwendet. Wenn eine Nachricht daher an verschiedene Warteschlangen weitergeleitet wird, kann die Todeszeit der Nachricht unterschiedlich sein (unterschiedliche Warteschlangeneinstellungen). Hier sprechen wir nur über die TTL einer einzelnen Nachricht, da sie der Schlüssel zum Erreichen verzögerter Aufgaben ist. Sie können die Zeit festlegen, indem Sie das Ablauffeld der Nachricht oder das Attribut x-message-ttl festlegen. Beide haben den gleichen Effekt.
DLX-Mechanismus ist ein von RabbitMQ bereitgestellter Nachrichtenweiterleitungsmechanismus. Er kann Nachrichten, die nicht verarbeitet werden können, an die angegebene Börse weiterleiten und so eine verzögerte Verarbeitung von Nachrichten erreichen. Die spezifischen Implementierungsschritte lauten wie folgt:
rabbitmq_delayed_message_exchange
installieren > Plug-in. Es kann über den folgenden Befehl installiert werden: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq_delayed_message_exchange
插件。可以通过以下命令安装:Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
延时队列需要使用延时交换机。可以使用 x-delayed-message
类型创建一个延时交换机。以下是创建延时交换机的示例代码:
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "delayed-exchange"); args.put("x-dead-letter-routing-key", "delayed-queue"); args.put("x-message-ttl", 5000); channel.queueDeclare("delayed-queue", true, false, false, args); channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");
创建延时队列时,需要将队列绑定到延时交换机上,并设置队列的 TTL(Time To Live)参数。以下是创建延时队列的示例代码:
Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", 5000); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .headers(headers) .expiration("5000") .build(); channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());
在上述代码中,将队列绑定到延时交换机上,并设置了队列的 TTL 参数为 5000 毫秒,即消息在发送到队列后,如果在 5000 毫秒内没有被消费者消费,则会被转发到 delayed-exchange
交换机上,并发送到 delayed-queue
队列中。
发送延时消息时,需要设置消息的 expiration
属性,该属性表示消息的过期时间。以下是发送延时消息的示例代码:
channel.basicQos(1); channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received message: " + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); });
在上述代码中,设置了消息的 expiration
属性为 5000 毫秒,并将消息发送到 delayed-exchange
交换机上,路由键为 delayed-queue
,消息内容为 “Hello, delayed queue!”。
消费延时消息时,需要设置消费者的 QOS(Quality of Service)参数,以控制消费者的并发处理能力。以下是消费延时消息的示例代码:
import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class DelayedQueueExample { private static final String EXCHANGE_NAME = "delayed_exchange"; private static final String QUEUE_NAME = "delayed_queue"; private static final String ROUTING_KEY = "delayed_routing_key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; */ // 创建一个支持延时队列的Exchange Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments); // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数 Map<String, Object> queueArguments = new HashMap<>(); queueArguments.put("x-dead-letter-exchange", ""); queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME); queueArguments.put("x-message-ttl", 5000); channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送消息到延时队列中,设置expiration参数 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") .build(); String message = "Hello, delayed queue!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes()); System.out.println("Sent message to delayed queue: " + message); channel.close(); connection.close(); } }
在上述代码中,设置了 QOS 参数为 1,即每次只处理一个消息。然后使用 basicConsume
方法消费 delayed-queue
队列中的消息,并在消费完成后,使用 basicAck
Verzögerungsschalter erstellen
Verzögerungswarteschlangen erfordern die Verwendung eines Verzögerungsschalters. Mit dem Typ x-delayed-message
kann ein verzögerter Switch erstellt werden. Das Folgende ist ein Beispielcode zum Erstellen eines Verzögerungsschalters:
delayed-exchange
-Switch weitergeleitet und an die delayed-queue
gesendet > Warteschlange. 🎜🎜Eine verzögerte Nachricht senden🎜🎜Wenn Sie eine verzögerte Nachricht senden, müssen Sie das Attribut expiration
der Nachricht festlegen, das die Ablaufzeit der Nachricht angibt. Das Folgende ist ein Beispielcode zum Senden einer verzögerten Nachricht: 🎜rrreee🎜Im obigen Code wird die Eigenschaft expiration
der Nachricht auf 5000 Millisekunden gesetzt und die Nachricht wird an delayed-exchange gesendet
Auf dem Switch ist der Routing-Schlüssel delayed-queue
und der Nachrichteninhalt ist „Hallo, verzögerte Warteschlange!“. 🎜🎜Verzögerte Nachrichten konsumieren🎜🎜Beim Konsumieren verzögerter Nachrichten müssen Sie die QOS-Parameter (Quality of Service) des Verbrauchers festlegen, um die gleichzeitigen Verarbeitungsfähigkeiten des Verbrauchers zu steuern. Das Folgende ist ein Beispielcode für die Verarbeitung verzögerter Nachrichten: 🎜rrreee🎜Im obigen Code ist der QOS-Parameter auf 1 gesetzt, d. h. es wird jeweils nur eine Nachricht verarbeitet. Verwenden Sie dann die Methode basicConsume
, um die Nachricht in der Warteschlange delayed-queue
zu konsumieren, und verwenden Sie nach Abschluss des Konsums die Methode basicAck
zur Bestätigung dass die Nachricht verbraucht wurde. 🎜🎜Durch die oben genannten Schritte können Sie die RabbitMQ-Verzögerungswarteschlange implementieren, die zum Implementieren von Funktionen wie geplanten Aufgaben verwendet wird. 🎜RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。
下面是具体代码(附注释):
import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class DelayedQueueExample { private static final String EXCHANGE_NAME = "delayed_exchange"; private static final String QUEUE_NAME = "delayed_queue"; private static final String ROUTING_KEY = "delayed_routing_key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; */ // 创建一个支持延时队列的Exchange Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments); // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数 Map<String, Object> queueArguments = new HashMap<>(); queueArguments.put("x-dead-letter-exchange", ""); queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME); queueArguments.put("x-message-ttl", 5000); channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送消息到延时队列中,设置expiration参数 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") .build(); String message = "Hello, delayed queue!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes()); System.out.println("Sent message to delayed queue: " + message); channel.close(); connection.close(); } }
在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。
注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。
Das obige ist der detaillierte Inhalt vonSo verwenden Sie Java-Code zum Implementieren der RabbitMQ-Verzögerungswarteschlange. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!