Heim  >  Artikel  >  Java  >  Implementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ

Implementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ

王林
王林nach vorne
2023-04-25 16:19:081278Durchsuche

    1. Persistenz

    Wenn der RabbitMQ-Dienst gestoppt wird, gehen die vom Nachrichtenersteller gesendeten Nachrichten nicht verloren. Standardmäßig werden Warteschlangen und Nachrichten ignoriert, wenn RabbitMQ beendet wird oder abstürzt. Um sicherzustellen, dass Nachrichten nicht verloren gehen, müssen sowohl die Warteschlange als auch die Nachricht als persistent markiert werden.

    1.1 Persistenz implementieren

    1. Warteschlangenpersistenz: Ändern Sie beim Erstellen der Warteschlange den zweiten Parameter von channel.queueDeclare(); auf true. channel.queueDeclare();第二个参数改为true。

    2.消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

    /**
     * @Description 持久化MQ
     * @date 2022/3/7 9:14
     */
    public class Producer3 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 持久化队列
            channel.queueDeclare(LONG_QUEUE,true,false,false,null);
            Scanner scanner = new Scanner(System.in);
            int i = 0;
            while (scanner.hasNext()){
                i++;
                String msg = scanner.next() + i;
                // 持久化消息
                channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送消息:'" + msg + "'成功");
            }
        }
    }

    但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

    1.2 不公平分发

    轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

    在消费者处修改channel.basicQos(1);表示开启不公平分发

    /**
     * @Description 不公平分发消费者
     * @date 2022/3/7 9:27
     */
    public class Consumer2 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // 模拟并发沉睡三十秒
                try {
                    Thread.sleep(30000);
                    System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            // 设置不公平分发
            channel.basicQos(1);
            channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                    consumerTag -> {
                        System.out.println(consumerTag + "消费者取消消费");
                    });
        }
    }

    1.3 测试不公平分发

    测试目的:是否能实现能者多劳。

    测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

    先启动生产者创建队列,再分别启动两个消费者。

    生产者按照顺序发四条消息:

    Implementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ

    睡眠时间短的线程A接收到了三条消息

    Implementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ

    而睡眠时间长的线程B只接收到的第二条消息:

    Implementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ

    因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

    实验成功!

    1.4 预取值

    消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

    这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

    1.4.1 代码测试

    测试方法:

    1.新建两个不同的消费者分别给定预期值5个2。

    2.给睡眠时间长的指定为5,时间短的指定为2。

    3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

    代码根据上述代码修改预期值即可。

    2. 发布确认

    发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

    需要注意的是需要开启队列持久化才能使用确认发布。
    开启方法:channel.confirmSelect();

    2.1 单个确认发布

    是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

    /**
     * @Description 确认发布——单个确认
     * @date 2022/3/7 14:49
     */
    public class SoloProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_solo";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 单个发布确认
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("发送消息:" + i);
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
    }

    2.2 批量确认发布

    一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

    /**
     * @Description 确认发布——批量确认
     * @date 2022/3/7 14:49
     */
    public class BatchProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_batch";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 设置一个多少一批确认一次。
            int batchSize = MESSAGE_COUNT / 10;
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 批量发布确认
                if (i % batchSize == 0){
                    if (channel.waitForConfirms()){
                        System.out.println("发送消息:" + i);
                    }
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    显然效率要比单个确认发布的高很多。

    2.3 异步确认发布

    在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

    /**
     * @Description 确认发布——异步确认
     * @date 2022/3/7 14:49
     */
    public class AsyncProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                System.out.println("未确认的消息:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    2.4 处理未确认的消息

    最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

    例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks

    2. Nachrichtenpersistenz: Wenn Sie einen Kanal zum Senden einer Nachricht verwenden, ändern Sie channel.basicPublish(); den dritten Parameter in: MessageProperties.PERSISTENT_TEXT_PLAIN, um dauerhafte Nachrichten anzuzeigen.

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    Es gibt jedoch ein Cache-Intervall zum Speichern von Nachrichten. Es erfolgt kein tatsächliches Schreiben auf die Festplatte. Die Haltbarkeitsgarantie ist nicht stark genug, für eine einfache Warteschlange jedoch mehr als ausreichend.

    1.2 Unfaire Verteilung

    Die Umfrageverteilungsmethode ist nicht geeignet, wenn Verbraucher unterschiedliche Verarbeitungseffizienzen haben. Daher sollte wahre Fairness der Prämisse folgen, dass diejenigen, die mehr Arbeit leisten können, dies auch tun sollten.

    Ändern Sie channel.basicQos(1); beim Verbraucher, um die unfaire Verteilung zu aktivieren mehr Arbeit. 🎜🎜🎜Testmethode🎜: Zwei Verbraucher schlafen unterschiedliche Ereignisse, um unterschiedliche Verarbeitungsereignisse zu simulieren. Wenn die Verarbeitungszeit (Schlafzeit) kurz ist und mehrere Nachrichten verarbeitet werden können, wird der Zweck erreicht. 🎜🎜Starten Sie zuerst den Produzenten, um die Warteschlange zu erstellen, und starten Sie dann jeweils die beiden Verbraucher. 🎜🎜🎜Der Produzent sendet vier Nachrichten der Reihe nach: 🎜🎜🎜So implementieren Sie Java RabbitMQ-Persistenz und Freigabebestätigung“ />🎜🎜Thread A mit kurzer Ruhezeit hat drei Nachrichten erhalten🎜🎜<img src=🎜🎜Thread B, der lange schläft, hat nur die zweite Nachricht erhalten: 🎜🎜Wie Java RabbitMQ Persistenz und Freigabebestätigung implementiert🎜🎜Da Thread B lange braucht, um Nachrichten zu verarbeiten, werden andere Nachrichten Thread A zugewiesen. 🎜🎜Experiment erfolgreich! 🎜🎜1.4 Prefetch-Wert🎜🎜🎜Das Senden und die manuelle Bestätigung von Nachrichten werden asynchron durchgeführt, sodass es einen Puffer für unbestätigte Nachrichten gibt. Entwickler hoffen, die Größe des Puffers zu begrenzen, um Probleme mit unbestätigten Nachrichten zu vermeiden. 🎜🎜🎜Der erwartete Wert sind hier die Parameter in der obigen Methode channel.basicQos();. Wenn Nachrichten vorhanden sind, die den Parametern auf dem aktuellen Kanal entsprechen, wird der aktuelle Kanal nicht zum Konsumieren veranlasst Nachrichten. 🎜
    1.4.1 Codetest
    🎜🎜Testmethode: 🎜🎜🎜1 Erstellen Sie zwei verschiedene Verbraucher und geben Sie den erwarteten Wert von 5 bzw. 2 an. 🎜🎜2. Geben Sie die lange Schlafzeit als 5 und die kurze Schlafzeit als 2 an. 🎜🎜3. Wenn die Nachricht gemäß dem angegebenen erwarteten Wert erhalten wird, bedeutet dies, dass der Test erfolgreich ist, aber nicht, dass er gemäß 5 und 2 verteilt wird. Dies ähnelt der Gewichtsbeurteilung. 🎜🎜Der Code kann den erwarteten Wert gemäß dem obigen Code ändern. 🎜🎜2. Freigabebestätigung 🎜🎜🎜 Die Freigabebestätigung ist der Prozess, bei dem die Warteschlangenbestätigung beibehalten und dann dem Produzenten mitgeteilt wird, nachdem der Produzent die Nachricht in der Warteschlange veröffentlicht hat. Dadurch wird sichergestellt, dass die Nachricht nicht verloren geht. 🎜🎜🎜Es ist zu beachten, dass die Warteschlangenpersistenz aktiviert sein muss, um die bestätigte Veröffentlichung nutzen zu können.
    Öffnungsmethode: channel.confirmSelect();🎜🎜2.1 Veröffentlichung mit Einzelbestätigung🎜🎜🎜 ist eine synchrone Veröffentlichungsmethode, d. h. nach dem Senden einer Nachricht wird diese erst bestätigt und veröffentlicht Nachfolgende Nachrichten werden weiterhin veröffentlicht und es wird eine Ausnahme ausgelöst, wenn innerhalb der angegebenen Zeit keine Bestätigung erfolgt. Der Nachteil ist, dass es extrem langsam ist. 🎜🎜
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }
    🎜2.2 Batch-Bestätigungsfreigabe🎜🎜🎜Die Batch-Bestätigungsfreigabe kann den Durchsatz des Systems verbessern. Der Nachteil besteht jedoch darin, dass bei einem Fehler und einem Problem bei der Veröffentlichung der gesamte Stapel im Speicher gespeichert und später erneut veröffentlicht werden muss. 🎜🎜rrreee🎜Offensichtlich ist die Effizienz viel höher als die einer einzelnen Bestätigungsfreigabe. 🎜🎜2.3 Asynchrone Bestätigungsfreigabe🎜🎜🎜 ist in der Programmierung komplexer als die beiden oben genannten, aber sehr kostengünstig, egal ob zuverlässig oder effizient, es verwendet Rückruffunktionen, um eine zuverlässige Nachrichtenzustellung zu erreichen. 🎜🎜rrreee🎜2.4 Umgang mit unbestätigten Nachrichten 🎜🎜🎜Der beste Weg, mit unbestätigten Nachrichten umzugehen, besteht darin, unbestätigte Nachrichten in eine speicherbasierte Warteschlange zu stellen, auf die der Veröffentlichungsthread zugreifen kann. 🎜🎜🎜Zum Beispiel: ConcurrentLinkedQueue kann Nachrichten zwischen der Bestätigungswarteschlange confirm callbacks und dem Veröffentlichungsthread übertragen. 🎜🎜🎜Verarbeitungsmethode: 🎜🎜🎜1. Alle zu versendenden Nachrichten aufzeichnen; 🎜🎜3. 🎜🎜Verwenden Sie eine Hash-Tabelle zum Speichern von Nachrichten. Ihre Vorteile: 🎜

    可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    Das obige ist der detaillierte Inhalt vonImplementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen