Heim  >  Artikel  >  Java  >  Einführung in die Verwendung von Java8s CompletableFuture (mit Beispielen)

Einführung in die Verwendung von Java8s CompletableFuture (mit Beispielen)

不言
不言nach vorne
2019-04-04 10:04:495672Durchsuche

Dieser Artikel bietet Ihnen eine Einführung in die Verwendung von CompletableFuture in Java 8 (mit Beispielen). Ich hoffe, dass er für Freunde hilfreich ist.

Dieser Artikel wurde als Java 8 Concurrency API-Verbesserung eingeführt und ist eine Einführung in die Funktionalität und Anwendungsfälle der CompletableFuture-Klasse. Gleichzeitig gibt es einige Verbesserungen an CompletableFuture in Java 9, die später erläutert werden.

Zukünftige Berechnung

Zukünftige asynchrone Berechnungen sind schwierig zu bedienen, und im Allgemeinen möchten wir jede Berechnungslogik als eine Reihe von Schritten behandeln. Bei asynchronen Berechnungen sind die als Rückrufe dargestellten Methoden jedoch häufig über den gesamten Code verstreut oder tief ineinander verschachtelt. Aber die Dinge können noch komplizierter werden, wenn wir Fehler behandeln müssen, die in einem der Schritte auftreten können.

Die Future-Schnittstelle ist in Java 5 als asynchrone Berechnung neu, verfügt jedoch über keine Methode zum Kombinieren von Berechnungen oder zum Behandeln möglicher Fehler.

In Java 8 wurde die CompletableFuture-Klasse eingeführt. Neben der Future-Schnittstelle implementiert es auch die CompletionStage-Schnittstelle. Diese Schnittstelle definiert einen asynchronen Berechnungsvertrag, der mit anderen Futures kombiniert werden kann.

CompletableFuture ist sowohl eine Komposition als auch ein Framework mit etwa 50 verschiedenen Kompositionen, Kombinationen, der Ausführung asynchroner Berechnungsschritte und der Fehlerbehandlung.

So eine riesige API kann überwältigend sein, einige wichtige davon werden im Folgenden hervorgehoben.

Verwenden von CompletableFuture als Future-Implementierung

Zunächst implementiert die CompletableFuture-Klasse die Future-Schnittstelle, sodass Sie sie als Future-Implementierung verwenden können, es ist jedoch zusätzliche Vervollständigungsimplementierungslogik erforderlich.

Sie können beispielsweise den parameterlosen Konstruktor verwenden, um eine Instanz dieser Klasse zu erstellen, und diese dann mit der Methode complete vervollständigen. Verbraucher können die get-Methode verwenden, um den aktuellen Thread zu blockieren, bis get() Ergebnisse liefert.

Im folgenden Beispiel haben wir eine Methode, die eine CompletableFuture-Instanz erstellt, dann die Zukunft berechnet und sofort in einem anderen Thread zurückgibt.

Nachdem die Berechnung abgeschlossen ist, vervollständigt die Methode die Zukunft, indem sie das Ergebnis der vollständigen Methode zuführt:

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture 
      = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });
 
    return completableFuture;
}

Um die Berechnung zu trennen, verwenden wir die Executor API. die erstellt und VervollständigungsMethoden von CompletableFuture können mit jedem gleichzeitigen Paket verwendet werden, einschließlich des Original-Threads.

Bitte beachten Sie, dass die calculateAsync-Methode eine Future-Instanz zurückgibt.

Wir rufen einfach die Methode auf, empfangen die Future-Instanz und rufen ihre get-Methode auf, wenn wir bereit sind, das Ergebnis zu blockieren.

Beachten Sie außerdem, dass die Methode get einige geprüfte Ausnahmen auslöst, nämlich ExecutionException (die Ausnahmen kapselt, die während der Berechnung auftreten) und InterruptedException (Ausnahme zeigt an, dass der Thread, der die Methode ausführt, unterbrochen ist):

Future<String> completableFuture = calculateAsync();
 
// ... 
 
String result = completableFuture.get();
assertEquals("Hello", result);

Wenn Sie das Ergebnis der Berechnung bereits kennen, können Sie das Ergebnis auch synchron zurückgeben.

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");
 
// ...
 
String result = completableFuture.get();
assertEquals("Hello", result);

Wie in einigen Szenarien möchten Sie möglicherweise die Ausführung zukünftiger Aufgaben abbrechen.

Angenommen, wir finden kein Ergebnis und beschließen, die asynchrone Ausführung der Aufgabe vollständig abzubrechen. Dies kann über die Stornierungsmethode des Future erfolgen. Diese MethodemayInterruptIfRunning, aber im Fall von CompletableFuture hat sie keine Auswirkung, da Interrupts nicht zur Steuerung der Verarbeitung von CompletableFuture verwendet werden.

Dies ist die modifizierte Version der asynchronen Methode:

public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
 
    return completableFuture;
}

Wenn wir die Future.get()-Methode verwenden, um das Ergebnis zu blockieren, bedeutet cancel(), die Ausführung abzubrechen, es wird eine CancellationException ausgelöst :

Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

API-Einführung

Beschreibung der statischen Methode

Der obige Code ist sehr einfach. Hier sind mehrere statische Methoden, die Aufgaben verwenden, um eine CompletableFuture-Instanz zu instanziieren.

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

Die runAsync-Methode empfängt eine Instanz von Runnable, hat aber keinen Rückgabewert

Die SupplyAsync-Methode ist eine JDK8-Funktionsschnittstelle ohne Parameter und gibt ein Ergebnis zurück

Diese beiden Methoden sind Upgrades des Executors, was bedeutet, dass die Aufgabe im angegebenen Thread-Pool ausgeführt wird. Wenn nicht angegeben, wird die Aufgabe normalerweise im Thread-Pool ForkJoinPool.commonPool() ausgeführt.

supplyAsync() verwendet die

statischen Methoden runAsync und supplyAsync, sodass wir entsprechend CompletableFuture-Instanzen aus den Funktionstypen Runnable und Supplier erstellen können.

Die Runnable-Schnittstelle ist eine alte Schnittstelle, die in Threads verwendet wird und keine Rückgabewerte zulässt.

Die Lieferantenschnittstelle ist eine generische Funktionsschnittstelle, die keine Parameter akzeptiert und eine einzelne Methode eines parametrisierten Typs zurückgibt.

Dadurch kann eine Instanz von Supplier als Lambda-Ausdruck bereitgestellt werden, der eine Berechnung durchführt und das Ergebnis zurückgibt:

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");
 
// ...
 
assertEquals("Hello", future.get());

thenRun() Verwenden Sie

in beiden Aufgaben taskA , Wenn Sie in Aufgabe B weder den Wert von Aufgabe A benötigen noch in Aufgabe B darauf verweisen möchten, können Sie das Runnable-Lambda an die Methode thenRun() übergeben. Im folgenden Beispiel geben wir nach dem Aufruf der Methode „future.get()“ einfach eine Zeile in der Konsole aus:

Template

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
  • 第一行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。
  • 第二行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,会返回resultA,但是 B 不需要 A 的结果。

实战

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));
 
future.get();

thenAccept()使用

在两个任务任务A,任务B中,如果你不需要在Future中有返回值,则可以用 thenAccept方法接收将计算结果传递给它。最后的future.get()调用返回Void类型的实例。

模板

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});

第一行中,runAsync不会有返回值,第二个方法thenAccept,接收到的resultA值为null,同时任务B也不会有返回结果

第二行中,supplyAsync有返回值,同时任务B不会有返回结果。

实战

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));
 
future.get();

thenApply()使用

在两个任务任务A,任务B中,任务B想要任务A计算的结果,可以用thenApply方法来接受一个函数实例,用它来处理结果,并返回一个Future函数的返回值:

模板

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
  • 第二行用的是 thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。

实战

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");
 
assertEquals("Hello World", future.get());

当然,多个任务的情况下,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。

thenCompose()使用

接下来会有一个很有趣的设计模式;

CompletableFuture API 的最佳场景是能够在一系列计算步骤中组合CompletableFuture实例。

