Heim >Java >javaLernprogramm >Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

高洛峰
高洛峰Original
2016-11-17 12:46:531374Durchsuche

1. Problemhintergrund

Für die Überwachung der Nachrichtenwarteschlange verwenden wir im Allgemeinen Java, um ein separates Programm zu schreiben und es auf dem Linux-Server auszuführen. Nachdem das Programm gestartet wurde, werden Nachrichten über den Nachrichtenwarteschlangen-Client empfangen und zur asynchronen Verarbeitung in einen Thread-Pool gestellt, was eine schnelle gleichzeitige Verarbeitung ermöglicht.

Dann stellt sich die Frage: Wenn wir das Programm ändern und die Aufgabe neu starten müssen, wie stellen wir sicher, dass die Nachricht nicht verloren geht?

Normalerweise sammeln sich Nachrichten nach dem Schließen des Abonnentenprogramms in der Absenderwarteschlange an und warten auf den nächsten Abonnementverbrauch des Abonnenten, sodass nicht empfangene Nachrichten nicht verloren gehen. Die einzigen Nachrichten, die verloren gehen können, sind Nachrichten, die aus der Warteschlange genommen, aber zum Zeitpunkt des Herunterfahrens noch nicht verarbeitet wurden.

Daher benötigen wir einen reibungslosen Shutdown-Mechanismus, um sicherzustellen, dass Nachrichten beim Neustart normal verarbeitet werden können.

2. Problemanalyse

Die Idee des reibungslosen Herunterfahrens ist wie folgt:

Schließen Sie beim Schließen des Programms zunächst das Nachrichtenabonnement befinden sich alle in der Absenderwarteschlange

Schließen Sie den lokalen Nachrichtenverarbeitungs-Thread-Pool (warten Sie, bis die Nachrichten im lokalen Thread-Pool verarbeitet werden)

Das Programm wird beendet

Nachricht schließen Abonnement: Im Allgemeinen stellen Nachrichtenwarteschlangen-Clients Methoden zum Schließen von Verbindungen bereit. Weitere Informationen finden Sie in der API

zum Schließen des Thread-Pools: Der ThreadPoolExecutor von Java bietet zwei Methoden: Shutdown() und ShutdownNow() Der Unterschied besteht darin, dass ersteres auf die Verarbeitung aller Nachrichten im Thread-Pool wartet, während letzteres direkt die Ausführung des Threads stoppt und die Listensammlung zurückgibt. Denn wir müssen die Methode „shutdown()“ zum Herunterfahren verwenden und die Methode „isTerminated()“ verwenden, um festzustellen, ob der Thread-Pool geschlossen wurde.

Dann stellt sich erneut die Frage: Wie benachrichtigen wir das Programm darüber? Muss ein Shutdown-Vorgang durchgeführt werden?

Unter Linux können wir kill -9 pid verwenden, um den Prozess herunterzufahren. Zusätzlich zu -9 können wir kill -l verwenden, um andere Semaphoren des Kill-Befehls anzuzeigen , wie zum Beispiel mit 12) SIGUSR2-Semaphor

Wir können das entsprechende Semaphor registrieren, wenn das Java-Programm startet, das Semaphor überwachen und verwandte Geschäftsvorgänge ausführen, wenn wir den entsprechenden Kill-Vorgang empfangen.

Der Pseudocode lautet wie folgt

 //注册linux kill信号量  kill -12Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {    @Override
    public void handle(Signal signal) {        //关闭订阅者
        //关闭线程池
        //退出
    }
});

Das Folgende simuliert verwandte logische Operationen durch eine Demo

Simulieren Sie zunächst einen Produzenten, der 5 Nachrichten pro Sekunde produziert

Simulieren Sie dann einen Abonnenten und übergeben Sie ihn nach Erhalt an den Thread-Pool zur Verarbeitung. Der Thread-Pool verfügt über 4 feste Threads und die Verarbeitungszeit jeder Nachricht beträgt 1 Sekunde Nachricht pro Sekunde.

package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */public class MsgClient {    //模拟消息队列订阅者 同时4个线程处理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);    //模拟消息队列生产者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();    //用于判断是否关闭订阅
    private static volatile boolean isClose = false;    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }    //模拟消息队列生产者
    private static void producer(final BlockingQueue  queue){        //每200毫秒向队列中放入一个消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }    //模拟消息队列消费者 生产者每秒生产5个   消费者4个线程消费1个1秒  每秒积压1个
    private static void consumer(final BlockingQueue queue) throws InterruptedException {        while (!isClose){
            getPoolBacklogSize();            //从队列中拿到消息
            final String msg = (String)queue.take();            //放入线程池处理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {                    public void run() {                        try {                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }    //查看线程池堆积消息个数
    private static long getPoolBacklogSize(){        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));        return backlog;
    }    static {
        String osName = System.getProperty("os.name").toLowerCase();        if(osName != null && osName.indexOf("window") == -1) {            //注册linux kill信号量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,执行关闭操作");                    //关闭订阅消费
                    isClose = true;                    //关闭线程池,等待线程池积压消息处理
                    THREAD_POOL.shutdown();                    //判断线程池是否关闭
                    while (!THREAD_POOL.isTerminated()) {                        try {                            //每200毫秒 判断线程池积压数量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("订阅者关闭,线程池处理完毕");
                    System.exit(0);
                }
            });
        }
    }
}

Wenn wir den Dienst ausführen, können wir die relevanten Ausgabeinformationen über die Konsole sehen. Die Demo gibt die Anzahl der Backlog-Nachrichten im Thread-Pool aus

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Öffnen Sie ein anderes Terminal und überprüfen Sie die Prozessnummer über den Befehl ps oder starten Sie den Java-Prozess über nohup, um die Prozess-ID zu erhalten

ps -fe|grep MsgClient

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Wenn wir ausführen Beim Töten von -12 pid können Sie die Geschäftslogik zum Herunterfahren sehen

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

3 Problemzusammenfassung

Im eigentlichen Geschäft der Abteilung Das Nachrichtenvolumen der Nachrichtenwarteschlange ist bei einigen Geschäftsspitzen immer noch ziemlich groß. Daher muss die Verarbeitungsgeschwindigkeit der Nachrichten sichergestellt werden, um den Druck auf einen einzelnen Abonnementknoten zu vermeiden durch Last.

In einigen Geschäftsszenarien sind die Anforderungen an die Nachrichtenintegrität nicht so hoch, sodass der Verlust beim Neustart nicht berücksichtigt werden muss. Im Gegenteil, es erfordert sorgfältiges Denken und Design.


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn