Rumah  >  Artikel  >  Java  >  Kaedah aplikasi lanjutan RabbitMQ dalam java

Kaedah aplikasi lanjutan RabbitMQ dalam java

王林
王林ke hadapan
2023-04-30 10:40:06757semak imbas

1. Penghantaran mesej yang boleh dipercayai

  Apabila menggunakan RabbitMQ, jika pengeluar ingin mengetahui sama ada mesej itu berjaya dihantar ke suis dan baris gilir yang sepadan semasa menyampaikan mesej, terdapat Dua kaedah boleh digunakan untuk mengawal mod kebolehpercayaan penghantaran mesej.

Kaedah aplikasi lanjutan RabbitMQ dalam java

  Berdasarkan keseluruhan proses penghantaran mesej dalam rajah di atas, mesej pengeluar yang memasuki perisian tengah akan mula-mula sampai ke suis, dan kemudian dihantar dari suis ke beratur Untuk pergi, iaitu, untuk menggunakan strategi dua langkah. Kemudian kehilangan mesej akan berlaku dalam kedua-dua peringkat ini RabbitMQ dengan teliti memberikan kami mod penghantaran baharu yang boleh dipercayai untuk dua bahagian ini:

  • mod pengesahan .

  • mod kembali .

 Gunakan dua mod panggil balik ini untuk memastikan penghantaran mesej yang boleh dipercayai.

1.1. Mod Pengesahan

  Panggilan balik confirmCallback akan dikembalikan apabila mesej dihantar daripada pengeluar ke suis. Logik pengesahan boleh ditetapkan terus dalam contoh rabbitTemplate. Jika anda menggunakan XML konfigurasi, anda perlu mendayakan publisher-confirms="true" dalam konfigurasi kilang Konfigurasi YAML secara langsung publisher-confirm-type: correlated, yang lalai Ya dan perlu dihidupkan secara manual. NONE

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println();
                if (!b) {
                    //	消息重发之类的处理
                    System.out.println(s);
                } else {
                    System.out.println("交换机成功接收消息");
                }
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

 Pengesahan di atas dilaksanakan oleh fungsi

, yang membawa tiga parameter yang pertama ialah maklumat berkaitan konfigurasi, dan yang kedua menunjukkan sama ada suis berjaya menerima mesej parameter ketiga merujuk kepada sebab mengapa mesej tidak berjaya diterima. confirm

1.2. Mod Sandarkan

  Dayakan mod sandaran dalam konfigurasi kilang

publisher-returns="true"returnCallback , tetapkan mod di mana suis gagal memproses mesej (lalai adalah palsu dan mesej dibuang terus) dan tambah logik pemprosesan sandaran .

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  重发逻辑处理
                System.out.println(message.getBody() + " 投递消息队列失败");
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

membawa lima parameter, yang masing-masing merujuk kepada objek mesej, kod ralat, mesej ralat, suis dan kekunci penghalaan.

returnedMessage 1.3. Mekanisme pengesahan

  tetapi tidak dimakan Mesej hilang kerana kejayaan. Terdapat tiga kaedah pengesahan:

  • Pengesahan automatik

    : acknowledge="none"

  • Pengesahan manual

    : acknowledge="manual"

  • Sahkan mengikut keadaan luar biasa

    : acknowledge="auto"

  •   mesej Jika ia ditangkap oleh pengguna, ia secara automatik akan mencapai kejayaan, dan mesej akan dialih keluar daripada baris gilir mesej Jika terdapat masalah dengan penggunaan pengguna pada masa ini, maka penggunaan mesej lalai akan berjaya, tetapi sebenarnya penggunaan tidak akan berjaya, iaitu Mesej semasa hilang. Lalai ialah mekanisme pengesahan automatik.

 Jika anda menetapkan kaedah pengesahan manual, anda perlu melakukan pengesahan panggil balik

selepas penggunaan biasa mesej dan menandatanganinya secara manual. Jika pengecualian berlaku semasa pemprosesan perniagaan, hubungi

untuk menghantar semula mesej. channel.basicAck()channel.basicNack() Pertama, anda perlu mengkonfigurasi mekanisme pengesahan apabila mengikat baris gilir dan menetapkannya kepada tandatangan manual.

<!-- 绑定队列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

  Tidak perlu menukar bahagian pengeluar Anda hanya perlu menukar pelaksanaan pengguna untuk menandatangani mesej secara automatik ralat berlaku dalam perniagaan, mesej akan ditolak.

public class ConsumerAck implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //  消息唯一ID
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = new String(message.getBody(), "utf-8");
            channel.basicAck(tag, true);
            System.out.println("接收消息: " + msg);
        } catch (Exception e) {
            System.out.println("接收消息异常");
            channel.basicNack(tag, true, true);
            e.printStackTrace();
        }
    }
}

 Ia melibatkan tiga fungsi tandatangan mudah, satu untuk tandatangan yang betul

, yang kedua adalah untuk penolakan tunggal

, dan yang ketiga adalah untuk penolakan kelompok basicAck . basicRejectbasicNack

  • basicAck

    Parameter pertama menunjukkan ID unik mesej dalam saluran, hanya untuk Saluran semasa, parameter kedua menunjukkan sama ada bersetuju dalam kelompok, jika ia adalah palsu Jika ia benar, ia hanya akan bersetuju untuk menandatangani mesej dengan ID semasa dan memadamkannya daripada baris gilir mesej Jika ia benar, ia juga akan menandatangani untuk mesej sebelum ID ini.

  • basicReject

    Parameter pertama masih menunjukkan ID unik mesej, parameter kedua menunjukkan sama ada untuk membuat giliran dan menghantar, palsu menunjukkan bahawa mesej dibuang terus atau terdapat Baris huruf mati boleh diterima Jika benar, ini bermakna mesej dihantar semula ke baris gilir Semua operasi hanya untuk mesej semasa.

  • basicNack

    mempunyai satu lagi parameter daripada yang kedua, iaitu nilai Boolean di tengah, yang menunjukkan sama ada untuk membatch.

2、消费端限流

 在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。

Kaedah aplikasi lanjutan RabbitMQ dalam java

 只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。

<!-- 绑定队列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"
                           acknowledge="manual" prefetch="1">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

3、消息过期时间

 消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。

 队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。

 如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws
        AmqpException {
        //	设置 message 的过期时间
        message.getMessageProperties().setExpiration("5000");
        //	返回该消息
        return message;
    }
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。

4、死信队列

 4.1、死信概念

死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。

Kaedah aplikasi lanjutan RabbitMQ dalam java

  • 消息成为死信消息有几种情况

    队列的消息长度达到限制

    消费者拒接消息的时候不把消息重新放入队列中

    队列存在消息过期设置,消息超时未被消费

    消息存在过期时间,在投递给消费者时发现过期

 在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!-- 死信交换机 -->
        <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>
        <!-- 路由 -->
        <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>
        <!-- 队列过期时间 -->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        <!-- 队列长度 -->
        <entry key="x-max-length" value-type="java.lang.Integer" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 4.2、延迟队列

 延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。

 消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL + 死信队列 的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。

 再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。

 例如实现一个下单超时支付取消订单的功能:

Kaedah aplikasi lanjutan RabbitMQ dalam java

Atas ialah kandungan terperinci Kaedah aplikasi lanjutan RabbitMQ dalam java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam