ホームページ >Java >&#&チュートリアル >Java コードを使用して RabbitMQ 遅延キューを実装する方法

Java コードを使用して RabbitMQ 遅延キューを実装する方法

PHPz
PHPz転載
2023-05-12 23:55:041004ブラウズ

    RabbitMQ 遅延キューの概要

    RabbitMQ 遅延キューとは、メッセージがキューに送信された後、すぐにはメッセージが消費されないことを意味します。代わりに、コンシューマーによって消費されるまで一定期間待機します。この種のキューは通常、スケジュールされたタスクを実装するために使用されます。たとえば、注文がタイムアウトして支払いが行われない場合、システムは注文をキャンセルし、占有されている在庫を解放します。

    RabbitMQ で遅延キューを実装するにはさまざまな方法がありますが、より一般的なのは、プラグインを使用するか、DLX (Dead Letter Exchange) メカニズムを通じて実装することです。

    プラグインを使用して遅延キューを実装する

    RabbitMQ は、遅延キューを実装するために使用できる Rabbitmq_layed_message_exchange プラグインを提供します。 このプラグインの原理は、メッセージ送信時に特定の Exchange にメッセージを送信し、Exchange がメッセージ内の遅延時間に従って指定されたキューにメッセージを転送することで、この機能を実現します。遅延キューの

    このプラグインを使用するには、まずプラグインをインストールし、次に Exchange を作成し、Exchange のタイプを x-layed-message に設定してから、Exchange をキューにバインドする必要があります。

    DLX メカニズムを使用して遅延キューを実装する

    メッセージの TTL は、メッセージの生存時間です。 RabbitMQ はキューとメッセージにそれぞれ TTL を設定できます。キュー設定は、コンシューマーが接続されていない状態でのキューの保持時間であり、個別のメッセージごとに個別の設定を行うこともできます。 この時間が経過すると、メッセージは無効になったとみなされ、それをデッドレターと呼びます。キューが設定されており、メッセージが設定されている場合は、小さい方の値が使用されます。したがって、メッセージが異なるキューにルーティングされる場合、メッセージの終了時刻は異なる可能性があります (キュー設定が異なる)。ここでは、単一メッセージの TTL についてのみ説明します。これは、遅延したタスクを達成するための鍵となるためです。 メッセージの有効期限フィールドまたは x-message-ttl 属性を設定することで時間を設定できます。両方とも同じ効果があります。

    DLX メカニズムは、RabbitMQ が提供するメッセージ転送メカニズムであり、処理できないメッセージを指定された Exchange に転送することで、メッセージの遅延処理を実現します。具体的な実装手順は次のとおりです。

    • 通常の Exchange と Queue を作成し、それらをバインドします。

    • DLX Exchange を作成し、通常の Exchange を DLX Exchange にバインドします。

    • TTL (Time To Live) 属性を持つようにキューを設定し、メッセージの有効期限を設定します。

    • キューを DLX Exchange にバインドします。

    メッセージの有効期限が切れると、メッセージは DLX Exchange に送信され、DLX Exchange は指定された Exchange にメッセージを転送することで、遅延キュー機能を実現します。

    DLX メカニズムを使用して遅延キューを実装する利点は、追加のプラグインをインストールする必要がないことですが、メッセージの有効期限を正確に制御する必要があります。そうしないと、メッセージの有効期限が長くなる可能性があります。不正確であること。

    Java 言語での遅延キューの設定

    Java 言語を使用して RabbitMQ を通じて遅延キューを設定する手順は次のとおりです:

    プラグインをインストールします

    まず、

    rabbitmq_layed_message_exchange プラグインをインストールする必要があります。次のコマンドを使用してインストールできます。

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    遅延スイッチの作成

    遅延キューでは遅延スイッチを使用する必要があります。遅延スイッチは、

    x-layed-message タイプを使用して作成できます。以下は、遅延スイッチを作成するためのサンプル コードです。

    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);

    遅延キューの作成

    遅延キューを作成するときは、キューを遅延スイッチにバインドし、遅延スイッチの TTL を設定する必要があります。キュー (Time To Live) パラメーター。以下は、遅延キューを作成するサンプル コードです。

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "delayed-exchange");
    args.put("x-dead-letter-routing-key", "delayed-queue");
    args.put("x-message-ttl", 5000);
    channel.queueDeclare("delayed-queue", true, false, false, args);
    channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");

    上記のコードでは、キューは遅延スイッチにバインドされており、キューの TTL パラメータは 5000 ミリ秒に設定されています。メッセージは queue に送信されます。5000 ミリ秒以内にコンシューマーによって消費されない場合、メッセージは

    layed-exchange スイッチに転送され、layed-queue キューに送信されます。 。

    遅延メッセージを送信する

    遅延メッセージを送信する場合は、メッセージの有効期限を示すメッセージの

    expiration 属性を設定する必要があります。以下は、遅延メッセージを送信するサンプル コードです。

    Map<String, Object> headers = new HashMap<>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .headers(headers)
            .expiration("5000")
            .build();
    channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());

    上記のコードでは、メッセージの

    expiration プロパティが 5000 ミリ秒に設定され、メッセージは に送信されます。 Delayed-exchange スイッチでは、ルーティング キーは layed-queue で、メッセージの内容は「Hello, Delayed Queue!」です。

    遅延メッセージの使用

    遅延メッセージを使用する場合は、コンシューマの QOS (サービス品質) パラメータを設定して、コンシューマの同時処理能力を制御する必要があります。以下は、遅延メッセージを消費するためのサンプル コードです。

    channel.basicQos(1);
    channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    });

    上記のコードでは、QOS パラメータは 1 に設定されています。つまり、一度に 1 つのメッセージだけが処理されます。次に、

    basicConsume メソッドを使用して layed-queue キュー内のメッセージを消費し、消費が完了したら、basicAck メソッドを使用してメッセージが消費されてしまった。

    上記の手順により、スケジュールされたタスクなどの機能を実装するために使用される RabbitMQ 遅延キューを実装できます。

    RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

    RabbitMQ延时队列具体代码

    下面是具体代码(附注释):

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }

    在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

    注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

    以上がJava コードを使用して RabbitMQ 遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。