>Java >java지도 시간 >Java8의 CompletableFuture 사용법 소개(예제 포함)

Java8의 CompletableFuture 사용법 소개(예제 포함)

不言
不言앞으로
2019-04-04 10:04:495737검색

이 기사에서는 Java 8의 CompletableFuture 사용법을 소개합니다(예제 포함). 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.

Java 8 동시성 API 개선으로 도입된 이 문서는 CompletableFuture 클래스의 기능과 사용 사례를 소개합니다. 동시에 Java 9의 CompletableFuture에는 몇 가지 개선 사항이 있으며 이에 대해서는 나중에 설명하겠습니다.

미래 계산

미래 비동기 계산은 작동하기 어렵고 일반적으로 모든 계산 논리를 일련의 단계로 처리하려고 합니다. 그러나 비동기 계산의 경우 콜백으로 표시되는 메서드는 코드 전체에 흩어져 있거나 서로 깊게 중첩되는 경우가 많습니다. 그러나 단계 중 하나에서 발생할 수 있는 오류를 처리해야 하면 상황이 더 복잡해질 수 있습니다.

Futrue 인터페이스는 비동기 계산으로 Java 5의 새로운 기능이지만 계산을 결합하거나 발생 가능한 오류를 처리하는 방법이 없습니다.

Java 8에서는 CompletableFuture 클래스가 도입되었습니다. Future 인터페이스와 함께 CompletionStage 인터페이스도 구현합니다. 이 인터페이스는 다른 Future와 결합할 수 있는 비동기 계산 계약을 정의합니다.

CompletableFuture는 약 50가지의 다양한 구성, 조합, 비동기 계산 단계 실행 및 오류 처리를 포함하는 구성이자 프레임워크입니다.

이렇게 거대한 API는 부담스러울 수 있습니다. 몇 가지 중요한 API는 아래에 강조되어 있습니다.

CompletableFuture를 Future 구현으로 사용

먼저 CompletableFuture 클래스는 Future 인터페이스를 구현하므로 Future 구현으로 사용할 수 있지만 추가 완성 구현 로직이 필요합니다.

예를 들어 매개변수 없는 생성자를 사용하여 이 클래스의 인스턴스를 만든 다음 complete 메서드를 사용하여 완료할 수 있습니다. 소비자는 get 메서드를 사용하여 get() 결과가 나올 때까지 현재 스레드를 차단할 수 있습니다. complete方法完成。消费者可以使用get方法来阻塞当前线程,直到get()结果。

在下面的示例中,我们有一个创建CompletableFuture实例的方法,然后在另一个线程中计算并立即返回Future。

计算完成后,该方法通过将结果提供给完整方法来完成Future:

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture 
      = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });
 
    return completableFuture;
}

为了分离计算,我们使用了Executor API ,这种创建和完成CompletableFuture的方法可以与任何并发包(包括原始线程)一起使用。

请注意,calculateAsync方法返回一个Future实例。

我们只是调用方法,接收Future实例并在我们准备阻塞结果时调用它的get方法。

另请注意,get方法抛出一些已检查的异常,即ExecutionException(封装计算期间发生的异常)和InterruptedException(表示执行方法的线程被中断的异常):

Future<String> completableFuture = calculateAsync();
 
// ... 
 
String result = completableFuture.get();
assertEquals("Hello", result);

如果你已经知道计算的结果,也可以用变成同步的方式来返回结果。

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");
 
// ...
 
String result = completableFuture.get();
assertEquals("Hello", result);

作为在某些场景中,你可能希望取消Future任务的执行。

假设我们没有找到结果并决定完全取消异步执行任务。这可以通过Future的取消方法完成。此方法mayInterruptIfRunning,但在CompletableFuture的情况下,它没有任何效果,因为中断不用于控制CompletableFuture的处理。

这是异步方法的修改版本:

public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
 
    return completableFuture;
}

当我们使用Future.get()方法阻塞结果时,cancel()表示取消执行,它将抛出CancellationException:

Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

API介绍

static方法说明

上面的代码很简单,下面介绍几个 static 方法,它们使用任务来实例化一个 CompletableFuture 实例。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

runAsync 方法接收的是 Runnable 的实例,但是它没有返回值

supplyAsync 方法是JDK8函数式接口,无参数,会返回一个结果

这两个方法是 executor 的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的。

supplyAsync()使用

静态方法runAsyncsupplyAsync允许我们相应地从Runnable和Supplier功能类型中创建CompletableFuture实例。

该Runnable的接口是在线程使用旧的接口,它不允许返回值。

Supplier接口是一个不具有参数,并返回参数化类型的一个值的单个方法的通用功能接口。

这允许将Supplier的实例作为lambda表达式提供,该表达式执行计算并返回结果:

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");
 
// ...
 
