ホームページ >Java >&#&チュートリアル >Java コードを使用して RabbitMQ 遅延キューを実装する方法
RabbitMQ 遅延キューとは、メッセージがキューに送信された後、すぐにはメッセージが消費されないことを意味します。代わりに、コンシューマーによって消費されるまで一定期間待機します。この種のキューは通常、スケジュールされたタスクを実装するために使用されます。たとえば、注文がタイムアウトして支払いが行われない場合、システムは注文をキャンセルし、占有されている在庫を解放します。
RabbitMQ で遅延キューを実装するにはさまざまな方法がありますが、より一般的なのは、プラグインを使用するか、DLX (Dead Letter Exchange) メカニズムを通じて実装することです。
RabbitMQ は、遅延キューを実装するために使用できる Rabbitmq_layed_message_exchange プラグインを提供します。 このプラグインの原理は、メッセージ送信時に特定の Exchange にメッセージを送信し、Exchange がメッセージ内の遅延時間に従って指定されたキューにメッセージを転送することで、この機能を実現します。遅延キューの 。
このプラグインを使用するには、まずプラグインをインストールし、次に Exchange を作成し、Exchange のタイプを x-layed-message に設定してから、Exchange をキューにバインドする必要があります。
メッセージの TTL は、メッセージの生存時間です。 RabbitMQ はキューとメッセージにそれぞれ TTL を設定できます。キュー設定は、コンシューマーが接続されていない状態でのキューの保持時間であり、個別のメッセージごとに個別の設定を行うこともできます。 この時間が経過すると、メッセージは無効になったとみなされ、それをデッドレターと呼びます。キューが設定されており、メッセージが設定されている場合は、小さい方の値が使用されます。したがって、メッセージが異なるキューにルーティングされる場合、メッセージの終了時刻は異なる可能性があります (キュー設定が異なる)。ここでは、単一メッセージの TTL についてのみ説明します。これは、遅延したタスクを達成するための鍵となるためです。 メッセージの有効期限フィールドまたは x-message-ttl 属性を設定することで時間を設定できます。両方とも同じ効果があります。
DLX メカニズムは、RabbitMQ が提供するメッセージ転送メカニズムであり、処理できないメッセージを指定された Exchange に転送することで、メッセージの遅延処理を実現します。具体的な実装手順は次のとおりです。
rabbitmq_layed_message_exchange プラグインをインストールする必要があります。次のコマンドを使用してインストールできます。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange遅延スイッチの作成遅延キューでは遅延スイッチを使用する必要があります。遅延スイッチは、
x-layed-message タイプを使用して作成できます。以下は、遅延スイッチを作成するためのサンプル コードです。
Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);遅延キューの作成遅延キューを作成するときは、キューを遅延スイッチにバインドし、遅延スイッチの TTL を設定する必要があります。キュー (Time To Live) パラメーター。以下は、遅延キューを作成するサンプル コードです。
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "delayed-exchange"); args.put("x-dead-letter-routing-key", "delayed-queue"); args.put("x-message-ttl", 5000); channel.queueDeclare("delayed-queue", true, false, false, args); channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");上記のコードでは、キューは遅延スイッチにバインドされており、キューの TTL パラメータは 5000 ミリ秒に設定されています。メッセージは queue に送信されます。5000 ミリ秒以内にコンシューマーによって消費されない場合、メッセージは
layed-exchange スイッチに転送され、
layed-queue キューに送信されます。 。
expiration 属性を設定する必要があります。以下は、遅延メッセージを送信するサンプル コードです。
Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", 5000); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .headers(headers) .expiration("5000") .build(); channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());上記のコードでは、メッセージの
expiration プロパティが 5000 ミリ秒に設定され、メッセージは
に送信されます。 Delayed-exchange スイッチでは、ルーティング キーは
layed-queue で、メッセージの内容は「Hello, Delayed Queue!」です。
channel.basicQos(1); channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received message: " + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); });上記のコードでは、QOS パラメータは 1 に設定されています。つまり、一度に 1 つのメッセージだけが処理されます。次に、
basicConsume メソッドを使用して
layed-queue キュー内のメッセージを消費し、消費が完了したら、
basicAck メソッドを使用してメッセージが消費されてしまった。
RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。
下面是具体代码(附注释):
import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class DelayedQueueExample { private static final String EXCHANGE_NAME = "delayed_exchange"; private static final String QUEUE_NAME = "delayed_queue"; private static final String ROUTING_KEY = "delayed_routing_key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; */ // 创建一个支持延时队列的Exchange Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments); // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数 Map<String, Object> queueArguments = new HashMap<>(); queueArguments.put("x-dead-letter-exchange", ""); queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME); queueArguments.put("x-message-ttl", 5000); channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送消息到延时队列中,设置expiration参数 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") .build(); String message = "Hello, delayed queue!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes()); System.out.println("Sent message to delayed queue: " + message); channel.close(); connection.close(); } }
在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。
注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。
以上がJava コードを使用して RabbitMQ 遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。