Heim >Java >javaLernprogramm >So verwenden Sie das Java-Multithreading-Tool CompletableFuture
Wenn Sie ein Multithread-Programm schreiben, können Sie Future verwenden, um die Ergebnisse aus einem asynchronen Thread abzurufen. Bei der Verwendung treten jedoch einige Probleme auf:
Wenn Sie möchten Um weitere Operationen an den Ergebnissen von Future durchzuführen, müssen Sie den aktuellen Thread blockieren.
Mehrere Futures können nicht in einer Kette ausgeführt werden. Die Ergebnisse jedes Futures sind unabhängig. Es wird erwartet, dass sie eine andere asynchrone Aktion für das Ergebnis ausführen einer Zukunft;
Es gibt keine Ausnahmebehandlungsstrategie. Wenn die Zukunftsausführung fehlschlägt, muss sie manuell erfasst werden nützliche Toolklasse CompletableFuture in 1.8;
Unsere neue Verarbeitungslogik kann nach Beendigung der asynchronen Thread-Ausführung automatisch zurückgerufen werden, ohne zu blockieren
CompletionStage definiert viele Schrittverarbeitungsmethoden, die sehr leistungsfähig sind. Hier sind nur einige im täglichen Leben häufig verwendete Methoden als Referenz.
Grundlegende Verwendung – Asynchrone Aufgabe senden
Einfache Verwendung
// 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));
// 同样可以指定线程池 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!"); System.out.println(stringCompletableFuture.get());
Verarbeitung des letzten Ergebnisses der asynchronen Aufgabe
thenRun: Nein Bedarf an den Ergebnissen des vorherigen Schritts, direkte neue Operationen
// Demo CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") // 针对上一步的结果做处理,产生新的结果 .thenApplyAsync(s -> s.toUpperCase()) // 针对上一步的结果做处理,不返回结果 .thenAcceptAsync(s -> System.out.println(s)) // 不需要上一步返回的结果,直接进行操作 .thenRunAsync(() -> System.out.println("end")); ;Wählen Sie die beiden Ergebnisse aus -acceptEitherWenn wir zwei Rückrufe verarbeiten, kann jede Vervollständigung verwendet werden, die beiden Ergebnisse haben keine Beziehung und verwenden dann AcceptEither. Wer zuerst die Ausführung der beiden asynchronen Threads abschließt, wird das Ergebnis verwenden
...
thenCompose verarbeitet das Ergebnis der vorherigen CompletionStage und gibt das Ergebnis zurück. Der Rückgabetyp muss CompletionStage sein.
thenCombine ruft das Ergebnis der ersten CompletionStage ab, ruft dann die aktuelle CompletionStage ab und verarbeitet die Ergebnisse der beiden.// 返回abc CompletableFuture .supplyAsync(() -> { SleepUtils.sleep(100); return "Hello CompletableFuture!"; }) .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }); // 返回Hello CompletableFuture! CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .acceptEither(CompletableFuture.supplyAsync(() -> { SleepUtils.sleep(100); return "abc"; }), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } });thenAcceptBoth
erfordert die Ergebnisse von zwei asynchronen CompletableFutures. Wenn beide abgeschlossen sind, wird der thenAcceptBoth-Rückruf eingegeben.
CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172); CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65) .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() { @Override public Double apply(Integer wight, Integer height) { return wight * 10000.0 / (height * height); } }) ;
Wenn wir CompleteFuture zum Durchführen von Kettenaufrufen verwenden und einer der mehreren asynchronen Rückrufe ein Ausführungsproblem aufweist, werden die nachfolgenden Rückrufe gestoppt, sodass eine Ausnahmeverarbeitungsstrategie erforderlich ist.
Ausnahmsweise gilt: Wenn ein Fehler auftritt, haben wir die Möglichkeit, den zurückgegebenen Inhalt wiederherzustellen und anzupassen.
// thenAcceptBoth案例: CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() { // 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数 @Override public void accept(String s, String s2) { System.out.println("param1=" + s + ", param2=" + s2); } }); // 结果:param1=Hello CompletableFuture!, param2=abchandleExceptionally wird nur ausgeführt, wenn eine Ausnahme auftritt, während handle unabhängig davon ausgeführt wird, ob ein Fehler auftritt.
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("发生错误"); }).exceptionally(throwable -> { log.error("调用错误 {}", throwable.getMessage(), throwable); return "异常处理内容"; });Fall
Die Anforderung besteht darin, Benutzer mit bestimmten Bedingungen in einer Tabelle per Textnachricht zu benachrichtigen. Es gibt jedoch Millionen von Textnachrichtenbenutzern, wenn dies der Fall ist verwendet wird, ist die Leseeffizienz sehr langsam. Zu diesem Zeitpunkt können Sie die Verwendung von Multithreading zum Lesen in Betracht ziehen.
1 Teilen Sie die Leseaufgabe in mehrere verschiedene Teilaufgaben auf und geben Sie den Offset und die Anzahl der Lesevorgänge an. CompletableFuture.supplyAsync(() -> {
return "abc";
})
.handle((r,err) -> {
log.error("调用错误 {}", err.getMessage(), err);
// 对结果做额外的处理
return r;
})
;
3. Führen Sie die Geschäftslogikverarbeitung durch, oder Sie können die Geschäftslogikverarbeitung auch direkt nach dem Lesen durchführen;
在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。
当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;
List<Task> taskList = new ArrayList<>(); List<Object> result = taskList.stream() .map(task -> CompletableFuture.supplyAsync(()->{ // handlerMap.get(task).query(); return ""; }, executorService)) .map(c -> c.join()) .collect(Collectors.toList());
如果不使用传入的线程池,大家用默认的线程池ForkJoinPool
thenRun用的默认和上一个任务使用相同的线程池
thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;
exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。
Das obige ist der detaillierte Inhalt vonSo verwenden Sie das Java-Multithreading-Tool CompletableFuture. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!