PHP速学视频免费教程(入门到精通)
PHP怎么学习?PHP怎么入门?PHP在哪学?PHP怎么学才快?不用担心,这里为大家提供了PHP速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!
在现代java应用开发中,completablefuture是处理异步操作的强大工具。然而,当面临一系列需要严格顺序执行的异步任务,并且需要将每个任务的结果收集起来时,开发者可能会遇到挑战。尤其当每个异步任务本身就返回一个completionstage时,如何正确地链式调用并避免不必要的线程阻塞或并发问题,是实现高效异步流程的关键。
假设我们有一个耗时业务处理函数 process(int a),它返回一个 CompletionStage
import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.IntStream; public class CompletableFutureSequential { private CompletionStage<Integer> process(int a) { return CompletableFuture.supplyAsync(() -> { System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a); // 模拟长时间运行的业务过程 try { Thread.sleep(10); // 增加延迟以观察顺序性 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return a + 10; }).whenCompleteAsync((e, t) -> { if (t != null) System.err.printf("!!! error processing '%d' !!!\n", a); System.err.printf("%s finish %d\n", LocalDateTime.now(), e); }); } // ... (后续解决方案代码将放在这里) }
我们的目标是按顺序执行一系列 process 调用,并将它们的整数结果收集到一个 List
常见误区与问题:
在 thenApplyAsync 内部使用 join():
// 第一次尝试(成功但效率低下) List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage<List<Integer>> resultStage1 = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { resultStage1 = resultStage1.thenApplyAsync((ret) -> { // 在异步回调中阻塞等待另一个CompletableFuture完成 Integer a = process(element).toCompletableFuture().join(); ret.add(a); return ret; }); } List<Integer> computeResult1 = resultStage1.toCompletableFuture().join(); // 这种方法虽然能实现顺序执行,但 `join()` 的使用意味着在 `thenApplyAsync` 的执行线程中会发生阻塞, // 导致一个阶段的执行可能占用两个线程资源(一个用于 `thenApplyAsync`,另一个用于 `process` 内部的 `supplyAsync`, // 且 `thenApplyAsync` 的线程会等待 `process` 完成),效率不高且不符合异步编程的最佳实践。
这种方式虽然实现了顺序性,但 join() 是一个阻塞操作。在 thenApplyAsync 的回调中调用 join() 会导致该回调所在的线程被阻塞,直到 process(element) 完成。这违背了异步编程的非阻塞原则,并且可能导致线程池资源被低效利用。
使用 thenCombineAsync 进行链式调用:
// 第二次尝试(失败,因为是并行执行) List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage<List<Integer>> resultStage2 = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { // thenCombineAsync 会尝试并行执行两个CompletionStage resultStage2 = resultStage2.thenCombineAsync(process(element), (array, ret) -> { array.add(ret); return array; }); } // resultStage2.toCompletableFuture().join(); // 这种方法会导致 `process(element)` 几乎同时被调度执行, // 因为 `thenCombineAsync` 的设计目的是在两个 CompletionStage 都完成后,将它们的结果合并。 // 这与我们要求的“顺序执行”相悖。
thenCombineAsync 的作用是等待两个独立的 CompletionStage 都完成后,再将它们的结果合并。这意味着 process(element) 会在循环迭代时被立即触发,而不是等待前一个 process 完成。因此,它无法保证任务的顺序执行。
thenCompose 是 CompletionStage 中用于顺序执行异步操作的关键方法。它接收一个函数,该函数会返回一个新的 CompletionStage。当当前的 CompletionStage 完成后,thenCompose 会使用其结果来触发并等待这个新的 CompletionStage 完成,从而有效地“扁平化”了嵌套的 CompletionStage。
这种方法通过一个外部的 List 来收集结果。我们初始化一个表示“前一个阶段已完成”的 CompletionStage
public class CompletableFutureSequential { // ... (process 方法同上) public static void main(String[] args) { CompletableFutureSequential app = new CompletableFutureSequential(); List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); System.out.println("--- 方案一:使用外部列表累积结果 ---"); CompletionStage<Void> loopStage = CompletableFuture.completedFuture(null); final List<Integer> resultList = new ArrayList<>(); // 外部列表 for (Integer element : arr) { loopStage = loopStage // 当 loopStage 完成后,执行 process(element) .thenCompose(v -> app.process(element)) // 当 process(element) 完成后,将其结果添加到 resultList .thenAccept(resultList::add); } // 阻塞等待所有任务完成 loopStage.toCompletableFuture().join(); System.out.println("方案一结果: " + resultList); // 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19] } }
原理分析:
这种方法更加函数式,它将结果列表作为 CompletionStage 的结果在链中传递和更新。
public class CompletableFutureSequential { // ... (process 方法同上) public static void main(String[] args) { // ... (方案一代码,省略以聚焦方案二) System.out.println("\n--- 方案二:在 CompletionStage 链中传递列表 ---"); List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage<List<Integer>> listStage = CompletableFuture.completedFuture(new ArrayList<>()); // 初始列表作为结果 for (Integer element : arr) { listStage = listStage // 当 listStage (包含当前列表) 完成后,执行 process(element) .thenCompose(list -> app.process(element) // 当 process(element) 完成后,将结果添加到传入的 list .thenAccept(list::add) // 关键:将更新后的 list 作为下一个 CompletionStage 的结果返回 .thenApply(v -> list) ); } List<Integer> resultList2 = listStage.toCompletableFuture().join(); System.out.println("方案二结果: " + resultList2); // 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19] } }
原理分析:
线程管理:
错误处理:
阻塞操作:join() 与 get():
选择方案:
通过理解和应用 thenCompose,开发者可以有效地构建复杂、顺序执行的异步任务流,同时保持代码的清晰性和响应性。
Java免费学习笔记:立即学习
解锁 Java 大师之旅:从入门到精通的终极指南
已抢7566个
抢已抢97306个
抢已抢15251个
抢已抢53918个
抢已抢198225个
抢已抢88302个
抢