Heim  >  Artikel  >  Java  >  So verwenden Sie das Java-Multithreading-Tool CompletableFuture

So verwenden Sie das Java-Multithreading-Tool CompletableFuture

WBOY
WBOYnach vorne
2023-04-29 08:34:151913Durchsuche

    Vorwort

    Probleme mit Future

    Wenn Sie ein Multithread-Programm schreiben, können Sie Future verwenden, um die Ergebnisse aus einem asynchronen Thread abzurufen. Bei der Verwendung treten jedoch einige Probleme auf:

    • Wenn Sie möchten Um weitere Operationen an den Ergebnissen von Future durchzuführen, müssen Sie den aktuellen Thread blockieren.

    • Mehrere Futures können nicht in einer Kette ausgeführt werden. Die Ergebnisse jedes Futures sind unabhängig. Es wird erwartet, dass sie eine andere asynchrone Aktion für das Ergebnis ausführen einer Zukunft;

    • Es gibt keine Ausnahmebehandlungsstrategie. Wenn die Zukunftsausführung fehlschlägt, muss sie manuell erfasst werden nützliche Toolklasse CompletableFuture in 1.8;

    • Es implementiert die Schnittstellen Future und CompletionStage und stellt entsprechende Verarbeitungsmethoden für die Mängel von Future bereit.

    Unsere neue Verarbeitungslogik kann nach Beendigung der asynchronen Thread-Ausführung automatisch zurückgerufen werden, ohne zu blockieren

      Mehrere asynchrone Aufgaben können angeordnet, kombiniert oder sortiert werden
    • Ausnahmebehandlung
    • Der Kern von CompletableFuture Die Idee ist, dass jede asynchrone Aufgabe als Schritt (CompletionStage) betrachtet werden kann und andere asynchrone Aufgaben dann basierend auf diesem Schritt das tun können, was sie tun möchten.
    • CompletionStage definiert viele Schrittverarbeitungsmethoden, die sehr leistungsfähig sind. Hier sind nur einige im täglichen Leben häufig verwendete Methoden als Referenz.

    • Verwendung

    Grundlegende Verwendung – Asynchrone Aufgabe senden

    Einfache Verwendung

    Asynchrone Ausführung, kein Ergebnis erforderlich:

    // 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool
    CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

    Asynchrone Ausführung mit Rückgabe von Ergebnissen:

    // 同样可以指定线程池
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
    System.out.println(stringCompletableFuture.get());

    Verarbeitung des letzten Ergebnisses der asynchronen Aufgabe

    thenRun: Nein Bedarf an den Ergebnissen des vorherigen Schritts, direkte neue Operationen

      thenAccept: Holen Sie sich den Inhalt der vorherigen asynchronen Verarbeitung und führen Sie neue Operationen aus
    • thenApply: Holen Sie sich den Inhalt des vorherigen Schritts und generieren Sie dann neuen Inhalt
    • Alles mit dem Suffix „Async“ bedeutet, dass der neue Verarbeitungsvorgang immer noch asynchron ist. Asynchrone Vorgänge können Executors für die Verarbeitung angeben

    // Demo
           CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    // 针对上一步的结果做处理,产生新的结果
                    .thenApplyAsync(s -> s.toUpperCase())
                    // 针对上一步的结果做处理,不返回结果
                    .thenAcceptAsync(s -> System.out.println(s))
                    // 不需要上一步返回的结果,直接进行操作
                    .thenRunAsync(() -> System.out.println("end"));
            ;
    Wählen Sie die beiden Ergebnisse aus -acceptEither

    Wenn wir zwei Rückrufe verarbeiten, kann jede Vervollständigung verwendet werden, die beiden Ergebnisse haben keine Beziehung und verwenden dann AcceptEither.

    So verwenden Sie das Java-Multithreading-Tool CompletableFuture

    Wer zuerst die Ausführung der beiden asynchronen Threads abschließt, wird das Ergebnis verwenden

    Das Gleiche gilt für andere Arten von Methoden.

    ...

    thenCompose verarbeitet das Ergebnis der vorherigen CompletionStage und gibt das Ergebnis zurück. Der Rückgabetyp muss CompletionStage sein.

    So verwenden Sie das Java-Multithreading-Tool CompletableFuture

    thenCombine ruft das Ergebnis der ersten CompletionStage ab, ruft dann die aktuelle CompletionStage ab und verarbeitet die Ergebnisse der beiden.

    So verwenden Sie das Java-Multithreading-Tool CompletableFuture

    // 返回abc
    CompletableFuture
                    .supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "Hello CompletableFuture!";
                    })
                    .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });
    // 返回Hello CompletableFuture!       
    CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .acceptEither(CompletableFuture.supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "abc";
                    }), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });

    thenAcceptBoth

    erfordert die Ergebnisse von zwei asynchronen CompletableFutures. Wenn beide abgeschlossen sind, wird der thenAcceptBoth-Rückruf eingegeben.

    •         CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);
      
              CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                      .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                          @Override
                          public Double apply(Integer wight, Integer height) {
                              return wight * 10000.0 / (height * height);
                          }
                      })
                      ;
    • Ausnahmebehandlung
    • Wenn wir CompleteFuture zum Durchführen von Kettenaufrufen verwenden und einer der mehreren asynchronen Rückrufe ein Ausführungsproblem aufweist, werden die nachfolgenden Rückrufe gestoppt, sodass eine Ausnahmeverarbeitungsstrategie erforderlich ist.

    Ausnahmsweise

    Ausnahmsweise gilt: Wenn ein Fehler auftritt, haben wir die Möglichkeit, den zurückgegebenen Inhalt wiederherzustellen und anzupassen.

    // thenAcceptBoth案例:
            CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                    		// 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数
                        @Override
                        public void accept(String s, String s2) {
                            System.out.println("param1=" + s + ", param2=" + s2);
                        }
                    });
    // 结果:param1=Hello CompletableFuture!, param2=abc

    handle

    So verwenden Sie das Java-Multithreading-Tool CompletableFuture

    Exceptionally wird nur ausgeführt, wenn eine Ausnahme auftritt, während handle unabhängig davon ausgeführt wird, ob ein Fehler auftritt.

            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("发生错误");
            }).exceptionally(throwable -> {
                log.error("调用错误 {}", throwable.getMessage(), throwable);
                return "异常处理内容";
            });
    So verwenden Sie das Java-Multithreading-Tool CompletableFutureFall

    Eine große Anzahl von Benutzern sendet Textnachrichten

    Die Anforderung besteht darin, Benutzer mit bestimmten Bedingungen in einer Tabelle per Textnachricht zu benachrichtigen. Es gibt jedoch Millionen von Textnachrichtenbenutzern, wenn dies der Fall ist verwendet wird, ist die Leseeffizienz sehr langsam. Zu diesem Zeitpunkt können Sie die Verwendung von Multithreading zum Lesen in Betracht ziehen.

    1 Teilen Sie die Leseaufgabe in mehrere verschiedene Teilaufgaben auf und geben Sie den Offset und die Anzahl der Lesevorgänge an.

    CompletableFuture.supplyAsync(() -> {
        return "abc";
    })
    .handle((r,err) -> {
        log.error("调用错误 {}", err.getMessage(), err);
        // 对结果做额外的处理
        return r;
    })
    ;
    3. Führen Sie die Geschäftslogikverarbeitung durch, oder Sie können die Geschäftslogikverarbeitung auch direkt nach dem Lesen durchführen;

    并发获取商品不同信息

    在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。

    当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;

         List<Task> taskList = new ArrayList<>();
            List<Object> result = taskList.stream()
                    .map(task -> CompletableFuture.supplyAsync(()->{
    //                    handlerMap.get(task).query();
                        return "";
                    }, executorService))
                    .map(c -> c.join())
                    .collect(Collectors.toList());

    问题

    thenRun和thenRunAsync有什么区别

    • 如果不使用传入的线程池,大家用默认的线程池ForkJoinPool

    • thenRun用的默认和上一个任务使用相同的线程池

    • thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;

    handle和exceptional有什么区别

    exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。

    Das obige ist der detaillierte Inhalt vonSo verwenden Sie das Java-Multithreading-Tool CompletableFuture. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen