マルチスレッド プログラムを作成する場合、Future を使用して非同期プログラムから結果を取得できます。ただし、使用中にいくつかの問題が発生します:
Future の結果に対してさらに操作を実行したい場合は、現在のスレッドをブロックする必要があります
複数の Future をチェーン内で実行することはできません。各 Future の結果は独立しています。Future の結果に対して別の非同期処理を実行することが期待されます。
例外処理戦略はありません。Future の実行が失敗した場合は、手動でキャプチャする必要があります。
Future の問題を解決するために、JDK は私たちに提供してくれました。 1.8 ツール クラス CompletableFuture の便利な機能を使用します;
Future インターフェイスと CompletionStage インターフェイスを実装し、Future の欠点に対応する処理メソッドを提供します。
非同期スレッドの実行が終了した後、ブロックすることなく新しい処理ロジックを自動的にコールバックできます。
複数の非同期タスクを調整できます。または並べ替え
例外処理
CompletableFuture の核となる考え方は、各非同期タスクをステップ (CompletionStage) とみなすことができるということです。他の非同期タスクは、このステップに基づいてやりたいことを実行できます。
CompletionStage では、非常に強力な多くのステップ処理メソッドが定義されています。参考までに、日常生活で一般的に使用されるいくつかのメソッドのみをここに示します。
簡単な使用法
非同期実行、結果は必要ありません:
// 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));
非同期実行と同時に結果を返します:
// 同样可以指定线程池 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!"); System.out.println(stringCompletableFuture.get());
thenRun: 前のステップの結果は必要ありません。新しい操作
thenAccept: 前の非同期処理の内容を取得し、新しい操作を実行します
Async サフィックスが付いているものは、新しい処理操作が依然として非同期であることを意味します。非同期操作では、処理するエグゼキュータを指定できます
// Demo CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") // 针对上一步的结果做处理,产生新的结果 .thenApplyAsync(s -> s.toUpperCase()) // 针对上一步的结果做处理,不返回结果 .thenAcceptAsync(s -> System.out.println(s)) // 不需要上一步返回的结果,直接进行操作 .thenRunAsync(() -> System.out.println("end")); ;2 つの結果を選択します - どちらかを受け入れます2 つのコールバックを処理する場合、任意の完了を行うことができます。が使用され、2 つの結果間に関係がない場合は、acceptEither を使用します。
2 つの非同期スレッドの実行を最初に完了した人がその結果を使用します。他のタイプのメソッドにも同じことが当てはまります。
// 返回abc CompletableFuture .supplyAsync(() -> { SleepUtils.sleep(100); return "Hello CompletableFuture!"; }) .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }); // 返回Hello CompletableFuture! CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .acceptEither(CompletableFuture.supplyAsync(() -> { SleepUtils.sleep(100); return "abc"; }), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } });2 つの結果を結合します。次に結合し、両方を受け入れます 次に結合
CompletionStages が 2 つある場合は、2 つの結果を統合して、新しい結果を計算する必要があります。
CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172); CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65) .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() { @Override public Double apply(Integer wight, Integer height) { return wight * 10000.0 / (height * height); } }) ;
2 つの非同期 CompletableFuture の結果が必要です。両方が完了した場合にのみ、thenAccept Both コールバックに入ります。
#// thenAcceptBoth案例: CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() { // 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数 @Override public void accept(String s, String s2) { System.out.println("param1=" + s + ", param2=" + s2); } }); // 结果:param1=Hello CompletableFuture!, param2=abc
例外処理
CompleteFuture を使用して複数の非同期コールバックでチェーン呼び出しを行う場合、 1 回の実行で問題が発生すると、後続のすべてのコールバックが停止するため、例外処理戦略が必要になります。例外的に
#例外的には、エラーが発生したときに、返されるコンテンツを回復してカスタマイズする機会が与えられることを意味します。 CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("发生错误");
}).exceptionally(throwable -> {
log.error("调用错误 {}", throwable.getMessage(), throwable);
return "异常处理内容";
});
Exceptionally は例外が発生した場合にのみ実行されますが、handle はエラーが発生したかどうかに関係なく実行されます。 CompletableFuture.supplyAsync(() -> {
return "abc";
})
.handle((r,err) -> {
log.error("调用错误 {}", err.getMessage(), err);
// 对结果做额外的处理
return r;
})
;
ケース
多数のユーザーがテキスト メッセージを送信|メッセージ
// 假设有500万条记录 long recordCount = 500 * 10000; int subTaskRecordCount = 10000; // 对记录进行分片 List<Map> subTaskList = new LinkedList<>(); for (int i = 0; i < recordCount / 500; i++) { // 如果子任务结构复杂,建议使用对象 HashMap<String, Integer> subTask = new HashMap<>(); subTask.put("index", i); subTask.put("offset", i * subTaskRecordCount); subTask.put("count", subTaskRecordCount); subTaskList.add(subTask); }
2. マルチスレッドを使用します。バッチ読み取り用のスレッド
// 进行subTask批量处理,拆分为不同的任务 subTaskList.stream() .map(subTask -> CompletableFuture.runAsync(()->{ // 读取数据,然后处理 // dataTunel.read(subTask); },excuturs)) // 使用应用的通用任务线程池 .map(c -> ((CompletableFuture<?>) c).join());
3. ビジネス ロジック処理を実行するか、読み取り後に直接ビジネス ロジック処理を実行します。
在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。
当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;
List<Task> taskList = new ArrayList<>(); List<Object> result = taskList.stream() .map(task -> CompletableFuture.supplyAsync(()->{ // handlerMap.get(task).query(); return ""; }, executorService)) .map(c -> c.join()) .collect(Collectors.toList());
如果不使用传入的线程池,大家用默认的线程池ForkJoinPool
thenRun用的默认和上一个任务使用相同的线程池
thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;
exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。
以上がJavaマルチスレッドツールCompletableFutureの使い方の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。