ホームページ >Java >&#&チュートリアル >JavaマルチスレッドツールCompletableFutureの使い方

JavaマルチスレッドツールCompletableFutureの使い方

WBOY
WBOY転載
2023-04-29 08:34:151964ブラウズ

    まえがき

    Future の問題

    マルチスレッド プログラムを作成する場合、Future を使用して非同期プログラムから結果を取得できます。ただし、使用中にいくつかの問題が発生します:

    • Future の結果に対してさらに操作を実行したい場合は、現在のスレッドをブロックする必要があります

    • 複数の Future をチェーン内で実行することはできません。各 Future の結果は独立しています。Future の結果に対して別の非同期処理を実行することが期待されます。

    • 例外処理戦略はありません。Future の実行が失敗した場合は、手動でキャプチャする必要があります。

    CompletableFuture が誕生しました

    Future の問題を解決するために、JDK は私たちに提供してくれました。 1.8 ツール クラス CompletableFuture の便利な機能を使用します;

    Future インターフェイスと CompletionStage インターフェイスを実装し、Future の欠点に対応する処理メソッドを提供します。

    • 非同期スレッドの実行が終了した後、ブロックすることなく新しい処理ロジックを自動的にコールバックできます。

    • 複数の非同期タスクを調整できます。または並べ替え

    • 例外処理

    CompletableFuture の核となる考え方は、各非同期タスクをステップ (CompletionStage) とみなすことができるということです。他の非同期タスクは、このステップに基づいてやりたいことを実行できます。

    CompletionStage では、非常に強力な多くのステップ処理メソッドが定義されています。参考までに、日常生活で一般的に使用されるいくつかのメソッドのみをここに示します。

    使用法

    基本的な使用法 - 非同期タスクの送信

    簡単な使用法

    非同期実行、結果は必要ありません:

    // 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool
    CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

    非同期実行と同時に結果を返します:

    // 同样可以指定线程池
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
    System.out.println(stringCompletableFuture.get());

    前の非同期タスクの結果を処理します

    • thenRun: 前のステップの結果は必要ありません。新しい操作

    • thenAccept: 前の非同期処理の内容を取得し、新しい操作を実行します

    • ##thenApply: 前のステップの内容を取得し、その後、新しいコンテンツを生成します

    Async サフィックスが付いているものは、新しい処理操作が依然として非同期であることを意味します。非同期操作では、処理するエグゼキュータを指定できます

    JavaマルチスレッドツールCompletableFutureの使い方

    // Demo
           CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    // 针对上一步的结果做处理,产生新的结果
                    .thenApplyAsync(s -> s.toUpperCase())
                    // 针对上一步的结果做处理,不返回结果
                    .thenAcceptAsync(s -> System.out.println(s))
                    // 不需要上一步返回的结果,直接进行操作
                    .thenRunAsync(() -> System.out.println("end"));
            ;

    2 つの結果を選択します - どちらかを受け入れます

    2 つのコールバックを処理する場合、任意の完了を行うことができます。が使用され、2 つの結果間に関係がない場合は、acceptEither を使用します。

    2 つの非同期スレッドの実行を最初に完了した人がその結果を使用します。他のタイプのメソッドにも同じことが当てはまります。

    JavaマルチスレッドツールCompletableFutureの使い方

    // 返回abc
    CompletableFuture
                    .supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "Hello CompletableFuture!";
                    })
                    .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });
    // 返回Hello CompletableFuture!       
    CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .acceptEither(CompletableFuture.supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "abc";
                    }), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });
    JavaマルチスレッドツールCompletableFutureの使い方2 つの結果を結合します。次に結合し、両方を受け入れます

    次に結合

    CompletionStages が 2 つある場合は、2 つの結果を統合して、新しい結果を計算する必要があります。

      その後、Compose は前の CompletionStage の結果を処理して結果を返します。戻り値の型は CompletionStage である必要があります。
    • その後、Combine は最初の CompletionStage の結果を取得し、次に現在の CompletionStage を取得して、2 つの結果を処理します。
    •         CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);
      
              CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                      .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                          @Override
                          public Double apply(Integer wight, Integer height) {
                              return wight * 10000.0 / (height * height);
                          }
                      })
                      ;
    thenAccept Both

    2 つの非同期 CompletableFuture の結果が必要です。両方が完了した場合にのみ、thenAccept Both コールバックに入ります。

    JavaマルチスレッドツールCompletableFutureの使い方

    #
    // thenAcceptBoth案例:
            CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                    		// 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数
                        @Override
                        public void accept(String s, String s2) {
                            System.out.println("param1=" + s + ", param2=" + s2);
                        }
                    });
    // 结果:param1=Hello CompletableFuture!, param2=abc

    例外処理JavaマルチスレッドツールCompletableFutureの使い方

    CompleteFuture を使用して複数の非同期コールバックでチェーン呼び出しを行う場合、 1 回の実行で問題が発生すると、後続のすべてのコールバックが停止するため、例外処理戦略が必要になります。

    例外的に

    #例外的には、エラーが発生したときに、返されるコンテンツを回復してカスタマイズする機会が与えられることを意味します。

            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("发生错误");
            }).exceptionally(throwable -> {
                log.error("调用错误 {}", throwable.getMessage(), throwable);
                return "异常处理内容";
            });

    handle

    Exceptionally は例外が発生した場合にのみ実行されますが、handle はエラーが発生したかどうかに関係なく実行されます。

    CompletableFuture.supplyAsync(() -> {
        return "abc";
    })
    .handle((r,err) -> {
        log.error("调用错误 {}", err.getMessage(), err);
        // 对结果做额外的处理
        return r;
    })
    ;
    ケース

    多数のユーザーがテキスト メッセージを送信|メッセージ

    要件は、テーブル内の特定の条件をテキスト メッセージでユーザーに通知することですが、その数は数百万です。のテキスト メッセージ ユーザーの数が多い場合、シングル スレッドの読み取りを使用すると、読み取り効率が非常に遅くなります。現時点では、読み取りにマルチスレッドの使用を検討できます;

    1. 読み取りタスクを複数の異なるサブタスクに分割し、オフセットと読み取り数を指定します

      // 假设有500万条记录
            long recordCount = 500 * 10000;
            int subTaskRecordCount = 10000;
            // 对记录进行分片
            List<Map> subTaskList = new LinkedList<>();
            for (int i = 0; i < recordCount / 500; i++) {
                // 如果子任务结构复杂,建议使用对象
                HashMap<String, Integer> subTask = new HashMap<>();
                subTask.put("index", i);
                subTask.put("offset", i * subTaskRecordCount);
                subTask.put("count", subTaskRecordCount);
                subTaskList.add(subTask);
            }

    2. マルチスレッドを使用します。バッチ読み取り用のスレッド

      // 进行subTask批量处理,拆分为不同的任务
            subTaskList.stream()
                    .map(subTask -> CompletableFuture.runAsync(()->{
                        // 读取数据,然后处理
                        // dataTunel.read(subTask);
                    },excuturs))   // 使用应用的通用任务线程池
                    .map(c -> ((CompletableFuture<?>) c).join());

    3. ビジネス ロジック処理を実行するか、読み取り後に直接ビジネス ロジック処理を実行します。

    并发获取商品不同信息

    在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。

    当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;

         List<Task> taskList = new ArrayList<>();
            List<Object> result = taskList.stream()
                    .map(task -> CompletableFuture.supplyAsync(()->{
    //                    handlerMap.get(task).query();
                        return "";
                    }, executorService))
                    .map(c -> c.join())
                    .collect(Collectors.toList());

    问题

    thenRun和thenRunAsync有什么区别

    • 如果不使用传入的线程池,大家用默认的线程池ForkJoinPool

    • thenRun用的默认和上一个任务使用相同的线程池

    • thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;

    handle和exceptional有什么区别

    exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。

    以上がJavaマルチスレッドツールCompletableFutureの使い方の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。