Rumah  >  Artikel  >  Java  >  Analisis contoh ciri lanjutan Java RabbitMQ

Analisis contoh ciri lanjutan Java RabbitMQ

WBOY
WBOYke hadapan
2023-04-29 20:25:05804semak imbas

    Penyampaian mesej yang boleh dipercayai

    Apabila menggunakan RabbitMQ, sebagai penghantar mesej, anda ingin mengelakkan sebarang kehilangan mesej atau senario kegagalan penghantaran. RabbitMQ memberi kami dua cara untuk mengawal mod kebolehpercayaan penghantaran mesej.

    • sahkan mod pengesahan

    • mod pulangan balik

    keseluruhan laluan penghantaran mesej rabbitmq ialah:

    Jika penghantaran mesej daripada pertukaran—>gilir gagal, returnCallback akan dikembalikan

    • Kami boleh menggunakan kedua-dua panggilan balik ini untuk mengawal penghantaran yang boleh dipercayai bagi mesej

    • Mod pengesahan
    • Apabila mesej dihantar daripada pengeluar untuk bertukar, confirmCallback akan dikembalikan

    • Ambil integrasi spring rabbitmq sebagai contoh, ubah suai konfigurasi rabbitmq fail, tambahkan atribut publisher-confirms dalam connectionFactory dan tetapkan nilainya adalah benar
    <!--
    * 确认模式:
    * 步骤:
    * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
    -->
    <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"/>
    /*
     * 确认模式:
     * 步骤:
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
        public void queueTest(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                /**
                 *
                 * @param correlationData 相关配置信息
                 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
                 * @param cause 失败原因
                 */
                    System.out.println("confirm方法被执行了....");
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失败
                        System.out.println("接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
            //路由键与队列同名
            rabbitTemplate.convertAndSend("spring_queue", "message confirm....");
        }

    Oleh kerana mesej dihantar ke baris gilir seperti biasa, nilai punca yang dikembalikan adalah kosong. Jika pengecualian berlaku, sebab ialah sebab pengecualian

    dan Mod dikembalikan

    Jika penghantaran mesej daripada pertukaran–>gilir gagal, panggilan balik akan dikembalikan

    Analisis contoh ciri lanjutan Java RabbitMQ1. Hidupkan mod sandaran: publisher-returns="true"

        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-returns="true"/>

    2 Tetapkan mod untuk kegagalan pemprosesan mesej Exchange: setMandatory, dan kemudian tetapkan ReturnCallBack

        @Test
        public void queueTest(){
            //1.设置交换机处理失败消息的模式
            rabbitTemplate.setMandatory(true);
            //2.设置ReturnCallBack
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * @param message    消息对象
                 * @param replyCode  错误码
                 * @param replyText  错误信息
                 * @param exchange   交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String
                        replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    //处理
                }
            });
            //手动添加错误路由模拟错误发生
            rabbitTemplate.convertAndSend("spring_topic_exchange", "return123", "return message...");
        }

    dikembalikan apabila ralat berlaku, jadi tambahkan ralat secara manual dan tambahkan nilai penghalaan return123 pada mesej yang dihantar Sebenarnya, tiada Penghalaan sedemikian, mesej yang dikembalikan dengan berjalan adalah seperti berikut.

    Ack Pengguna

    ack merujuk kepada Akui, pengesahan. Menunjukkan kaedah pengesahan selepas pengguna menerima mesej.

    Terdapat tiga kaedah pengesahan:

    Analisis contoh ciri lanjutan Java RabbitMQ

    Pengesahan automatik: acknowledge="none"

    Pengesahan manual: acknowledge=" manual ”

    • Sahkan mengikut situasi abnormal: acknowledge="auto", (kaedah ini menyusahkan untuk digunakan dan tidak belajar)

    • Antaranya, pengesahan automatik Ia bermakna apabila mesej diterima oleh Pengguna, resit akan disahkan secara automatik dan mesej yang sepadan akan dialih keluar daripada cache mesej RabbitMQ. Walau bagaimanapun, dalam pemprosesan perniagaan sebenar, besar kemungkinan mesej akan hilang jika terdapat pengecualian dalam pemprosesan perniagaan selepas mesej diterima. Jika kaedah pengesahan manual ditetapkan, anda perlu memanggil channel.basicAck() selepas pemprosesan perniagaan berjaya dan log masuk secara manual Jika pengecualian berlaku, hubungi kaedah channel.basicNack() untuk membenarkannya menghantar semula mesej secara automatik.

    • Mari kita ambil integrasi musim bunga rabbitmq sebagai contoh Kaedah pengesahan ditetapkan dalam fail konfigurasi rabbitmq
    • <rabbit:listener-container connection-factory="connectionFactory"
      acknowledge="manual">
      .....

      Kod kelas mendengar adalah seperti berikut:

      public class AckListener implements ChannelAwareMessageListener {
          @Override
          public void onMessage(Message message, Channel channel) throws Exception {
              long deliveryTag = message.getMessageProperties().getDeliveryTag();
              try {
                  //1.接收转换消息
                  System.out.println(new String(message.getBody()));
                  //2. 处理业务逻辑
                  System.out.println("处理业务逻辑...");
                  int i = 3/0;//出现错误
                  // 3. 手动签收
                  channel.basicAck(deliveryTag,true);
              } catch (Exception e) {
                  //e.printStackTrace();
                  //4.拒绝签收
                  /*
                   *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                   *重新发送该消息给消费端
                   */
                  channel.basicNack(deliveryTag,true,true);
                  //channel.basicReject(deliveryTag,true);
              }
          }
      }
    • The listening class code. kaedah channel.basicNack() dipanggil kerana pengecualian , biarkan ia menghantar semula mesej secara automatik, jadi kandungan output adalah gelung tak terhingga

    Had semasa sebelah pengguna

    Apabila pelayan When the Rabbitmq kami mempunyai tunggakan berpuluh-puluh ribu mesej yang tidak diproses, jika kami membuka pelanggan pengguna sesuka hati, situasi berikut akan berlaku: sejumlah besar mesej akan ditolak serta-merta, tetapi pelanggan tunggal kami tidak dapat memproses begitu banyak data pada masa yang sama Apabila jumlah data sangat besar, pastinya tidak saintifik bagi kami untuk mengehadkan aliran tamat pengeluaran, kerana kadangkala keselarasan adalah sangat besar! , dan kadangkala konkurensinya sangat kecil, dan kami tidak boleh menyekat akhir pengeluaran Ini adalah tingkah laku pengguna. Oleh itu, kita harus mengehadkan aliran di sisi pengguna rabbitmq menyediakan fungsi qos (kualiti perkhidmatan), iaitu, atas premis pengesahan bukan automatik mesej, jika bilangan mesej tertentu (nilai Qos ditetapkan untuk saluran. atau pengguna) tidak disahkan sebelum ini, jangan gunakan mesej baharu. Analisis contoh ciri lanjutan Java RabbitMQ

    1 Pastikan mekanisme ack adalah pengesahan manual

    2 Atribut konfigurasi pendengar-bekas = 1 bermakna pengguna menarik mesej daripada mq untuk penggunaan setiap kali sehingga pengesahan manual. selesai. Selepas itu, ia akan terus menarik mesej seterusnya.

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
            <rabbit:listener ref="topicListenerACK" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>
    Analisis contoh ciri lanjutan Java RabbitMQPengeluar, hantar lima mesej

        @Test
        public void topicTest(){
    /**
     * 参数1:交换机名称
     * 参数2:路由键名
     * 参数3:发送的消息内容
     */
            for (int i=0;i<5;i++){
                rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.a", "发送到spring_topic_exchange交换机xzk.cn的消息"+i);
            }
        }
    }

    Pengeluar mengulas saluran.basicAck(deliveryTag,true) dan tidak mengesahkan penerimaan mesej

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                // 3. 手动签收
                //channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒绝签收
                /*
                 *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                 *重新发送该消息给消费端
                 */
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }

    Mulakan pengguna di kali ini Selepas menjalankan semula pengeluar, didapati pengguna menghantar lima mesej Malah, pengeluar hanya menerima satu mesej, yang mencapai kesan had semasa

    Memerhati. konsol rabbitmq, kami mendapati terdapat 1 mesej yang tidak dibuka. 4 mesej sedia belum sampai kepada pengguna. Ia konsisten dengan situasi mengehadkan semasa prefetchCount=1 yang kami tetapkan.

    Analisis contoh ciri lanjutan Java RabbitMQ

    把channel.basicAck(deliveryTag,true)的注释取消掉,即可以自动确认收到消息,重新运行消费者,接收到了另外的四条消息

    Analisis contoh ciri lanjutan Java RabbitMQ

    Analisis contoh ciri lanjutan Java RabbitMQ

    TTL(Time To Live)

    Time To Live,消息过期时间设置

    设置某个队列为过期队列

    设置交换机,队列以及队列过期时间为10000ms

     <!--ttl-->
        <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <rabbit:topic-exchange name="test_exchange_ttl">
            <rabbit:bindings>
                <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>

    生产者发送10条消息

        @Test
        public void testTtl() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...");
            }

    Analisis contoh ciri lanjutan Java RabbitMQ

    十秒钟后,过期消息消失

    Analisis contoh ciri lanjutan Java RabbitMQ

    设置单独某个消息过期

    设置交换机和队列

    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"/>
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>     
        </rabbit:bindings>
    </rabbit:topic-exchange>

    生产者发送特定过期消息,用到了MessagePostProcessor这个api

     @Test
        public void testTtl() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //1.设置message信息
                    message.getMessageProperties().setExpiration("5000");//消息的过期时间
                    //2.返回该消息
                    return message;
                }
            };
            //消息单独过期
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor);
        }

    Analisis contoh ciri lanjutan Java RabbitMQ

    5s之后

    Analisis contoh ciri lanjutan Java RabbitMQ

    注:

    1.如果同时设置队列过期和消息过期,系统会根据哪个过期的时间短而选用哪儿个。

    2.设置单独消息过期时,如果该消息不为第一个接受的消息,则不过期。

    死信队列

    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Deadmessage后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    Analisis contoh ciri lanjutan Java RabbitMQ

    消息成为死信的三种情况:

    • 队列消息长度到达限制;

    • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

    • 原队列存在消息过期设置,消息到达超时时间未被消费;

    队列绑定死信交换机:

    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

    Analisis contoh ciri lanjutan Java RabbitMQ

    实现

    1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)

    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)

    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    3.生产端测试

    /**
    * 发送测试死信消息:
    * 1. 过期时间
    * 2. 长度限制
    * 3. 消息拒收
    */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        //2. 测试长度限制后,消息死信
        /* for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        }*/
        //3. 测试消息拒收
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    }

    4.消费端监听

    public class DlxListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 3/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出现异常,拒绝接受");
                //4.拒绝签收,不重回队列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">
    </rabbit:listener>

    延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。c

    需求:

    1.下单后,30分钟未支付,取消订单,回滚库存。

    2.新用户注册成功7天后,发送短信问候。

    实现方式:

    • 定时器

    • 延迟队列

    定时器的实现方式不够优雅,我们采取延迟队列的方式

    Analisis contoh ciri lanjutan Java RabbitMQ

    不过很可惜,在RabbitMQ中并未提供延迟队列功能。

    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

    Analisis contoh ciri lanjutan Java RabbitMQ

    配置

    <!--
    延迟队列:
            1. 定义正常交换机(order_exchange)和队列(order_queue)
            2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
            3. 绑定,设置正常队列过期时间为30分钟
    -->
    <!-- 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
    <!-- 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    生产端测试

    @Test
    public void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
        /*//2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }*/
    }

    消费端监听

    public class OrderListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
    		long deliveryTag = message.getMessageProperties().getDeliveryTag();
    		try {
    			//1.接收转换消息
    			System.out.println(new String(message.getBody()));
    			//2. 处理业务逻辑
    			System.out.println("处理业务逻辑...");
    			System.out.println("根据订单id查询其状态...");
    			System.out.println("判断状态是否为支付成功");
    			System.out.println("取消订单,回滚库存....");
    			//3. 手动签收
    			channel.basicAck(deliveryTag,true);
    		} catch (Exception e) {
    			//e.printStackTrace();
    			System.out.println("出现异常,拒绝接受");
    			//4.拒绝签收,不重回队列 requeue=false
    			channel.basicNack(deliveryTag,true,false);
    		}
    	}
    }
    <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">
    </rabbit:listener>

    Atas ialah kandungan terperinci Analisis contoh ciri lanjutan Java RabbitMQ. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam