Rumah  >  Artikel  >  Java  >  Cara menggunakan kod Java untuk melaksanakan baris gilir kelewatan RabbitMQ

Cara menggunakan kod Java untuk melaksanakan baris gilir kelewatan RabbitMQ

PHPz
PHPzke hadapan
2023-05-12 23:55:04965semak imbas

    Pengenalan kepada Gilir Kelewatan RabbitMQ

    Baris Gilir Kelewatan RabbitMQ bermaksud bahawa selepas mesej dihantar ke baris gilir, ia tidak segera digunakan oleh pengguna. Sebaliknya, ia menunggu dalam tempoh masa sebelum dimakan oleh pengguna. Baris gilir jenis ini biasanya digunakan untuk melaksanakan tugas berjadual Contohnya, jika pesanan tamat dan tidak dibayar, sistem membatalkan pesanan dan mengeluarkan inventori yang diduduki.

    Terdapat banyak cara untuk melaksanakan baris gilir kelewatan dalam RabbitMQ, yang lebih biasa ialah menggunakan pemalam atau melaksanakannya melalui mekanisme DLX (Dead Letter Exchange).

    Gunakan pemalam untuk melaksanakan baris gilir tertunda

    RabbitMQ menyediakan pemalam rabbitmq_delayed_message_exchange, yang boleh digunakan untuk melaksanakan baris gilir tertunda. Prinsip pemalam ini adalah untuk menghantar mesej ke Exchange tertentu apabila mesej dihantar, dan kemudian Exchange akan memajukan mesej ke baris gilir yang ditentukan mengikut masa kelewatan dalam mesej, dengan itu merealisasikan fungsi daripada baris gilir kelewatan .

    Untuk menggunakan pemalam ini, anda perlu memasang pemalam terlebih dahulu, kemudian mencipta Exchange, tetapkan jenis Exchange kepada x-delayed-message, dan kemudian mengikat Exchange ke baris gilir.

    Gunakan mekanisme DLX untuk melaksanakan baris gilir kelewatan

    TTL mesej ialah masa kemandirian mesej. RabbitMQ boleh menetapkan TTL untuk baris gilir dan mesej masing-masing. Tetapan baris gilir ialah masa pengekalan baris gilir tanpa pengguna disambungkan, dan anda juga boleh menetapkan tetapan berasingan untuk setiap mesej individu. Selepas masa ini, kami menganggap mesej itu mati dan memanggilnya surat mati. Jika baris gilir ditetapkan dan mesej ditetapkan, nilai yang lebih kecil akan digunakan. Oleh itu, jika mesej dihalakan ke baris gilir yang berbeza, masa kematian mesej mungkin berbeza (tetapan baris gilir yang berbeza). Di sini kita hanya bercakap tentang TTL bagi satu mesej, kerana ia adalah kunci untuk mencapai tugas yang tertangguh. Anda boleh menetapkan masa dengan menetapkan medan tamat tempoh mesej atau atribut x-message-ttl Kedua-duanya mempunyai kesan yang sama.

    Mekanisme DLX ialah mekanisme pemajuan mesej yang disediakan oleh RabbitMQ Ia boleh memajukan mesej yang tidak boleh diproses ke Exchange yang ditetapkan, dengan itu mencapai pemprosesan mesej yang tertunda . Langkah-langkah pelaksanaan khusus adalah seperti berikut:

    • Buat Pertukaran dan Baris Gilir yang sama dan ikatkan keduanya.

    • Buat DLX Exchange dan ikat Exchange biasa ke DLX Exchange.

    • Tetapkan Baris Gilir untuk mempunyai atribut TTL (Time To Live) dan tetapkan masa tamat tempoh mesej.

    • Ikat Baris Gilir ke DLX Exchange.

    Apabila mesej tamat tempoh, ia akan dihantar ke DLX Exchange, dan kemudian DLX Exchange akan memajukan mesej ke Exchange yang ditetapkan, dengan itu merealisasikan fungsi baris gilir kelewatan.

    Kelebihan menggunakan mekanisme DLX untuk melaksanakan baris gilir kelewatan ialah tidak perlu memasang pemalam tambahan, tetapi masa tamat tempoh mesej perlu dikawal dengan tepat, jika tidak, masa tamat tempoh mesej mungkin tidak tepat.

    Menetapkan baris gilir kelewatan dalam bahasa Java

    Berikut ialah langkah untuk menyediakan baris gilir kelewatan melalui RabbitMQ menggunakan bahasa Java:

    Pasang pemalam

    Pertama, anda perlu memasang rabbitmq_delayed_message_exchange Pemalam. Ia boleh dipasang melalui arahan berikut:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    Mencipta suis lengah

    Gilir kelewatan memerlukan penggunaan suis tunda. Suis kelewatan boleh dibuat menggunakan jenis x-delayed-message. Berikut ialah kod sampel untuk mencipta suis kelewatan:

    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);

    Membuat baris gilir kelewatan

    Apabila membuat baris gilir kelewatan, anda perlu mengikat baris gilir ke suis kelewatan dan menetapkan TTL bagi parameter baris gilir ( Masa Untuk Hidup). Berikut ialah kod sampel untuk membuat baris gilir kelewatan:

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "delayed-exchange");
    args.put("x-dead-letter-routing-key", "delayed-queue");
    args.put("x-message-ttl", 5000);
    channel.queueDeclare("delayed-queue", true, false, false, args);
    channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");

    Dalam kod di atas, baris gilir terikat pada suis kelewatan dan parameter TTL baris gilir ditetapkan kepada 5000 milisaat, iaitu selepas mesej dihantar ke baris gilir , jika ia tidak digunakan oleh pengguna dalam masa 5000 milisaat, ia akan dimajukan ke suis delayed-exchange dan dihantar ke baris gilir delayed-queue.

    Hantar mesej tertunda

    Apabila menghantar mesej tertunda, anda perlu menetapkan atribut expiration mesej, yang menunjukkan masa tamat tempoh mesej. Berikut ialah contoh kod untuk menghantar mesej tertunda:

    Map<String, Object> headers = new HashMap<>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .headers(headers)
            .expiration("5000")
            .build();
    channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());

    Dalam kod di atas, atribut expiration mesej ditetapkan kepada 5000 milisaat dan mesej dihantar ke suis delayed-exchange dengan kunci penghalaan delayed-queue, kandungan mesej ialah "Helo, gilir tertunda!".

    Menggunakan mesej tertunda

    Apabila menggunakan mesej tertunda, anda perlu menetapkan parameter QOS (Kualiti Perkhidmatan) pengguna untuk mengawal keupayaan pemprosesan serentak pengguna. Berikut ialah contoh kod untuk menggunakan mesej tertunda:

    channel.basicQos(1);
    channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    });

    Dalam kod di atas, parameter QOS ditetapkan kepada 1, iaitu, hanya satu mesej diproses pada satu masa. Kemudian gunakan kaedah basicConsume untuk menggunakan mesej dalam baris gilir delayed-queue, dan selepas penggunaan selesai, gunakan kaedah basicAck untuk mengesahkan bahawa mesej telah digunakan.

    Melalui langkah di atas, anda boleh melaksanakan baris gilir kelewatan RabbitMQ, yang digunakan untuk melaksanakan fungsi seperti tugas yang dijadualkan.

    RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

    RabbitMQ延时队列具体代码

    下面是具体代码(附注释):

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }

    在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

    注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

    Atas ialah kandungan terperinci Cara menggunakan kod Java untuk melaksanakan baris gilir kelewatan RabbitMQ. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam