首頁 >Java >java教程 >Java多執行緒工具CompletableFuture怎麼使用

Java多執行緒工具CompletableFuture怎麼使用

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB轉載
2023-04-29 08:34:151999瀏覽

    前言

    Future的問題

    寫多執行緒程式的時候,可以使用Future從一個非同步執行緒拿到結果,但如果使用過程中會發現一些問題:

    • 如果想要對Future的結果做進一步的操作,需要阻塞目前執行緒

    • 多個Future不能被鍊式的執行,每個Future的結果都是獨立的,期望對一個Future的結果做另外一件非同步的事情;

    • 沒有異常處理策略,如果Future執行失敗了,需要手動捕捉

    CompletableFuture應運而生

    為了解決Future問題,JDK在1.8的時候給了我們一個好用的工具類CompletableFuture;

    它實作了Future和CompletionStage接口,針對Future的不足之處給出了相應的處理方式。

    • 在非同步執行緒執行結束後可以自動回呼我們新的處理邏輯,無需阻塞

    • 可以對多個非同步任務進行編排,組合或排序

    • 異常處理

    CompletableFuture的核心思想是將每個非同步任務都可以看做一個步驟(CompletionStage),然後其他的非同步任務可以根據這個步驟做一些想做的事情。

    CompletionStage定義了許多步驟處理的方法,功能非常強大,這裡就只列一下日常中常用到的一些方法供大家參考。

    使用方式

    基本使用-提交非同步任務

    簡單的使用方式

    非同步執行,無需結果:

    // 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool
    CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

    非同步執行,同時傳回結果:

    // 同样可以指定线程池
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
    System.out.println(stringCompletableFuture.get());

    處理上個非同步任務結果

    • thenRun: 不需要上一個步驟的結果,直接直接新的動作

    Java多執行緒工具CompletableFuture怎麼使用

    #thenAccept:取得上一步非同步處理的內容,進行新的動作

    thenApply: 取得上一步的內容,然後產生新的內容

    Java多執行緒工具CompletableFuture怎麼使用

    #所有加上Async後綴的,代表新的處理操作仍然是非同步的。 Async的運算都可以指定Executors進行處理Java多執行緒工具CompletableFuture怎麼使用

    // Demo
           CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    // 针对上一步的结果做处理,产生新的结果
                    .thenApplyAsync(s -> s.toUpperCase())
                    // 针对上一步的结果做处理,不返回结果
                    .thenAcceptAsync(s -> System.out.println(s))
                    // 不需要上一步返回的结果,直接进行操作
                    .thenRunAsync(() -> System.out.println("end"));
            ;
    對兩個結果進行選用-acceptEither

    當我們有兩個回呼在處理的時候,任何完成都可以使用,兩者結果沒有關係,那就使用acceptEither。

      兩個非同步執行緒誰先執行完成,用誰的結果
    • ,其餘類型的方法也是如此。

    • // 返回abc
      CompletableFuture
                      .supplyAsync(() -> {
                          SleepUtils.sleep(100);
                          return "Hello CompletableFuture!";
                      })
                      .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                          @Override
                          public void accept(String s) {
                              System.out.println(s);
                          }
                      });
      // 返回Hello CompletableFuture!       
      CompletableFuture
                      .supplyAsync(() -> "Hello CompletableFuture!")
                      .acceptEither(CompletableFuture.supplyAsync(() -> {
                          SleepUtils.sleep(100);
                          return "abc";
                      }), new Consumer<String>() {
                          @Override
                          public void accept(String s) {
                              System.out.println(s);
                          }
                      });
    合併兩個結果-thenCombine, thenAcceptBoth

    thenCombine

    當我們有兩個CompletionStage時,需要將兩個的結果整合處理,然後計算出一個新的結果。

    Java多執行緒工具CompletableFuture怎麼使用

    thenCompose是對上一個CompletionStage的結果進行處理,傳回結果,並且傳回類型必須是CompletionStage。 Java多執行緒工具CompletableFuture怎麼使用

    thenCombine是得到第一個CompletionStage的結果,然後拿到目前的CompletionStage,兩者的結果會處理。

            CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);
    
            CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                    .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                        @Override
                        public Double apply(Integer wight, Integer height) {
                            return wight * 10000.0 / (height * height);
                        }
                    })
                    ;
    thenAcceptBoth

    需要兩個非同步CompletableFuture的結果,兩者都完成的時候,才進入thenAcceptBoth回呼。

    // thenAcceptBoth案例:
            CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                    		// 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数
                        @Override
                        public void accept(String s, String s2) {
                            System.out.println("param1=" + s + ", param2=" + s2);
                        }
                    });
    // 结果:param1=Hello CompletableFuture!, param2=abc

    異常處理

    當我們使用CompleteFuture進行鍊式呼叫的時候,多個非同步回呼中,如果有一個執行出現問題,那麼接下來的回呼都會停止,所以需要一個異常處理策略。

    exceptionally

    exceptionally是當出現錯誤時,給我們機會進行恢復,自訂回傳內容。

            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("发生错误");
            }).exceptionally(throwable -> {
                log.error("调用错误 {}", throwable.getMessage(), throwable);
                return "异常处理内容";
            });

    ###handle#########exceptionally是只有發生異常時才會執行,而handle則是不管是否發生錯誤都會執行。 ###
    CompletableFuture.supplyAsync(() -> {
        return "abc";
    })
    .handle((r,err) -> {
        log.error("调用错误 {}", err.getMessage(), err);
        // 对结果做额外的处理
        return r;
    })
    ;
    ###案例######大量用戶發送簡訊|訊息######需求為對某個表中特定條件的用戶進行簡訊通知,但是短信用戶有成百上千萬,如果使用單執行緒讀取效率會很慢。這時候可以考慮使用多執行緒的方式進行讀取;######1、將讀取任務拆分為多個不同的子任務,指定讀取的偏移量和個數###
      // 假设有500万条记录
            long recordCount = 500 * 10000;
            int subTaskRecordCount = 10000;
            // 对记录进行分片
            List<Map> subTaskList = new LinkedList<>();
            for (int i = 0; i < recordCount / 500; i++) {
                // 如果子任务结构复杂,建议使用对象
                HashMap<String, Integer> subTask = new HashMap<>();
                subTask.put("index", i);
                subTask.put("offset", i * subTaskRecordCount);
                subTask.put("count", subTaskRecordCount);
                subTaskList.add(subTask);
            }
    # ##2、使用多執行緒進行批次讀取###
      // 进行subTask批量处理,拆分为不同的任务
            subTaskList.stream()
                    .map(subTask -> CompletableFuture.runAsync(()->{
                        // 读取数据,然后处理
                        // dataTunel.read(subTask);
                    },excuturs))   // 使用应用的通用任务线程池
                    .map(c -> ((CompletableFuture<?>) c).join());
    ###3、進行業務邏輯處理,或直接在讀取完進行業務邏輯處理也是可以;###

    并发获取商品不同信息

    在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。

    当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;

         List<Task> taskList = new ArrayList<>();
            List<Object> result = taskList.stream()
                    .map(task -> CompletableFuture.supplyAsync(()->{
    //                    handlerMap.get(task).query();
                        return "";
                    }, executorService))
                    .map(c -> c.join())
                    .collect(Collectors.toList());

    问题

    thenRun和thenRunAsync有什么区别

    • 如果不使用传入的线程池,大家用默认的线程池ForkJoinPool

    • thenRun用的默认和上一个任务使用相同的线程池

    • thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;

    handle和exceptional有什么区别

    exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。

    以上是Java多執行緒工具CompletableFuture怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除