首頁 >Java >java教程 >java多執行緒怎麼透過CompletableFuture組裝異步計算單元

java多執行緒怎麼透過CompletableFuture組裝異步計算單元

王林
王林轉載
2023-05-11 19:04:041210瀏覽

    CompletableFuture 介紹

    CompletableFuture是1.8引入的新特性,一些比較複雜的非同步計算場景,尤其是需要串連多個非同步運算單元的場景,可以考慮使用CompletableFuture 來實作。

    在現實世界中,我們需要解決的複雜問題都是要分為若干步驟。就像我們的程式碼一樣,在一個複雜的邏輯方法中,會呼叫多個方法來一步一步實作。

    設想如下場景,植樹節要進行植樹,分為下面幾個步驟:

    • #挖洞10 分鐘

    • 拿樹苗5 分鐘

    • 種樹苗20 分鐘

    • #澆水5 分鐘

    ##其中1 和2 可以並行,1 和2 都完成了才能進行步驟3,然後才能進行步驟4。

    我們有以下幾種實作方式:

    只有一個人種樹

    如果現在只有一個人植樹,要種100 棵樹,那麼只能依照下列順序執行:

    java多執行緒怎麼透過CompletableFuture組裝異步計算單元

    圖中僅列舉種三棵樹示意。可以看到串列執行,只能種完一棵樹再種一棵,那麼種完 100 棵樹就需要

    40 * 100 = 4000 分鐘。這種方式對應到程序,就是單執行緒同步執行。

    三個人同時種樹,每個人負責種一棵樹

    如何縮短種樹時長呢?你肯定想這還不好辦,學習了這麼久的並發,這肯定難不倒我。不是要種 100 棵樹嗎?那我找 100 個人一塊種,每個人種一棵。那就只要 40 分鐘就可以種完 100 棵樹了。

    沒錯,如果你的程式有個方法叫做 plantTree,裡麵包含瞭如上四部,那麼你起 100 個線程就可以了。但是,請注意,100 個執行緒的建立和銷毀需要消耗大量的系統資源。並且創建和銷毀線程都有時間消耗。此外CPU的核數並不能真的支援100個執行緒並發。如果我們要種1萬棵樹呢?總不能起一萬個線程吧?

    所以這只是理想情況,我們通常是透過執行緒池來執行,並不會真的啟動100個執行緒。

    多個人同時種樹

    種每一棵樹的時候,不依賴的步驟可以分不同的人並行幹

    #這種方式可以進一步縮短種樹的時長,因為第一步挖坑和第二步拿樹苗可以兩個人並行去做,所以每棵樹只需要35 分鐘。如下圖:

    java多執行緒怎麼透過CompletableFuture組裝異步計算單元

    如果程式還是 100 個主執行緒並發執行 plantTree 方法,那麼只需要 35 分鐘種完 100 顆樹。這裡要注意每個執行緒中,由於還要並發兩個執行緒去做 1,2 兩個步驟。實際運作中會又 100 x 3 = 300 個執行緒參與植樹。但負責 1,2 步驟的線程只會短暫參與,然後就閒置了。

    這種方法和第二種方式也存在大量建立執行緒的問題。所以也只是理想情況。

    假如只有4 個人植樹,每個人只負責自己的步驟

    java多執行緒怎麼透過CompletableFuture組裝異步計算單元

    可以看到一開始小王挖完第一個坑後,小李已經取回兩個樹苗,但此時小張才能開始種第一個樹苗。此後小張就可以一個接一個的去種樹苗了,並且在他種下一棵樹苗的時候,小趙可以並行澆水。依照這個流程走下來,種完 100 顆樹苗需要 10 20x100 5=2015 分鐘。比單線程的4000分鐘好了很多,但是遠遠比不上 100 個線程並發種樹的速度。不過不要忘記 100 個線程並發只是理想情況,而本方法只用了 4 個線程。

    我們再對分工做下調整。每個人不只做自己的工作,一旦自己的工作做完了就看有沒有其他工作可以做。例如小王挖坑完後,發現可以種樹苗,那麼他就去種樹苗。小李拿樹苗完成後也可以去挖坑或種樹苗。這樣整體的效率就會更高了。如果基於這種思想,那麼我們實際上把任務分成了 4 類,每類 100 件,總共 400 件任務。 400 件任務全部完成,代表整個任務就完成了。那麼任務的參與者只需要知道任務的依賴,然後不斷領取可以執行的任務去執行。這樣的效率將會是最高的。

    前文說到我們不可能通過100個線程並發來執行任務,所以一般情況下我們都會使用線程池,這和上面的設計思想不謀而合。使用線程池後,由於第四種方式把步驟拆的更細,提高了並發的可能性。因此速度會比第二種方式更快。那麼和第三種比起來,哪種比較快呢?如果執行緒數可以無窮大,這兩個方法所能達到的最短時間是一樣的,都是 35 分鐘。不過在線程有限的情況下,第四種方式對線程的使用率會更高,因為每個步驟都可以並行執行(參與種樹的人完成自己的工作後,都可以去幫助其他人),線程的調度更為靈活,所以執行緒池中的執行緒很難閒下來,一直保持在運作中。是的,誰都不能偷懶。而第三種由於只能並發在 plantTree 方法及挖坑和拿樹苗,所以不如第四種方式靈活

    上文講了這麼多,主要是要說明 CompletableFuture 出現的原因。他用來把複雜任務拆解為一個個銜接的非同步執行步驟,從而提升整體的效率。我們回一下小節題目:誰都不能偷懶。沒錯,這就是 CompletableFuture 要達到的效果,透過計算單元的抽象,讓執行緒能夠有效率的並發參與每一個步驟。同步的程式碼透過 CompletableFuture 可以完全改造為非同步程式碼。下面我們就來看看如何使用 CompletableFuture。

    CompletableFuture 使用

    CompletableFuture 實作了 Future 介面並且實作了 CompletionStage 介面。 Future 介面我們已經很熟悉了,而CompletionStage 介面定了非同步運算步驟之間的規範,這樣確保一步一步能夠銜接上。 CompletionStage 定義了38 個 public 的方法用於非同步計算步驟間的銜接。接下來我們會挑選一些常用的,相對使用頻率較高的方法,來看看如何使用。

    已知計算結果

    如果你已經知道 CompletableFuture 的計算結果,可以使用靜態方法 completedFuture。傳入計算結果,聲明CompletableFuture 物件。呼叫 get 方法時會立即傳回傳入的計算結果,不會被阻塞,如下程式碼:

    public static void main(String[] args) throws Exception{
        CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello World");
        System.out.println("result is " + completableFuture.get());
    }
    // result is Hello World

    是不是覺得這種用法沒有什麼意義?既然知道計算結果了,直接使用就好了,為什麼還要透過 CompletableFuture 包裝呢?這是因為非同步計算單元需要透過 CompletableFuture 進行銜接,所以有的時候我們即使已經知道計算結果,也需要包裝為 CompletableFuture,才能融入到非同步計算的流程之中。

    封裝有傳回值的非同步計算邏輯

    這是我們最常用的方式。把需要非同步計算的邏輯封裝為一個計算單元,交由 CompletableFuture 運作。如下面的程式碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成

    這裡我們使用了 CompletableFuture 的 supplyAsync 方法,以 lambda 表達式的方式向其傳遞了一個 supplier 介面的實作。

    可見 completableFuture.get() 拿到的計算結果就是你傳入函數執行後 return 的值。那如果你有需要非同步運算的邏輯,那就可以放到 supplyAsync 傳入的函數體中。這段函數是如何被非同步執行的呢?如果你跟入程式碼可以看到其實 supplyAsync 是透過 Executor,也就是線程池來運行這段函數的。 completableFuture 預設使用的是ForkJoinPool,當然你也可以透過為 supplyAsync 指定其他 Excutor,透過第二個參數傳入 supplyAsync 方法。

    supplyAsync 使用場景非常多,舉個簡單的例子,主程式需要呼叫多個微服務的介面請求數據,那麼就可以啟動多個CompletableFuture,呼叫supplyAsync,函數體中是關於不同介面的調用邏輯。這樣不同的介面請求就可以非同步同時運行,最後再等全部介面返回時,執行後面的邏輯。

    封裝無回傳值的非同步運算邏輯

    supplyAsync 接收的函數是有傳回值的。有些情況我們只是一段計算過程,並不需要回傳值。這就像 Runnable 的run 方法,並沒有回傳值。這個情況我們可以使用 runAsync方法,如下面的程式碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println("挖坑完成"));
        completableFuture.get();
    }
    // 挖坑完成

    runAsync 接收 runnable 介面的函數。所以並無回傳值。栗子中的邏輯只是列印“挖坑完成”。

    進一步處理非同步回傳的結果,並傳回新的計算結果

    當我們透過supplyAsync 完成了非同步計算,傳回CompletableFuture,此時可以繼續對傳回結果進行加工,如下面的程式碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenApply(s -> s + ", 并且归还铁锹")
                .thenApply(s -> s + ", 全部完成。");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且归还铁锹, 全部完成。

    在呼叫supplyAsync 後,我們兩次鍊式呼叫thenApply 方法。 s 是前一步 supplyAsync 回傳的計算結結果,我們對結算結果進行了兩次再加工。我們可以透過 thenApply 不斷對計算結果進行加工處理。如果想要非同步運行 thenApply 的邏輯,可以使用 thenApplyAsync。使用方法相同,只不過會透過執行緒池異步運行。

    进一步处理异步返回的结果,无返回

    这种场景你可以使用thenApply。这个方法可以让你处理上一步的返回结果,但无返回值。参照如下代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且归还铁锹"));
        completableFuture.get();
    }

    这里可以看到 thenAccept 接收的函数没有返回值,只有业务逻辑。处理后返回 CompletableFuture 类型对象。

    既不需要返回值,也不需要上一步计算结果,只想在执行结束后再执行一段代码

    此时你可以使用 thenRun 方法,他接收 Runnable 的函数,没有输入也没有输出,仅仅是在异步计算结束后回调一段逻辑,比如记录 log 等。参照下面代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且归还铁锹"))
                .thenRun(() -> System.out.println("挖坑工作已经全部完成"));
        completableFuture.get();
    }
    // 挖坑完成, 并且归还铁锹
    // 挖坑工作已经全部完成

    可以看到在 thenAccept 之后继续调用了 thenRun,仅仅是打印了日志而已

    组合 Future 处理逻辑

    我们可以把两个 CompletableFuture 组合起来使用,如下面的代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", 并且归还铁锹"));
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且归还铁锹

    thenApply 和 thenCompose 的关系就像 stream中的 map 和 flatmap。从上面的例子来看,thenApply 和thenCompose 都可以实现同样的功能。但是如果你使用一个第三方的库,有一个API返回的是CompletableFuture 类型,那么你就只能使用 thenCompose方法。

    组合Futurue结果

    如果你有两个异步操作互相没有依赖,但是第三步操作依赖前两部计算的结果,那么你可以使用 thenCombine 方法来实现,如下面代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCombine(CompletableFuture.supplyAsync(() -> ", 拿树苗完成"), (x, y) -> x + y + "植树完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 拿树苗完成植树完成

    挖坑和拿树苗可以同时进行,但是第三步植树则祖尧前两步完成后才能进行。

    可以看到符合我们的预期。使用场景之前也提到过。我们调用多个微服务的接口时,可以使用这种方式进行组合。处理接口调用间的依赖关系。 当你需要两个 Future 的结果,但是不需要再加工后向下游传递计算结果时,可以使用 thenAcceptBoth,用法一样,只不过接收的函数没有返回值。

    并行处理多个 Future

    假如我们对微服务接口的调用不止两个,并且还有一些其它可以异步执行的逻辑。主流程需要等待这些所有的异步操作都返回时,才能继续往下执行。此时我们可以使用 CompletableFuture.allOf 方法。它接收 n 个 CompletableFuture,返回一个 CompletableFuture。对其调用 get 方法后,只有所有的 CompletableFuture 全完成时才会继续后面的逻辑。我们看下面示例代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("挖坑完成");
        });
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取树苗完成");
        });
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取肥料完成");
        });
        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("植树准备工作完成!");
    }
    // 挖坑完成
    // 取肥料完成
    // 取树苗完成
    // 植树准备工作完成!

    异常处理

    在异步计算链中的异常处理可以采用 handle 方法,它接收两个参数,第一个参数是计算及过,第二个参数是异步计算链中抛出的异常。使用方法如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("Computation error");
            }
            return "挖坑完成";
        }).handle((result, throwable) -> {
            if (result == null) {
                return "挖坑异常";
            }
            return result;
        });
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑异常

    代码中会抛出一个 RuntimeException,抛出这个异常时 result 为 null,而 throwable 不为null。根据这些信息你可以在 handle 中进行处理,如果抛出的异常种类很多,你可以判断 throwable 的类型,来选择不同的处理逻辑。

    以上是java多執行緒怎麼透過CompletableFuture組裝異步計算單元的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除