assertEquals("Hello", future.get());

thenRun()使用

在两个任务任务A,任务B中,如果既不需要任务A的值也不想在任务B中引用,那么你可以将Runnable lambda 传递给thenRun()

아래 예에는 CompletableFuture 인스턴스를 생성한 다음 다른 스레드에서 즉시 Future를 계산하고 반환하는 메서드가 있습니다.

계산이 완료된 후 메소드는 결과를 완전한 메소드에 제공하여 Future를 완성합니다. 🎜
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
🎜계산을 분리하기 위해 Executor API를 사용하여 Executor API를 생성하고 완료합니다. >CompletableFuture 모든 동시성 패키지(원시 스레드 포함)와 함께 사용할 수 있습니다. 🎜🎜calculateAsync 메서드는 Future 인스턴스를 반환한다는 점에 유의하세요. 🎜🎜메서드를 호출하고 Future 인스턴스를 수신하고 결과를 차단할 준비가 되면 get 메서드를 호출하기만 하면 됩니다. 🎜🎜또한 get 메소드는 일부 확인된 예외, 즉 ExecutionException(계산 중에 발생하는 예외를 캡슐화함) 및 InterruptedException(예외 표시 메서드를 실행하는 스레드가 중단되었음을 나타냅니다): 🎜
CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));
 
future.get();
🎜계산 결과를 이미 알고 있는 경우 결과를 동기적으로 반환할 수도 있습니다. 🎜
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
🎜일부 시나리오에서와 마찬가지로 향후 작업 실행을 취소하고 싶을 수도 있습니다. 🎜🎜결과가 없고 작업의 비동기 실행을 완전히 취소하기로 결정했다고 가정해 보겠습니다. 이는 Future의 취소 메소드를 통해 수행할 수 있습니다. 이 메소드는 mayInterruptIfRunning이지만 CompletableFuture의 경우 인터럽트가 CompletableFuture의 처리를 제어하는 ​​데 사용되지 않기 때문에 아무런 효과가 없습니다. 🎜🎜이것은 비동기 메소드의 수정된 버전입니다: 🎜
CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));
 
future.get();
🎜 Future.get() 메소드를 사용하여 결과를 차단할 때 cancel()은 실행 취소를 의미하며 CancellationException이 발생합니다. : 🎜
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
🎜API 🎜🎜정적 메서드 설명🎜🎜 소개 위의 코드는 매우 간단합니다. 다음은 작업을 사용하여 CompletableFuture 인스턴스를 인스턴스화하는 몇 가지 정적 메서드입니다. 🎜
CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");
 
assertEquals("Hello World", future.get());
🎜runAsync 메서드는 Runnable의 인스턴스를 수신하지만 반환 값이 없습니다.
🎜🎜supplyAsync 메서드는 매개 변수가 없는 JDK8 기능 인터페이스이며 결과를 반환합니다.
🎜🎜이 두 메서드는 실행자입니다. 업그레이드는 지정된 스레드 풀에서 작업이 실행됨을 의미합니다. 지정하지 않은 경우 일반적으로 작업은 ForkJoinPool.commonPool() 스레드 풀에서 실행됩니다.
🎜🎜supplyAsync()는 🎜🎜정적 메서드인 runAsyncsupplyAsync를 사용하여 그에 따라 Runnable 및 Supply 함수 유형에서 CompletableFuture 인스턴스를 생성할 수 있도록 해줍니다. 🎜🎜Runnable 인터페이스는 스레드에서 사용되는 오래된 인터페이스이며 반환 값을 허용하지 않습니다. 🎜🎜Supplier 인터페이스는 매개변수를 사용하지 않고 매개변수화된 유형의 단일 메서드를 반환하는 일반 기능 인터페이스입니다. 🎜🎜이렇게 하면 계산을 수행하고 결과를 반환하는 람다 식으로 공급자 인스턴스를 제공할 수 있습니다. 🎜
CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
assertEquals("Hello World", completableFuture.get());
🎜thenRun()이 🎜🎜 두 작업 taskA, taskB에서 사용됩니다. 작업 A가 둘 다 아닌 경우 값도 원하지 않습니다. 작업 B에서 참조되도록 하려면 Runnable 람다를 thenRun() 메서드에 전달할 수 있습니다. 아래 예에서는 future.get() 메서드를 호출한 후 콘솔에 🎜🎜Template🎜 한 줄만 인쇄합니다.
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
  • 第一行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。
  • 第二行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,会返回resultA,但是 B 不需要 A 的结果。

实战

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));
 
future.get();

thenAccept()使用

在两个任务任务A,任务B中,如果你不需要在Future中有返回值,则可以用 thenAccept方法接收将计算结果传递给它。最后的future.get()调用返回Void类型的实例。

模板

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});

