ホームページ  >  記事  >  Java  >  Java8 の CompletableFuture の使い方の紹介 (例付き)

Java8 の CompletableFuture の使い方の紹介 (例付き)

不言
不言転載
2019-04-04 10:04:495661ブラウズ

この記事では、Java8 の CompletableFuture の使い方を (例とともに) 紹介します。一定の参考価値があります。困っている友人は参考にしてください。お役に立てれば幸いです。

Java 8 同時実行 API の改善として導入されたこの記事は、CompletableFuture クラスの機能と使用例を紹介します。同時に、Java 9 では CompletableFuture にもいくつかの改良が加えられています。これについては後で説明します。

将来の計算

将来の非同期計算は操作が難しく、一般的にはあらゆる計算ロジックを一連のステップとして扱う必要があります。しかし、非同期計算の場合、コールバックとして表されるメソッドはコード全体に散在しているか、相互に深くネストされていることがよくあります。ただし、ステップの 1 つで発生する可能性のあるエラーを処理する必要がある場合、事態はさらに複雑になる可能性があります。

Future インターフェースは、非同期計算として Java 5 で新しく追加されましたが、計算を結合したり、起こり得るエラーを処理したりするメソッドがありません。

Java 8 では、CompletableFuture クラスが導入されました。 Future インターフェースに加えて、CompletionStage インターフェースも実装します。このインターフェイスは、他の先物と組み合わせることができる非同期計算コントラクトを定義します。

CompletableFuture は、構成とフレームワークの両方であり、約 50 種類の異なる構成、組み合わせ、非同期計算ステップの実行、エラーの処理を備えています。

このような巨大な API は圧倒されるかもしれません。いくつかの重要な API を以下で強調表示します。

CompletableFuture を Future 実装として使用する

まず、CompletableFuture クラスは Future インターフェイスを実装するため、Future 実装として使用できますが、追加の補完実装ロジックが必要です。

たとえば、パラメーターなしのコンストラクターを使用してこのクラスのインスタンスを作成し、complete メソッドを使用してそれを完了できます。コンシューマーは get メソッドを使用して、get() の結果が得られるまで現在のスレッドをブロックできます。

以下の例には、CompletableFuture インスタンスを作成し、別のスレッドで Future をすぐに計算して返すメソッドがあります。

計算が完了すると、メソッドは結果を完全なメソッドに提供することで Future を完了します。

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture 
      = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });
 
    return completableFuture;
}

計算を分離するには、Executor API を使用します。 CompletableFuture の完了メソッドは、任意の同時実行パッケージ (元のスレッドを含む) で使用できます。

calculateAsync メソッドは Future インスタンスを返すことに注意してください。

メソッドを呼び出して

Future インスタンスを受け取り、結果をブロックする準備ができたらその get メソッドを呼び出すだけです。

また、

get メソッドは、いくつかのチェック例外、つまり ExecutionException (計算中に発生する例外をカプセル化する) および InterruptedException (例外) をスローすることにも注意してください。メソッドを実行しているスレッドが中断されていることを示します):

Future<String> completableFuture = calculateAsync();
 
// ... 
 
String result = completableFuture.get();
assertEquals("Hello", result);
計算結果がすでにわかっている場合は、結果を同期的に返すこともできます。

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");
 
// ...
 
String result = completableFuture.get();
assertEquals("Hello", result);
一部のシナリオでは、将来のタスクの実行をキャンセルしたい場合があります。

結果が見つからず、タスクの非同期実行を完全にキャンセルすることにしたとします。これは、Future のキャンセル メソッドを通じて実行できます。このメソッドは

mayInterruptIfRunning ですが、CompletableFuture の場合、CompletableFuture の処理の制御に割り込みは使用されないため、効果はありません。

これは、非同期メソッドの修正バージョンです:

public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
 
    return completableFuture;
}
Future.get() メソッドを使用して結果をブロックする場合、

cancel() は、結果をキャンセルすることを意味します。

Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException
API の概要

静的メソッドの説明

上記のコードは非常に単純で、タスクを使用して CompletableFuture をインスタンス化するいくつかの静的メソッドを次に示します。実例。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
runAsync メソッドは Runnable のインスタンスを受け取りますが、値を返しません


supplyAsync メソッドはパラメーターのない JDK8 関数インターフェイスであり、結果を返します


これら 2 つのメソッドは executor のアップグレードです。つまり、タスクは指定されたスレッド プールで実行されます。指定しない場合、タスクは通常、ForkJoinPool.commonPool() スレッド プールで実行されます。


supplyAsync() は、

静的メソッド

runAsync および supplyAsync を使用して、Runnable および Supplier 関数タイプから CompletableFuture インスタンスを適宜作成できるようにします。

Runnable インターフェイスはスレッドで使用される古いインターフェイスであり、戻り値を許可しません。

Supplier インターフェイスは、パラメーターを持たず、パラメーター化された型の単一のメソッドを返す一般的な関数インターフェイスです。

これにより、Supplier のインスタンスを、計算を実行して結果を返すラムダ式として提供できるようになります。

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");
 
// ...
 
assertEquals("Hello", future.get());
thenRun() は、2 つのタスク taskA で

使用されます。 , タスク B で、タスク A の値が必要なく、タスク B でそれを参照したくない場合は、Runnable ラムダを