这种组合结果本身就是CompletableFuture,允许进一步再续组合。这种方法在函数式语言中无处不在,通常被称为monadic设计模式

简单说,Monad就是一种设计模式,表示将一个运算过程,通过函数拆解成互相连接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。

在下面的示例中,我们使用thenCompose方法按顺序组合两个Futures。

请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用这个值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
assertEquals("Hello World", completableFuture.get());

该thenCompose方法连同thenApply一样实现了结果的合并计算。但是他们的内部形式是不一样的,它们与Java 8中可用的Stream和Optional类的map和flatMap方法是有着类似的设计思路在里面的。

两个方法都接收一个CompletableFuture并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个CompletableFuture对象。此功能结构允许将这些类的实例继续进行组合计算。

thenCombine()

取两个任务的结果

如果要执行两个独立的任务,并对其结果执行某些操作,可以用Future的thenCombine方法:

模板

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

实战

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));
 
assertEquals("Hello World", completableFuture.get());

更简单的情况是,当你想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

thenApply()和thenCompose()之间的区别

在前面的部分中,我们展示了关于thenApply()和thenCompose()的示例。这两个API都是使用的CompletableFuture调用,但这两个API的使用是不同的。

thenApply()

此方法用于处理先前调用的结果。但是,要记住的一个关键点是返回类型是转换泛型中的类型,是同一个CompletableFuture。

因此,当我们想要转换CompletableFuture 调用的结果时,效果是这样的  :

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

thenCompose()

该thenCompose()方法类似于thenApply()在都返回一个新的计算结果。但是,thenCompose()使用前一个Future作为参数。它会直接使结果变新的Future,而不是我们在thenApply()中到的嵌套Future,而是用来连接两个CompletableFuture,是生成一个新的CompletableFuture:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因此,如果想要继续嵌套链接CompletableFuture  方法,那么最好使用thenCompose()

并行运行多个任务

当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。

CompletableFuture.allOf静态方法允许等待所有的完成任务:

API

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}

实战

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);
 
// ...
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

请注意,CompletableFuture.allOf()的返回类型是CompletableFuture 。这种方法的局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。这使得它可以在Stream.map()方法中用作方法引用。

异常处理

说到这里,我们顺便来说下 CompletableFuture 的异常处理。这里我们要介绍两个方法:

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

看下代码

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

上面的代码中,任务 A、B、C、D 依次执行,如果任务 A 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 C 抛出异常,那么任务 D 得不到执行。

那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "errorResultA")
        .thenApply(resultA -> resultA + " resultB")
        .thenApply(resultB -> resultB + " resultC")
        .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());

上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:

errorResultA resultB resultC resultD
String name = null;
 
// ...
 
CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
 
assertEquals("Hello, Stranger!", completableFuture.get());

当然,它们也可以都为 null,因为如果它作用的那个 CompletableFuture 实例没有返回值的时候,s 就是 null。

Async后缀方法

CompletableFuture类中的API的大多数方法都有两个带有Async后缀的附加修饰。这些方法表示用于异步线程。

没有Async后缀的方法使用调用线程运行下一个执行线程阶段。不带Async方法使用ForkJoinPool.commonPool()线程池的fork / join实现运算任务。带有Async方法使用传递式的Executor任务去运行。

下面附带一个案例,可以看到有thenApplyAsync方法。在程序内部,线程被包装到ForkJoinTask实例中。这样可以进一步并行化你的计算并更有效地使用系统资源。

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");
 
assertEquals("Hello World", future.get());

JDK 9 CompletableFuture API

在Java 9中,  CompletableFuture API通过以下更改得到了进一步增强:

  • 新工厂方法增加了
  • 支持延迟和超时
  • 改进了对子类化的支持。

引入了新的实例API:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

还有一些静态实用方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

最后,为了解决超时问题,Java 9又引入了两个新功能:

  • orTimeout()
  • completeOnTimeout()

结论

在本文中,我们描述了CompletableFuture类的方法和典型用例。

【相关推荐:Java视频教程

Das obige ist der detaillierte Inhalt vonEinführung in die Verwendung von Java8s CompletableFuture (mit Beispielen). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:segmentfault.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen