Rumah  >  Artikel  >  Java  >  Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ

Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ

王林
王林ke hadapan
2023-04-25 16:19:081275semak imbas

    1. Kegigihan

    Apabila perkhidmatan RabbitMQ dihentikan, mesej yang dihantar oleh pengeluar mesej tidak akan hilang. Secara lalai, baris gilir dan mesej diabaikan apabila RabbitMQ keluar atau ranap. Untuk memastikan mesej tidak hilang, kedua-dua baris gilir dan mesej perlu ditandakan sebagai berterusan.

    1.1 Laksanakan kegigihan

    1 Kegigihan baris gilir: Tukar parameter kedua channel.queueDeclare(); kepada benar apabila membuat baris gilir.

    2. Ketekunan mesej: Apabila menggunakan saluran untuk menghantar mesej, channel.basicPublish(); tukar parameter ketiga kepada: MessageProperties.PERSISTENT_TEXT_PLAIN untuk menunjukkan kegigihan mesej.

    /**
     * @Description 持久化MQ
     * @date 2022/3/7 9:14
     */
    public class Producer3 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 持久化队列
            channel.queueDeclare(LONG_QUEUE,true,false,false,null);
            Scanner scanner = new Scanner(System.in);
            int i = 0;
            while (scanner.hasNext()){
                i++;
                String msg = scanner.next() + i;
                // 持久化消息
                channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送消息:'" + msg + "'成功");
            }
        }
    }

    Walau bagaimanapun, terdapat selang cache untuk menyimpan mesej Tiada tulisan sebenar pada cakera, dan jaminan ketahanan tidak cukup kuat, tetapi ia lebih daripada cukup untuk baris gilir yang mudah. .

    1.2 Pengagihan tidak adil

    Kaedah pengagihan pengundian tidak sesuai apabila pengguna mempunyai kecekapan pemprosesan yang berbeza. Oleh itu, keadilan sebenar harus mengikut premis bahawa mereka yang boleh melakukan lebih banyak kerja harus berbuat demikian.

    Ubah suai channel.basicQos(1); di pihak pengguna untuk menunjukkan menghidupkan pengedaran tidak adil

    /**
     * @Description 不公平分发消费者
     * @date 2022/3/7 9:27
     */
    public class Consumer2 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // 模拟并发沉睡三十秒
                try {
                    Thread.sleep(30000);
                    System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            // 设置不公平分发
            channel.basicQos(1);
            channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                    consumerTag -> {
                        System.out.println(consumerTag + "消费者取消消费");
                    });
        }
    }

    1.3 Uji pengedaran tidak adil

    Tujuan ujian : Adakah mungkin untuk menyedari bahawa mereka yang boleh melakukan lebih banyak kerja.

    Kaedah ujian: Dua pengguna tidur peristiwa berbeza untuk mensimulasikan peristiwa pemprosesan berbeza Jika masa pemprosesan (masa tidur) cukup singkat untuk memproses berbilang mesej, tujuannya tercapai.

    Mula-mula mulakan pengeluar untuk membuat baris gilir, dan kemudian mulakan kedua-dua pengguna masing-masing.

    Pengeluar menghantar empat mesej mengikut urutan:

    Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ

    Thread A dengan masa tidur yang singkat menerima tiga mesej

    Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ

    Benang B, yang tidur lama, hanya menerima mesej kedua:

    Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ

    Sebab benang B memakan Ia mengambil masa yang lama , jadi mesej lain diberikan kepada urutan A.

    Percubaan berjaya!

    1.4 Nilai prafetch

    Penghantaran dan pengesahan manual mesej diselesaikan secara tidak segerak, jadi terdapat penimbal bagi mesej yang tidak disahkan, dan pembangun berharap untuk mengehadkan Saiz penimbal, digunakan untuk mengelakkan masalah mesej tidak diketahui tanpa had dalam penimbal.

    Nilai yang dijangkakan di sini ialah parameter dalam kaedah di atas channel.basicQos(); Jika terdapat mesej yang sama dengan parameter pada saluran semasa, saluran semasa tidak akan diatur untuk menggunakan mesej.

    1.4.1 Ujian Kod

    Kaedah ujian:

    1 Cipta dua pengguna berbeza dan berikan nilai jangkaan 5 dan 2 masing-masing.

    2. Tetapkan masa tidur yang panjang sebagai 5, dan masa tidur yang singkat sebagai 2.

    3 Jika mesej diperoleh mengikut nilai jangkaan yang ditetapkan, ini bermakna ujian itu berjaya, tetapi ia tidak bermakna ia akan diedarkan mengikut 5 dan 2. Ini sama dengan pertimbangan berat. .

    Kod boleh mengubah suai nilai yang dijangkakan mengikut kod di atas.

    2. Pengesahan keluaran

    Pengesahan keluaran ialah proses di mana selepas pengeluar menerbitkan mesej ke baris gilir, pengesahan baris gilir diteruskan dan kemudian dimaklumkan kepada pengeluar. Ini memastikan bahawa mesej tidak akan hilang.

    Perlu diambil perhatian bahawa kegigihan baris gilir perlu dihidupkan untuk menggunakan penerbitan yang disahkan.
    Kaedah pembukaan: channel.confirmSelect();

    2.1 Penerbitan pengesahan tunggal

    ialah kaedah penerbitan segerak, iaitu selepas menghantar mesej, hanya selepas ia disahkan dan diterbitkan, susulan Mesej akan terus diterbitkan Jika tiada pengesahan dalam masa yang ditetapkan, pengecualian akan dilemparkan. Kelemahannya ialah ia sangat perlahan.

    /**
     * @Description 确认发布——单个确认
     * @date 2022/3/7 14:49
     */
    public class SoloProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_solo";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 单个发布确认
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("发送消息:" + i);
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
    }

    2.2 Pengeluaran pengesahan kelompok

    Keluaran pengesahan kelompok demi kelompok boleh meningkatkan daya pengeluaran sistem. Walau bagaimanapun, kelemahannya ialah apabila kegagalan berlaku dan terdapat masalah dengan penerbitan, keseluruhan kumpulan perlu disimpan dalam ingatan dan diterbitkan semula kemudian.

    /**
     * @Description 确认发布——批量确认
     * @date 2022/3/7 14:49
     */
    public class BatchProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_batch";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 设置一个多少一批确认一次。
            int batchSize = MESSAGE_COUNT / 10;
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 批量发布确认
                if (i % batchSize == 0){
                    if (channel.waitForConfirms()){
                        System.out.println("发送消息:" + i);
                    }
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    Jelas sekali kecekapannya jauh lebih tinggi daripada keluaran pengesahan tunggal.

    2.3 Pengeluaran pengesahan tak segerak

    adalah lebih rumit daripada dua di atas dari segi pengaturcaraan, tetapi ia sangat menjimatkan kos sama ada kebolehpercayaan atau kecekapan, ia jauh lebih baik . Gunakan fungsi panggil balik untuk Untuk mencapai penghantaran mesej yang boleh dipercayai.

    /**
     * @Description 确认发布——异步确认
     * @date 2022/3/7 14:49
     */
    public class AsyncProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                System.out.println("未确认的消息:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    2.4 Mengendalikan mesej yang tidak diketahui

    Cara terbaik untuk mengendalikan mesej yang tidak diketahui ialah meletakkan mesej yang tidak diketahui dalam baris gilir berasaskan memori yang boleh diakses oleh penerbitan benang.

    Contohnya: ConcurrentLinkedQueue boleh memindahkan mesej antara baris gilir pengesahan confirm callbacks dan urutan penerbitan.

    Kaedah pemprosesan:

    1 Rekod semua mesej yang akan dihantar

    2

    3.Cetak mesej yang belum disahkan.

    Gunakan jadual cincang untuk menyimpan mesej, kelebihannya:

    可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    Atas ialah kandungan terperinci Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam