搜尋
首頁Javajava教程Java訊息佇列任務的平滑關閉

1.問題背景

對於訊息佇列的監聽,我們一般使用Java寫一個獨立的程序,在Linux伺服器上運行。程式啟動後,透過訊息佇列客戶端接收訊息,放入一個執行緒池進行非同步處理,並發的快速處理。

那麼問題來了,當我們修改程式後,需要重新啟動任務的時候,如何保證訊息的不遺失呢?

正常來說,訂閱者程式關閉後,訊息會在發送者隊列中堆積,等待訂閱者下次訂閱消費,所以未接收的訊息是不會遺失的。唯一可能遺失的訊息,就是在關閉的一瞬間,已經從佇列中取出但還沒有處理完畢的訊息。

因此我們需要一套平滑關閉的機制,確保在重啟的時候,訊息可以正常處理完成。

2.問題分析

平滑關閉的思路如下:

在關閉程式時,首先關閉訊息訂閱,這個時候訊息都在發送者佇列中

關閉本地訊息處理執行緒池(等待本地執行緒池中的訊息處理完畢)

程式退出

關閉訊息訂閱:一般訊息佇列的客戶端都提供關閉連線的方法,具體可以自行查看api

關閉執行緒池:Java的ThreadPoolExecutor執行緒池提供shutdown()和shutdownNow( )兩個方法,差異是前者會等待線程池中的消息都處理完畢,後者直接停止線程的執行並返回list集合。因為我們需要使用shutdown()方法進行關閉,並通過isTerminated(),方法判斷線程池是否已經關閉.

那麼問題又來了,我們如何通知到程序,需要執行關閉操作呢?

在Linux中,我們可以用kill -9 pid關閉進程,除了-9之外,我們可以透過 kill -l查看kill 指令的其它信號量,例如使用12) SIGUSR2 信號量

我們可以在Java程式啟動時,註冊對應的信號量,對信號量進行監聽,在收到對應的kill操作時,執行相關的業務操作。

偽代碼如下

 //注册linux kill信号量  kill -12Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {    @Override
    public void handle(Signal signal) {        //关闭订阅者
        //关闭线程池
        //退出
    }
});

下面透過一個demo模擬相關邏輯操作

先模擬一個生產者,每秒生產5個訊息

然後模擬一個訂閱者,收到訊息後交給線程池進行處理,線程池固定4個線程,每個訊息處理時間1秒,這樣線程池每秒會積壓1個訊息。

package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */public class MsgClient {    //模拟消息队列订阅者 同时4个线程处理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);    //模拟消息队列生产者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();    //用于判断是否关闭订阅
    private static volatile boolean isClose = false;    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }    //模拟消息队列生产者
    private static void producer(final BlockingQueue  queue){        //每200毫秒向队列中放入一个消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }    //模拟消息队列消费者 生产者每秒生产5个   消费者4个线程消费1个1秒  每秒积压1个
    private static void consumer(final BlockingQueue queue) throws InterruptedException {        while (!isClose){
            getPoolBacklogSize();            //从队列中拿到消息
            final String msg = (String)queue.take();            //放入线程池处理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {                    public void run() {                        try {                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }    //查看线程池堆积消息个数
    private static long getPoolBacklogSize(){        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));        return backlog;
    }    static {
        String osName = System.getProperty("os.name").toLowerCase();        if(osName != null && osName.indexOf("window") == -1) {            //注册linux kill信号量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,执行关闭操作");                    //关闭订阅消费
                    isClose = true;                    //关闭线程池,等待线程池积压消息处理
                    THREAD_POOL.shutdown();                    //判断线程池是否关闭
                    while (!THREAD_POOL.isTerminated()) {                        try {                            //每200毫秒 判断线程池积压数量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("订阅者关闭,线程池处理完毕");
                    System.exit(0);
                }
            });
        }
    }
}

當我們在服務上運行時,透過控制台可以看到相關的輸出訊息,demo中輸出了線程池的積壓訊息個數

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

Java訊息佇列任務的平滑關閉

另打開一個終端,透過ps命令查看進程號,或透過nohup啟動Java程序拿到流程id

ps -fe|grep MsgClient

Java訊息佇列任務的平滑關閉

當我們執行kill -12 pid的時候可以看到關閉業務邏輯

Java訊息佇列任務的平滑關閉

3.問題總結

3.問題總結

在部門的實際業務中,在部門的實際業務中,訊息佇列的訊息量還挺大的,某些業務高峰時每秒有幾百的訊息量,因此對訊息的處理要保證速度,避免訊息積壓,也可以透過負載解決單一訂閱節點的壓力。 🎜🎜在某些業務場景中,對訊息的完整性要求不那麼高,那麼就不用考慮重啟時的一點損耗。反之,就需要好好思考設計了。 🎜🎜🎜🎜
陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
如何將Maven或Gradle用於高級Java項目管理,構建自動化和依賴性解決方案?如何將Maven或Gradle用於高級Java項目管理,構建自動化和依賴性解決方案?Mar 17, 2025 pm 05:46 PM

本文討論了使用Maven和Gradle進行Java項目管理,構建自動化和依賴性解決方案,以比較其方法和優化策略。

如何使用適當的版本控制和依賴項管理創建和使用自定義Java庫(JAR文件)?如何使用適當的版本控制和依賴項管理創建和使用自定義Java庫(JAR文件)?Mar 17, 2025 pm 05:45 PM

本文使用Maven和Gradle之類的工具討論了具有適當的版本控制和依賴關係管理的自定義Java庫(JAR文件)的創建和使用。

如何使用咖啡因或Guava Cache等庫在Java應用程序中實現多層緩存?如何使用咖啡因或Guava Cache等庫在Java應用程序中實現多層緩存?Mar 17, 2025 pm 05:44 PM

本文討論了使用咖啡因和Guava緩存在Java中實施多層緩存以提高應用程序性能。它涵蓋設置,集成和績效優勢,以及配置和驅逐政策管理最佳PRA

如何將JPA(Java持久性API)用於具有高級功能(例如緩存和懶惰加載)的對象相關映射?如何將JPA(Java持久性API)用於具有高級功能(例如緩存和懶惰加載)的對象相關映射?Mar 17, 2025 pm 05:43 PM

本文討論了使用JPA進行對象相關映射,並具有高級功能,例如緩存和懶惰加載。它涵蓋了設置,實體映射和優化性能的最佳實踐,同時突出潛在的陷阱。[159個字符]

Java的類負載機制如何起作用,包括不同的類載荷及其委託模型?Java的類負載機制如何起作用,包括不同的類載荷及其委託模型?Mar 17, 2025 pm 05:35 PM

Java的類上載涉及使用帶有引導,擴展程序和應用程序類負載器的分層系統加載,鏈接和初始化類。父代授權模型確保首先加載核心類別,從而影響自定義類LOA

如何將Java的RMI(遠程方法調用)用於分佈式計算?如何將Java的RMI(遠程方法調用)用於分佈式計算?Mar 11, 2025 pm 05:53 PM

本文解釋了用於構建分佈式應用程序的Java的遠程方法調用(RMI)。 它詳細介紹了接口定義,實現,註冊表設置和客戶端調用,以解決網絡問題和安全性等挑戰。

如何使用Java的插座API進行網絡通信?如何使用Java的插座API進行網絡通信?Mar 11, 2025 pm 05:53 PM

本文詳細介紹了用於網絡通信的Java的套接字API,涵蓋了客戶服務器設置,數據處理和關鍵考慮因素,例如資源管理,錯誤處理和安全性。 它還探索了性能優化技術,我

如何在Java中創建自定義網絡協議?如何在Java中創建自定義網絡協議?Mar 11, 2025 pm 05:52 PM

本文詳細介紹了創建自定義Java網絡協議。 它涵蓋協議定義(數據結構,框架,錯誤處理,版本控制),實現(使用插座),數據序列化和最佳實踐(效率,安全性,維護

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
3 週前By尊渡假赌尊渡假赌尊渡假赌

熱工具

Dreamweaver Mac版

Dreamweaver Mac版

視覺化網頁開發工具

EditPlus 中文破解版

EditPlus 中文破解版

體積小,語法高亮,不支援程式碼提示功能

WebStorm Mac版

WebStorm Mac版

好用的JavaScript開發工具

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

將Eclipse與SAP NetWeaver應用伺服器整合。

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)