1. Problem background
For message queue monitoring, we generally write a standalone Java program and run it on a Linux server. After the program starts, it receives messages through the message queue client and hands them to a thread pool for asynchronous processing, which allows fast concurrent handling.
The question, then, is: when we modify the program and need to restart it, how do we ensure that no messages are lost?
Normally, after the subscriber program is closed, messages accumulate in the sender's queue and wait to be consumed the next time the subscriber connects, so messages that were never received are not lost. The only messages that can be lost are those that have already been taken off the queue but not yet processed at the moment of shutdown.
So we need a smooth shutdown mechanism to ensure that these in-flight messages are processed before the restart.
2. Problem analysis
The idea of smooth shutdown is as follows:
When closing the program, first close the message subscription; at this point all unconsumed messages remain in the sender's queue
Then close the local message processing thread pool, waiting for the messages already in the pool to finish processing
Finally, exit the program
Closing the message subscription: message queue clients generally provide a method to close the connection; check your client's API for details.
Closing the thread pool: Java's ThreadPoolExecutor provides two methods, shutdown() and shutdownNow(). The difference is that shutdown() stops accepting new tasks but lets already-submitted tasks run to completion, while shutdownNow() attempts to interrupt the running threads and returns the list of tasks that were still queued. Since we want the backlogged messages to finish, we use shutdown(), and then use isTerminated() to check whether the thread pool has fully terminated.
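A minimal, self-contained sketch of the difference (the class name PoolShutdownSketch and the timings are my own, not part of the demo below):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 6; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        // Simulate a message that takes 500 ms to process
                        TimeUnit.MILLISECONDS.sleep(500L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        // shutdown() returns immediately: it rejects new tasks but lets
        // the six tasks above run to completion
        pool.shutdown();
        // ...so we poll isTerminated() (awaitTermination also works) to wait
        while (!pool.isTerminated()) {
            TimeUnit.MILLISECONDS.sleep(100L);
        }
        System.out.println("all submitted tasks finished");
        // By contrast, shutdownNow() would interrupt the running threads and
        // return the queued tasks that never started:
        // List<Runnable> neverStarted = pool.shutdownNow();
    }
}

The demo later in this section uses exactly this shutdown() plus isTerminated() polling combination.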
Then the next question: how do we tell the program that it should perform the shutdown?
On Linux, we can use kill -9 pid to kill a process outright. Besides -9, kill -l lists the other signals the kill command can send, such as 12) SIGUSR2.
We can register a handler for the chosen signal when the Java program starts, listen for that signal, and perform the relevant shutdown operations when the corresponding kill is received.
The pseudocode is as follows (note that sun.misc.Signal and SignalHandler are JDK-internal APIs):
// Register a handler for the Linux signal kill -12 (SIGUSR2)
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
    @Override
    public void handle(Signal signal) {
        // Close the subscriber
        // Shut down the thread pool
        // Exit
    }
});
The following demo simulates the relevant logic.
First we simulate a producer that produces 5 messages per second.
Then we simulate a subscriber that hands each received message to a thread pool for processing. The pool has a fixed size of 4 threads and each message takes 1 second to process, so the pool backlogs 1 message per second.
package com.lujianing.demo;

import sun.misc.Signal;
import sun.misc.SignalHandler;

import java.util.concurrent.*;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */
public class MsgClient {

    // Simulated message queue subscriber: 4 threads process messages concurrently
    private static final ThreadPoolExecutor THREAD_POOL =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

    // Simulated message queue producer
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE =
            Executors.newSingleThreadScheduledExecutor();

    // Flag used to decide whether the subscription is closed
    private static volatile boolean isClose = false;

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }

    // Simulated producer: puts one message into the queue every 200 ms
    private static void producer(final BlockingQueue queue) {
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }

    // Simulated consumer: the producer emits 5 messages per second, 4 threads each
    // take 1 second per message, so the pool backlogs 1 message per second
    private static void consumer(final BlockingQueue queue) throws InterruptedException {
        while (!isClose) {
            getPoolBacklogSize();
            // Take a message from the queue
            final String msg = (String) queue.take();
            // Hand it to the thread pool for processing
            if (!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {
                    public void run() {
                        try {
                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }

    // Number of messages backlogged in the thread pool
    private static long getPoolBacklogSize() {
        long backlog = THREAD_POOL.getTaskCount() - THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s", System.currentTimeMillis(), backlog));
        return backlog;
    }

    static {
        String osName = System.getProperty("os.name").toLowerCase();
        if (osName != null && osName.indexOf("window") == -1) {
            // Register a handler for the Linux signal kill -12 (SIGUSR2)
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {
                @Override
                public void handle(Signal signal) {
                    System.out.println("Received kill signal, starting shutdown");
                    // Close the subscription loop
                    isClose = true;
                    // Shut down the thread pool, letting the backlogged messages finish
                    THREAD_POOL.shutdown();
                    // Wait until the thread pool has terminated
                    while (!THREAD_POOL.isTerminated()) {
                        try {
                            // Report the backlog every 200 ms
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("Subscriber closed, thread pool drained");
                    System.exit(0);
                }
            });
        }
    }
}
When we run it on the server, we can watch the console output; the demo prints the number of backlogged messages in the thread pool:
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
Open another terminal and find the process id with the ps command, or start the Java process with nohup so you can capture its pid at launch:
ps -fe|grep MsgClient
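For the nohup route, something like the following should work (the log and pid file names are placeholders of my own; $! expands to the pid of the backgrounded process):

nohup java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient > msgclient.log 2>&1 &
echo $! > msgclient.pid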
When we execute kill -12 pid, we can watch the shutdown logic run in the console output.
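For example, combining ps and kill in one line (the awk extraction of the pid column is just one possible way to do it):

kill -12 $(ps -fe | grep MsgClient | grep -v grep | awk '{print $2}')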
3. Problem summary
In our department's actual business, the message volume on the queue is fairly large, reaching hundreds of messages per second at some business peaks, so message processing speed must keep up to avoid a backlog. The pressure on a single subscription node can also be relieved through load balancing.
In some business scenarios the requirements on message integrity are not that strict, so loss during a restart need not be considered; where integrity does matter, it requires careful thought and design.