Heim >Java >javaLernprogramm >Implementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ
Wenn der RabbitMQ-Dienst gestoppt wird, gehen die vom Nachrichtenersteller gesendeten Nachrichten nicht verloren. Standardmäßig werden Warteschlangen und Nachrichten ignoriert, wenn RabbitMQ beendet wird oder abstürzt. Um sicherzustellen, dass Nachrichten nicht verloren gehen, müssen sowohl die Warteschlange als auch die Nachricht als persistent markiert werden.
1. Warteschlangenpersistenz: Ändern Sie beim Erstellen der Warteschlange den zweiten Parameter von channel.queueDeclare();
auf true. channel.queueDeclare();
第二个参数改为true。
2.消息持久化:在使用信道发送消息时channel.basicPublish();
将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN
表示持久化消息。
/** * @Description 持久化MQ * @date 2022/3/7 9:14 */ public class Producer3 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 持久化队列 channel.queueDeclare(LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; // 持久化消息 channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:'" + msg + "'成功"); } } }
但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。
轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。
在消费者处修改channel.basicQos(1);
表示开启不公平分发
/** * @Description 不公平分发消费者 * @date 2022/3/7 9:27 */ public class Consumer2 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模拟并发沉睡三十秒 try { Thread.sleep(30000); System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; // 设置不公平分发 channel.basicQos(1); channel.basicConsume(LONG_QUEUE,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消费者取消消费"); }); } }
测试目的:是否能实现能者多劳。
测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。
先启动生产者创建队列,再分别启动两个消费者。
生产者按照顺序发四条消息:
睡眠时间短的线程A接收到了三条消息
而睡眠时间长的线程B只接收到的第二条消息:
因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。
实验成功!
消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。
这里的预期值就值得是上述方法channel.basicQos();
里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。
测试方法:
1.新建两个不同的消费者分别给定预期值5个2。
2.给睡眠时间长的指定为5,时间短的指定为2。
3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。
代码根据上述代码修改预期值即可。
发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。
需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();
是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。
/** * @Description 确认发布——单个确认 * @date 2022/3/7 14:49 */ public class SoloProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 单个发布确认 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("发送消息:" + i); } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。
/** * @Description 确认发布——批量确认 * @date 2022/3/7 14:49 */ public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 设置一个多少一批确认一次。 int batchSize = MESSAGE_COUNT / 10; // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批量发布确认 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("发送消息:" + i); } } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
显然效率要比单个确认发布的高很多。
在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。
/** * @Description 确认发布——异步确认 * @date 2022/3/7 14:49 */ public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{ System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ System.out.println("未确认的消息:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。
例如:ConcurrentLinkedQueue
可以在确认队列confirm callbacks
channel.basicPublish();
den dritten Parameter in: MessageProperties.PERSISTENT_TEXT_PLAIN
, um dauerhafte Nachrichten anzuzeigen. ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();Es gibt jedoch ein Cache-Intervall zum Speichern von Nachrichten. Es erfolgt kein tatsächliches Schreiben auf die Festplatte. Die Haltbarkeitsgarantie ist nicht stark genug, für eine einfache Warteschlange jedoch mehr als ausreichend. 1.2 Unfaire Verteilung
Die Umfrageverteilungsmethode ist nicht geeignet, wenn Verbraucher unterschiedliche Verarbeitungseffizienzen haben. Daher sollte wahre Fairness der Prämisse folgen, dass diejenigen, die mehr Arbeit leisten können, dies auch tun sollten.
Ändern Siechannel.basicQos(1);
beim Verbraucher, um die unfaire Verteilung zu aktivieren mehr Arbeit. 🎜🎜🎜Testmethode🎜: Zwei Verbraucher schlafen unterschiedliche Ereignisse, um unterschiedliche Verarbeitungsereignisse zu simulieren. Wenn die Verarbeitungszeit (Schlafzeit) kurz ist und mehrere Nachrichten verarbeitet werden können, wird der Zweck erreicht. 🎜🎜Starten Sie zuerst den Produzenten, um die Warteschlange zu erstellen, und starten Sie dann jeweils die beiden Verbraucher. 🎜🎜🎜Der Produzent sendet vier Nachrichten der Reihe nach: 🎜🎜🎜🎜🎜Thread B, der lange schläft, hat nur die zweite Nachricht erhalten: 🎜🎜🎜🎜Da Thread B lange braucht, um Nachrichten zu verarbeiten, werden andere Nachrichten Thread A zugewiesen. 🎜🎜Experiment erfolgreich! 🎜🎜1.4 Prefetch-Wert🎜🎜🎜Das Senden und die manuelle Bestätigung von Nachrichten werden asynchron durchgeführt, sodass es einen Puffer für unbestätigte Nachrichten gibt. Entwickler hoffen, die Größe des Puffers zu begrenzen, um Probleme mit unbestätigten Nachrichten zu vermeiden. 🎜🎜🎜Der erwartete Wert sind hier die Parameter in der obigen Methode channel.basicQos();
. Wenn Nachrichten vorhanden sind, die den Parametern auf dem aktuellen Kanal entsprechen, wird der aktuelle Kanal nicht zum Konsumieren veranlasst Nachrichten. 🎜channel.confirmSelect();
🎜🎜2.1 Veröffentlichung mit Einzelbestätigung🎜🎜🎜 ist eine synchrone Veröffentlichungsmethode, d. h. nach dem Senden einer Nachricht wird diese erst bestätigt und veröffentlicht Nachfolgende Nachrichten werden weiterhin veröffentlicht und es wird eine Ausnahme ausgelöst, wenn innerhalb der angegebenen Zeit keine Bestätigung erfolgt. Der Nachteil ist, dass es extrem langsam ist. 🎜🎜/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在发布成功确认处删除; // 批量删除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 单独删除 map.remove(deliveryTab); } System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未确认的消息。 System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 记录要发送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }🎜2.2 Batch-Bestätigungsfreigabe🎜🎜🎜Die Batch-Bestätigungsfreigabe kann den Durchsatz des Systems verbessern. Der Nachteil besteht jedoch darin, dass bei einem Fehler und einem Problem bei der Veröffentlichung der gesamte Stapel im Speicher gespeichert und später erneut veröffentlicht werden muss. 🎜🎜rrreee🎜Offensichtlich ist die Effizienz viel höher als die einer einzelnen Bestätigungsfreigabe. 🎜🎜2.3 Asynchrone Bestätigungsfreigabe🎜🎜🎜 ist in der Programmierung komplexer als die beiden oben genannten, aber sehr kostengünstig, egal ob zuverlässig oder effizient, es verwendet Rückruffunktionen, um eine zuverlässige Nachrichtenzustellung zu erreichen. 🎜🎜rrreee🎜2.4 Umgang mit unbestätigten Nachrichten 🎜🎜🎜Der beste Weg, mit unbestätigten Nachrichten umzugehen, besteht darin, unbestätigte Nachrichten in eine speicherbasierte Warteschlange zu stellen, auf die der Veröffentlichungsthread zugreifen kann. 🎜🎜🎜Zum Beispiel:
ConcurrentLinkedQueue
kann Nachrichten zwischen der Bestätigungswarteschlange confirm callbacks
und dem Veröffentlichungsthread übertragen. 🎜🎜🎜Verarbeitungsmethode: 🎜🎜🎜1. Alle zu versendenden Nachrichten aufzeichnen; 🎜🎜3. 🎜🎜Verwenden Sie eine Hash-Tabelle zum Speichern von Nachrichten. Ihre Vorteile: 🎜可以将需要和消息进行关联;轻松批量删除条目;支持高并发。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在发布成功确认处删除; // 批量删除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 单独删除 map.remove(deliveryTab); } System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未确认的消息。 System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 记录要发送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
Das obige ist der detaillierte Inhalt vonImplementierungsmethoden für Persistenz und Release-Bestätigung in Java RabbitMQ. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!