1. Problemhintergrund
Für die Überwachung der Nachrichtenwarteschlange verwenden wir im Allgemeinen Java, um ein separates Programm zu schreiben und es auf dem Linux-Server auszuführen. Nachdem das Programm gestartet wurde, werden Nachrichten über den Nachrichtenwarteschlangen-Client empfangen und zur asynchronen Verarbeitung in einen Thread-Pool gestellt, was eine schnelle gleichzeitige Verarbeitung ermöglicht.
Dann stellt sich die Frage: Wenn wir das Programm ändern und die Aufgabe neu starten müssen, wie stellen wir sicher, dass die Nachricht nicht verloren geht?
Normalerweise sammeln sich Nachrichten nach dem Schließen des Abonnentenprogramms in der Absenderwarteschlange an und warten auf den nächsten Abonnementverbrauch des Abonnenten, sodass nicht empfangene Nachrichten nicht verloren gehen. Die einzigen Nachrichten, die verloren gehen können, sind Nachrichten, die aus der Warteschlange genommen, aber zum Zeitpunkt des Herunterfahrens noch nicht verarbeitet wurden.
Daher benötigen wir einen reibungslosen Shutdown-Mechanismus, um sicherzustellen, dass Nachrichten beim Neustart normal verarbeitet werden können.
2. Problemanalyse
Die Idee des reibungslosen Herunterfahrens ist wie folgt:
Schließen Sie beim Schließen des Programms zunächst das Nachrichtenabonnement befinden sich alle in der Absenderwarteschlange
Schließen Sie den lokalen Nachrichtenverarbeitungs-Thread-Pool (warten Sie, bis die Nachrichten im lokalen Thread-Pool verarbeitet werden)
Das Programm wird beendet
Nachricht schließen Abonnement: Im Allgemeinen stellen Nachrichtenwarteschlangen-Clients Methoden zum Schließen von Verbindungen bereit. Weitere Informationen finden Sie in der API
zum Schließen des Thread-Pools: Der ThreadPoolExecutor von Java bietet zwei Methoden: Shutdown() und ShutdownNow() Der Unterschied besteht darin, dass ersteres auf die Verarbeitung aller Nachrichten im Thread-Pool wartet, während letzteres direkt die Ausführung des Threads stoppt und die Listensammlung zurückgibt. Denn wir müssen die Methode „shutdown()“ zum Herunterfahren verwenden und die Methode „isTerminated()“ verwenden, um festzustellen, ob der Thread-Pool geschlossen wurde.
Dann stellt sich erneut die Frage: Wie benachrichtigen wir das Programm darüber? Muss ein Shutdown-Vorgang durchgeführt werden?
Unter Linux können wir kill -9 pid verwenden, um den Prozess herunterzufahren. Zusätzlich zu -9 können wir kill -l verwenden, um andere Semaphoren des Kill-Befehls anzuzeigen , wie zum Beispiel mit 12) SIGUSR2-Semaphor
Wir können das entsprechende Semaphor registrieren, wenn das Java-Programm startet, das Semaphor überwachen und verwandte Geschäftsvorgänge ausführen, wenn wir den entsprechenden Kill-Vorgang empfangen.
Der Pseudocode lautet wie folgt
//注册linux kill信号量 kill -12Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { //关闭订阅者 //关闭线程池 //退出 } });
Das Folgende simuliert verwandte logische Operationen durch eine Demo
Simulieren Sie zunächst einen Produzenten, der 5 Nachrichten pro Sekunde produziert
Simulieren Sie dann einen Abonnenten und übergeben Sie ihn nach Erhalt an den Thread-Pool zur Verarbeitung. Der Thread-Pool verfügt über 4 feste Threads und die Verarbeitungszeit jeder Nachricht beträgt 1 Sekunde Nachricht pro Sekunde.
package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/** * @author lujianing01@58.com * @Description: * @date 2016/11/14 */public class MsgClient { //模拟消息队列订阅者 同时4个线程处理 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); //模拟消息队列生产者 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); //用于判断是否关闭订阅 private static volatile boolean isClose = false; public static void main(String[] args) throws InterruptedException { BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100); producer(queue); consumer(queue); } //模拟消息队列生产者 private static void producer(final BlockingQueue queue){ //每200毫秒向队列中放入一个消息 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() { public void run() { queue.offer(""); } }, 0L, 200L, TimeUnit.MILLISECONDS); } //模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个 private static void consumer(final BlockingQueue queue) throws InterruptedException { while (!isClose){ getPoolBacklogSize(); //从队列中拿到消息 final String msg = (String)queue.take(); //放入线程池处理 if(!THREAD_POOL.isShutdown()) { THREAD_POOL.execute(new Runnable() { public void run() { try { //System.out.println(msg); TimeUnit.MILLISECONDS.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } //查看线程池堆积消息个数 private static long getPoolBacklogSize(){ long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount(); System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog)); return backlog; } static { String osName = System.getProperty("os.name").toLowerCase(); if(osName != null && osName.indexOf("window") == -1) { //注册linux kill信号量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { System.out.println("收到kill消息,执行关闭操作"); //关闭订阅消费 isClose = true; //关闭线程池,等待线程池积压消息处理 THREAD_POOL.shutdown(); //判断线程池是否关闭 while (!THREAD_POOL.isTerminated()) { try { //每200毫秒 判断线程池积压数量 getPoolBacklogSize(); TimeUnit.MILLISECONDS.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("订阅者关闭,线程池处理完毕"); System.exit(0); } }); } } }
Wenn wir den Dienst ausführen, können wir die relevanten Ausgabeinformationen über die Konsole sehen. Die Demo gibt die Anzahl der Backlog-Nachrichten im Thread-Pool aus
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
Öffnen Sie ein anderes Terminal und überprüfen Sie die Prozessnummer über den Befehl ps oder starten Sie den Java-Prozess über nohup, um die Prozess-ID zu erhalten
ps -fe|grep MsgClient
Wenn wir ausführen Beim Töten von -12 pid können Sie die Geschäftslogik zum Herunterfahren sehen
3 Problemzusammenfassung
Im eigentlichen Geschäft der Abteilung Das Nachrichtenvolumen der Nachrichtenwarteschlange ist bei einigen Geschäftsspitzen immer noch ziemlich groß. Daher muss die Verarbeitungsgeschwindigkeit der Nachrichten sichergestellt werden, um den Druck auf einen einzelnen Abonnementknoten zu vermeiden durch Last.
In einigen Geschäftsszenarien sind die Anforderungen an die Nachrichtenintegrität nicht so hoch, sodass der Verlust beim Neustart nicht berücksichtigt werden muss. Im Gegenteil, es erfordert sorgfältiges Denken und Design.