thenRun() メソッドに渡すことができます。次の例では、future.get() メソッドを呼び出した後、コンソールに 1 行を出力するだけです:

TEMPLATE

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
  • 第一行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。
  • 第二行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,会返回resultA,但是 B 不需要 A 的结果。

实战

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));
 
future.get();

thenAccept()使用

在两个任务任务A,任务B中,如果你不需要在Future中有返回值,则可以用 thenAccept方法接收将计算结果传递给它。最后的future.get()调用返回Void类型的实例。

模板

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});

第一行中,runAsync不会有返回值,第二个方法thenAccept,接收到的resultA值为null,同时任务B也不会有返回结果

第二行中,supplyAsync有返回值,同时任务B不会有返回结果。

实战

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));
 
future.get();

thenApply()使用

在两个任务任务A,任务B中,任务B想要任务A计算的结果,可以用thenApply方法来接受一个函数实例,用它来处理结果,并返回一个Future函数的返回值:

模板

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
  • 第二行用的是 thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。

实战

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");
 
assertEquals("Hello World", future.get());

当然,多个任务的情况下,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。

thenCompose()使用

接下来会有一个很有趣的设计模式;

CompletableFuture API 的最佳场景是能够在一系列计算步骤中组合CompletableFuture实例。

这种组合结果本身就是CompletableFuture,允许进一步再续组合。这种方法在函数式语言中无处不在,通常被称为monadic设计模式

简单说,Monad就是一种设计模式,表示将一个运算过程,通过函数拆解成互相连接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。

在下面的示例中,我们使用thenCompose方法按顺序组合两个Futures。

请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用这个值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
assertEquals("Hello World", completableFuture.get());

该thenCompose方法连同thenApply一样实现了结果的合并计算。但是他们的内部形式是不一样的,它们与Java 8中可用的Stream和Optional类的map和flatMap方法是有着类似的设计思路在里面的。

两个方法都接收一个CompletableFuture并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个CompletableFuture对象。此功能结构允许将这些类的实例继续进行组合计算。

thenCombine()

取两个任务的结果

如果要执行两个独立的任务,并对其结果执行某些操作,可以用Future的thenCombine方法:

模板

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

实战

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));
 
assertEquals("Hello World", completableFuture.get());

更简单的情况是,当你想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

thenApply()和thenCompose()之间的区别

在前面的部分中,我们展示了关于thenApply()和thenCompose()的示例。这两个API都是使用的CompletableFuture调用,但这两个API的使用是不同的。

thenApply()

此方法用于处理先前调用的结果。但是,要记住的一个关键点是返回类型是转换泛型中的类型,是同一个CompletableFuture。

因此,当我们想要转换CompletableFuture 调用的结果时,效果是这样的  :

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

thenCompose()

该thenCompose()方法类似于thenApply()在都返回一个新的计算结果。但是,thenCompose()使用前一个Future作为参数。它会直接使结果变新的Future,而不是我们在thenApply()中到的嵌套Future,而是用来连接两个CompletableFuture,是生成一个新的CompletableFuture:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因此,如果想要继续嵌套链接CompletableFuture  方法,那么最好使用thenCompose()

并行运行多个任务

当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。

CompletableFuture.allOf静态方法允许等待所有的完成任务:

API

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}

实战

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);
 
// ...
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

请注意,CompletableFuture.allOf()的返回类型是CompletableFuture 。这种方法的局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。这使得它可以在Stream.map()方法中用作方法引用。

异常处理

说到这里,我们顺便来说下 CompletableFuture 的异常处理。这里我们要介绍两个方法:

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

看下代码

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

上面的代码中,任务 A、B、C、D 依次执行,如果任务 A 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 C 抛出异常,那么任务 D 得不到执行。

那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "errorResultA")
        .thenApply(resultA -> resultA + " resultB")
        .thenApply(resultB -> resultB + " resultC")
        .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());

上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:

errorResultA resultB resultC resultD
String name = null;
 
// ...
 
CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
 
assertEquals("Hello, Stranger!", completableFuture.get());

当然,它们也可以都为 null,因为如果它作用的那个 CompletableFuture 实例没有返回值的时候,s 就是 null。

Async后缀方法

CompletableFuture类中的API的大多数方法都有两个带有Async后缀的附加修饰。这些方法表示用于异步线程。

没有Async后缀的方法使用调用线程运行下一个执行线程阶段。不带Async方法使用ForkJoinPool.commonPool()线程池的fork / join实现运算任务。带有Async方法使用传递式的Executor任务去运行。

下面附带一个案例,可以看到有thenApplyAsync方法。在程序内部,线程被包装到ForkJoinTask实例中。这样可以进一步并行化你的计算并更有效地使用系统资源。

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");
 
assertEquals("Hello World", future.get());

JDK 9 CompletableFuture API

在Java 9中,  CompletableFuture API通过以下更改得到了进一步增强:

  • 新工厂方法增加了
  • 支持延迟和超时
  • 改进了对子类化的支持。

引入了新的实例API:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

还有一些静态实用方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

最后,为了解决超时问题,Java 9又引入了两个新功能:

  • orTimeout()
  • completeOnTimeout()

结论

在本文中,我们描述了CompletableFuture类的方法和典型用例。

【相关推荐:Java视频教程

以上がJava8 の CompletableFuture の使い方の紹介 (例付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はsegmentfault.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。