第一行中,runAsync不会有返回值,第二个方法thenAccept,接收到的resultA值为null,同时任务B也不会有返回结果

第二行中,supplyAsync有返回值,同时任务B不会有返回结果。

实战

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));
 
future.get();

thenApply()使用

在两个任务任务A,任务B中,任务B想要任务A计算的结果,可以用thenApply方法来接受一个函数实例,用它来处理结果,并返回一个Future函数的返回值:

模板

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
  • 第二行用的是 thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。

实战

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");
 
assertEquals("Hello World", future.get());

当然,多个任务的情况下,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。

thenCompose()使用

接下来会有一个很有趣的设计模式;

CompletableFuture API 的最佳场景是能够在一系列计算步骤中组合CompletableFuture实例。

这种组合结果本身就是CompletableFuture,允许进一步再续组合。这种方法在函数式语言中无处不在,通常被称为monadic设计模式

简单说,Monad就是一种设计模式,表示将一个运算过程,通过函数拆解成互相连接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。

在下面的示例中,我们使用thenCompose方法按顺序组合两个Futures。

请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用这个值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
assertEquals("Hello World", completableFuture.get());

该thenCompose方法连同thenApply一样实现了结果的合并计算。但是他们的内部形式是不一样的,它们与Java 8中可用的Stream和Optional类的map和flatMap方法是有着类似的设计思路在里面的。

两个方法都接收一个CompletableFuture并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个CompletableFuture对象。此功能结构允许将这些类的实例继续进行组合计算。

thenCombine()

取两个任务的结果

如果要执行两个独立的任务,并对其结果执行某些操作,可以用Future的thenCombine方法:

模板

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

实战

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));
 
assertEquals("Hello World", completableFuture.get());

更简单的情况是,当你想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

thenApply()和thenCompose()之间的区别

在前面的部分中,我们展示了关于thenApply()和thenCompose()的示例。这两个API都是使用的CompletableFuture调用,但这两个API的使用是不同的。

thenApply()

此方法用于处理先前调用的结果。但是,要记住的一个关键点是返回类型是转换泛型中的类型,是同一个CompletableFuture。

因此,当我们想要转换CompletableFuture 调用的结果时,效果是这样的  :

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

thenCompose()

该thenCompose()方法类似于thenApply()在都返回一个新的计算结果。但是,thenCompose()使用前一个Future作为参数。它会直接使结果变新的Future,而不是我们在thenApply()中到的嵌套Future,而是用来连接两个CompletableFuture,是生成一个新的CompletableFuture:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因此,如果想要继续嵌套链接CompletableFuture  方法,那么最好使用thenCompose()

并行运行多个任务

当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。

CompletableFuture.allOf静态方法允许等待所有的完成任务:

API

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}

实战

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);
 
// ...
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

请注意,CompletableFuture.allOf()的返回类型是CompletableFuture 。这种方法的局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。这使得它可以在Stream.map()方法中用作方法引用。

异常处理

说到这里,我们顺便来说下 CompletableFuture 的异常处理。这里我们要介绍两个方法:

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

看下代码

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

上面的代码中,任务 A、B、C、D 依次执行,如果任务 A 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 C 抛出异常,那么任务 D 得不到执行。

那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "errorResultA")
        .thenApply(resultA -> resultA + " resultB")
        .thenApply(resultB -> resultB + " resultC")
        .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());

上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:

errorResultA resultB resultC resultD
String name = null;
 
// ...
 
CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
 
assertEquals("Hello, Stranger!", completableFuture.get());

当然,它们也可以都为 null,因为如果它作用的那个 CompletableFuture 实例没有返回值的时候,s 就是 null。

Async后缀方法

CompletableFuture类中的API的大多数方法都有两个带有Async后缀的附加修饰。这些方法表示用于异步线程。

没有Async后缀的方法使用调用线程运行下一个执行线程阶段。不带Async方法使用ForkJoinPool.commonPool()线程池的fork / join实现运算任务。带有Async方法使用传递式的Executor任务去运行。

下面附带一个案例,可以看到有thenApplyAsync方法。在程序内部,线程被包装到ForkJoinTask实例中。这样可以进一步并行化你的计算并更有效地使用系统资源。

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");
 
assertEquals("Hello World", future.get());

JDK 9 CompletableFuture API

在Java 9中,  CompletableFuture API通过以下更改得到了进一步增强:

  • 新工厂方法增加了
  • 支持延迟和超时
  • 改进了对子类化的支持。

引入了新的实例API:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

还有一些静态实用方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

最后,为了解决超时问题,Java 9又引入了两个新功能:

  • orTimeout()
  • completeOnTimeout()

结论

在本文中,我们描述了CompletableFuture类的方法和典型用例。

【相关推荐:Java视频教程

위 내용은 Java8의 CompletableFuture 사용법 소개(예제 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제