How can the messages sent by the producer be kept from being lost when the RabbitMQ service stops? By default, queues and messages are discarded when RabbitMQ exits or crashes. To ensure that messages are not lost, both the queue and the messages must be marked as persistent.
1. Queue persistence: when creating the queue, set the second parameter (durable) of channel.queueDeclare() to true.
2. Message persistence: when sending a message with channel.basicPublish(), set the third parameter to MessageProperties.PERSISTENT_TEXT_PLAIN, which marks the message as persistent.
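    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;

    import java.nio.charset.StandardCharsets;
    import java.util.Scanner;

    /**
     * @Description Persistent queue and messages
     * @date 2022/3/7 9:14
     */
    public class Producer3 {
        private static final String LONG_QUEUE = "long_queue";

        public static void main(String[] args) throws Exception {
            // RabbitMQUtils is the author's own connection helper (not shown here)
            Channel channel = RabbitMQUtils.getChannel();
            // Durable queue: the second argument (durable) is true
            channel.queueDeclare(LONG_QUEUE, true, false, false, null);
            Scanner scanner = new Scanner(System.in);
            int i = 0;
            while (scanner.hasNext()) {
                i++;
                String msg = scanner.next() + i;
                // Persistent message
                channel.basicPublish("", LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("Sent message: '" + msg + "'");
            }
        }
    }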
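However, marking a message as persistent does not fully guarantee that it survives. There is a short window in which the broker has accepted the message but still holds it in a cache and has not yet written it to disk. The durability guarantee is therefore not strong, but it is more than enough for a simple queue.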
Round-robin dispatch is a poor fit when consumers process messages at different speeds. A fairer approach follows the principle that those who can do more work should do more.
On the consumer side, call channel.basicQos(1); to enable unfair dispatch. With a prefetch value of 1, a consumer holds at most one unacknowledged message at a time, so a busy consumer is skipped until it acknowledges.
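    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    import java.nio.charset.StandardCharsets;

    /**
     * @Description Unfair-dispatch consumer
     * @date 2022/3/7 9:27
     */
    public class Consumer2 {
        private static final String LONG_QUEUE = "long_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // Simulate slow processing by sleeping for thirty seconds
                try {
                    Thread.sleep(30000);
                    System.out.println("Thread B received message: "
                            + new String(message.getBody(), StandardCharsets.UTF_8));
                    // Manual acknowledgement of this single message
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            // Enable unfair dispatch: at most one unacknowledged message at a time
            channel.basicQos(1);
            // autoAck is false, so the prefetch limit takes effect
            channel.basicConsume(LONG_QUEUE, false, deliverCallback, consumerTag -> {
                System.out.println(consumerTag + " consumer cancelled");
            });
        }
    }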
Test goal: check that the consumer that can do more work actually does more.
Test method: two consumers sleep for different lengths of time to simulate different processing times. If the consumer with the shorter processing time (sleep time) handles more messages, the goal is achieved.
First start the producer to create the queue, then start the two consumers.
The producer sends four messages in sequence:
Thread A, which sleeps only briefly, receives three messages:
Thread B, which sleeps for a long time, receives only the second message:
Because thread B takes a long time to consume a message, the other messages are assigned to thread A.
Experiment successful!
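The result can also be reasoned about without a broker. The following is a minimal sketch, not RabbitMQ code: a toy model in which every message is handed to whichever consumer becomes idle first, compared with plain round-robin. The class name, method names, and processing times (1 and 30 time units) are assumptions made for illustration only.

```java
import java.util.Arrays;

/** Toy model of broker dispatch; it does not use the RabbitMQ client API. */
class DispatchSimulation {

    /** Round-robin: message i goes to consumer i % n regardless of load. */
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /**
     * Prefetch of 1: each message goes to the consumer that becomes idle
     * first (ties go to the lower index). processingTime[c] is the
     * hypothetical time consumer c needs per message.
     */
    static int[] prefetchOne(int messages, int[] processingTime) {
        int[] handled = new int[processingTime.length];
        long[] freeAt = new long[processingTime.length];
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += processingTime[next]; // consumer is busy until then
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] times = {1, 30}; // consumer A is fast, consumer B is slow
        System.out.println(Arrays.toString(roundRobin(4, 2)));      // [2, 2]
        System.out.println(Arrays.toString(prefetchOne(4, times))); // [3, 1]
    }
}
```

In this model the slow consumer takes only the second of four messages, which matches the outcome observed in the experiment.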
Message delivery and manual acknowledgement happen asynchronously, so each channel has a buffer of unacknowledged messages. Developers want to limit the size of this buffer so that unacknowledged messages cannot pile up without bound.
The prefetch value is the parameter of the channel.basicQos() method used above. Once the number of unacknowledged messages on a channel equals this value, RabbitMQ stops delivering messages to that channel until at least one of them is acknowledged.
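The rule can be sketched as a small bookkeeping class. This is a hedged illustration of the idea only: PrefetchWindow and its methods are hypothetical names, and the real limit is enforced by the broker, not by client code like this.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of one channel's unacknowledged-message buffer; not the RabbitMQ client API. */
class PrefetchWindow {
    private final int prefetchCount;
    private final Set<Long> unacked = new HashSet<>();
    private long nextDeliveryTag = 1;

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /** Delivery is allowed only while the buffer is below the prefetch value. */
    boolean canDeliver() {
        return unacked.size() < prefetchCount;
    }

    /** Delivers one message and returns its delivery tag. */
    long deliver() {
        if (!canDeliver()) {
            throw new IllegalStateException("prefetch limit reached");
        }
        long tag = nextDeliveryTag++;
        unacked.add(tag);
        return tag;
    }

    /** Acknowledging a message frees one slot in the buffer. */
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    public static void main(String[] args) {
        PrefetchWindow channel = new PrefetchWindow(2);
        long first = channel.deliver();
        channel.deliver();
        System.out.println(channel.canDeliver()); // false: two messages are unacknowledged
        channel.ack(first);
        System.out.println(channel.canDeliver()); // true: one slot is free again
    }
}
```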
Test method:
1. Create two consumers with prefetch values of 5 and 2 respectively.
2. Give the consumer with prefetch value 5 the long sleep time and the consumer with prefetch value 2 the short sleep time.
3. If each consumer holds messages up to its own prefetch value, the test is successful. This does not mean that messages are always split 5 to 2; the prefetch value acts more like a weight.
To try this, change the prefetch value in the consumer code above.
Publisher confirmation is the process in which, after the producer publishes a message to the queue, the broker confirms that the message has been persisted and then notifies the producer. This ensures that the message is not lost.
Note that queue persistence should be turned on when using publisher confirms, so that a confirmation means the message has been stored durably.
To enable it: channel.confirmSelect();
Single confirmation is a synchronous publishing method: after a message is sent, the next message is published only once the previous one has been confirmed. When a timeout is passed, for example channel.waitForConfirms(long timeout), an exception is thrown if no confirmation arrives within that time. The disadvantage is that this approach is extremely slow.
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;

    import java.nio.charset.StandardCharsets;

    /**
     * @Description Publisher confirms: single confirmation
     * @date 2022/3/7 14:49
     */
    public class SoloProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_solo";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Enable publisher confirms
            channel.confirmSelect();
            // Record the start time
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msg.getBytes(StandardCharsets.UTF_8));
                // Wait for the confirmation of this single message
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("Sent message: " + i);
                }
            }
            // Record the end time
            long endTime = System.currentTimeMillis();
            System.out.println("Sending " + MESSAGE_COUNT + " messages took "
                    + (endTime - beginTime) + " ms");
        }
    }
Batch confirmation improves the throughput of the system. Its disadvantage is that when publishing fails there is no way to tell which message in the batch caused the problem, so the entire batch has to be kept in memory and republished later.
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;

    import java.nio.charset.StandardCharsets;

    /**
     * @Description Publisher confirms: batch confirmation
     * @date 2022/3/7 14:49
     */
    public class BatchProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_batch";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Enable publisher confirms
            channel.confirmSelect();
            // Number of messages per confirmation batch
            int batchSize = MESSAGE_COUNT / 10;
            // Record the start time
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msg.getBytes(StandardCharsets.UTF_8));
                // Wait after every full batch; (i + 1) ensures the last batch is confirmed too
                if ((i + 1) % batchSize == 0) {
                    if (channel.waitForConfirms()) {
                        System.out.println("Confirmed up to message: " + i);
                    }
                }
            }
            // Record the end time
            long endTime = System.currentTimeMillis();
            System.out.println("Sending " + MESSAGE_COUNT + " messages took "
                    + (endTime - beginTime) + " ms");
        }
    }
This is clearly much faster than single confirmation.
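One detail worth checking in any batch loop is where the batch boundaries fall, because an off-by-one in the condition can leave the final messages without a confirmation wait. The sketch below is a broker-free illustration under that assumption: it counts outstanding messages and flushes a final partial batch. BatchBoundaries and confirmPoints are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Computes the message indices after which a batch confirmation wait would happen. */
class BatchBoundaries {

    static List<Integer> confirmPoints(int messageCount, int batchSize) {
        List<Integer> points = new ArrayList<>();
        int outstanding = 0;
        for (int i = 0; i < messageCount; i++) {
            outstanding++;                // message i has been "published"
            if (outstanding == batchSize) {
                points.add(i);            // a real producer would wait for confirms here
                outstanding = 0;
            }
        }
        if (outstanding > 0) {
            points.add(messageCount - 1); // flush the final partial batch
        }
        return points;
    }

    public static void main(String[] args) {
        System.out.println(confirmPoints(25, 10)); // [9, 19, 24]
    }
}
```

With 100 messages and a batch size of 10, the last wait falls on index 99, so no published message is left unconfirmed.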
Asynchronous confirmation is more complicated to program than the two approaches above, but it offers the best trade-off: it is better in both reliability and efficiency. It uses callback functions to achieve reliable message delivery.
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import com.rabbitmq.client.MessageProperties;

    import java.nio.charset.StandardCharsets;

    /**
     * @Description Publisher confirms: asynchronous confirmation
     * @date 2022/3/7 14:49
     */
    public class AsyncProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Enable publisher confirms
            channel.confirmSelect();
            // Record the start time
            long beginTime = System.currentTimeMillis();
            // Callback for confirmed (acked) messages
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                System.out.println("Confirmed message: " + deliveryTag);
            };
            // Callback for unconfirmed (nacked) messages
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                System.out.println("Unconfirmed message: " + deliveryTag);
            };
            // Confirm listener: first argument handles acks, second handles nacks
            channel.addConfirmListener(ackCallback, nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msg.getBytes(StandardCharsets.UTF_8));
            }
            // Record the end time (publishing only; confirms may still arrive afterwards)
            long endTime = System.currentTimeMillis();
            System.out.println("Sending " + MESSAGE_COUNT + " messages took "
                    + (endTime - beginTime) + " ms");
        }
    }
The best way to handle unconfirmed messages is to put them in an in-memory queue that the publishing thread can access, for example ConcurrentLinkedQueue. This queue passes messages between the confirm callbacks and the publishing thread.
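A minimal sketch of that handoff, assuming a plain thread stands in for the client's confirm-callback thread: the callback side offers message bodies to the queue and the publishing side drains them. NackHandoff, onNack, and drain are hypothetical names, and no RabbitMQ calls are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Passes unconfirmed message bodies from a callback thread to the publishing thread. */
class NackHandoff {
    // Shared, thread-safe queue of messages that need to be sent again
    static final ConcurrentLinkedQueue<String> toRepublish = new ConcurrentLinkedQueue<>();

    /** Called on the (simulated) confirm-callback thread when a message is not confirmed. */
    static void onNack(String body) {
        toRepublish.offer(body);
    }

    /** Called on the publishing thread: removes and returns everything waiting. */
    static List<String> drain() {
        List<String> out = new ArrayList<>();
        String body;
        while ((body = toRepublish.poll()) != null) {
            out.add(body);
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread callbackThread = new Thread(() -> {
            onNack("3");
            onNack("7");
        });
        callbackThread.start();
        callbackThread.join();       // wait so the output below is deterministic
        System.out.println(drain()); // [3, 7]
    }
}
```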
Processing method:
1. Record every message that is sent;
2. Delete a message when its publication is confirmed;
3. Print the unconfirmed messages.
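Store the messages in a thread-safe, sorted map. Its advantages:
It associates each sequence number with its message; entries can easily be deleted in bulk; it supports high concurrency.
ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();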
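The bulk delete relies on the map being sorted by sequence number. The following broker-free sketch shows the bookkeeping on its own; OutstandingConfirms and its methods are hypothetical names. It assumes that an acknowledgement with multiple set to true covers every sequence number up to and including the given tag, which is why the inclusive form headMap(key, true) is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

/** Tracks published-but-unconfirmed messages by sequence number. */
class OutstandingConfirms {
    private final ConcurrentSkipListMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Step 1: remember the message under its publish sequence number. */
    void recordPublish(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Step 2: drop confirmed messages, one at a time or in bulk. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Inclusive head view: every entry with key <= deliveryTag
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    /** Sequence numbers still waiting for a confirmation, in ascending order. */
    List<Long> pendingTags() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            confirms.recordPublish(seqNo, "msg" + seqNo);
        }
        confirms.handleAck(3, true);                // confirms 1, 2 and 3 together
        System.out.println(confirms.pendingTags()); // [4, 5]
    }
}
```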
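    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import com.rabbitmq.client.MessageProperties;

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    /**
     * @Description Asynchronous publisher confirms, handling messages that were not confirmed
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Enable publisher confirms
            channel.confirmSelect();
            // Thread-safe sorted map, suitable for high concurrency
            ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
            // Record the start time
            long beginTime = System.currentTimeMillis();
            // Callback for confirmed (acked) messages
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                // 2. Delete the message when its publication is confirmed
                if (multiple) {
                    // Bulk delete: everything up to and including deliveryTag
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTag, true);
                    confirmMap.clear();
                } else {
                    // Single delete
                    map.remove(deliveryTag);
                }
                System.out.println("Confirmed message: " + deliveryTag);
            };
            // Callback for unconfirmed (nacked) messages
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                // 3. Print the unconfirmed message
                System.out.println("Unconfirmed message: " + map.get(deliveryTag)
                        + ", tag: " + deliveryTag);
            };
            // Confirm listener: first argument handles acks, second handles nacks
            channel.addConfirmListener(ackCallback, nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                // 1. Record the message before publishing, so the key is the
                //    sequence number this publish will actually use
                map.put(channel.getNextPublishSeqNo(), msg);
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msg.getBytes(StandardCharsets.UTF_8));
            }
            // Record the end time (publishing only; confirms may still arrive afterwards)
            long endTime = System.currentTimeMillis();
            System.out.println("Sending " + MESSAGE_COUNT + " messages took "
                    + (endTime - beginTime) + " ms");
        }
    }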