Heim >Java >javaLernprogramm >Wie baut Java Multithreading über CompletableFuture asynchrone Recheneinheiten zusammen?

Wie baut Java Multithreading über CompletableFuture asynchrone Recheneinheiten zusammen?

王林
王林nach vorne
2023-05-11 19:04:041212Durchsuche

    CompletableFuture Einführung

    CompletableFuture ist eine neue Funktion, die in 1.8 eingeführt wurde. Sie wird für einige komplexere asynchrone Berechnungsszenarien verwendet. Insbesondere solche, die eine Reihenschaltung erfordern, können Sie die Verwendung von CompletableFuture in Betracht ziehen.

    In der realen Welt sind die komplexen Probleme, die wir lösen müssen, in mehrere Schritte unterteilt. Genau wie in unserem Code werden in einer komplexen logischen Methode mehrere Methoden aufgerufen, um sie Schritt für Schritt zu implementieren.

    Stellen Sie sich das folgende Szenario vor: Das Pflanzen von Bäumen am Arbor Day ist in die folgenden Schritte unterteilt:

    • 10 Minuten lang ein Loch graben#🎜🎜 ## 🎜🎜#

    • Setzlinge für 5 Minuten holen
    • Setzlinge für 20 Minuten pflanzen
    • #🎜 🎜 ##🎜🎜 #水五Minuten

    • Davon können Schritt 1 und 2 parallel ausgeführt werden, nachdem Schritt 1 und 2 abgeschlossen sind. und dann kann Schritt 4 durchgeführt werden.

    • Wir haben folgende Umsetzungsmethoden:

    Nur eine Person pflanzt Bäume

    Wenn jetzt nur eine Person Bäume pflanzt, müssen es 100 Bäume sein Dann kann nur in folgender Reihenfolge vorgegangen werden:

    Auf dem Bild sind nur drei Bäume zu sehen. Sie können sehen, dass Sie bei der seriellen Ausführung nur einen Baum nach dem anderen pflanzen können. Daher sind

    erforderlich, um 100 Bäume zu pflanzen. Diese Methode entspricht dem Programm, bei dem es sich um eine synchrone Single-Threaded-Ausführung handelt. Wie baut Java Multithreading über CompletableFuture asynchrone Recheneinheiten zusammen?

    Drei Personen pflanzen gleichzeitig Bäume, jede Person ist für das Pflanzen eines Baumes verantwortlich

    40 * 100 = 4000 分钟Wie kann die Baumpflanzzeit verkürzt werden? Sie müssen denken, dass dies nicht einfach zu handhaben ist, nachdem ich so lange Parallelität studiert habe, ist dies definitiv kein Problem für mich. Wollen Sie nicht 100 Bäume pflanzen? Dann werde ich 100 Leute finden, die gemeinsam pflanzen, und jeder wird einen Baum pflanzen. Dann dauert es nur noch 40 Minuten, um 100 Bäume zu pflanzen.

    Ja, wenn Ihr Programm über eine Methode namens plantTree verfügt, die die oben genannten vier Teile enthält, benötigen Sie nur 100 Threads. Bitte beachten Sie jedoch, dass das Erstellen und Zerstören von 100 Threads viele Systemressourcen verbraucht. Und das Erstellen und Zerstören von Threads braucht Zeit. Darüber hinaus kann die Anzahl der Kerne der CPU nicht wirklich 100 Threads gleichzeitig unterstützen. Was wäre, wenn wir 10.000 Bäume pflanzen wollen? Man kann doch nicht 10.000 Threads haben, oder?

    Das ist also nur eine ideale Situation. Wir führen es normalerweise über den Thread-Pool aus und starten nicht wirklich 100 Threads.

    Mehrere Personen pflanzen gleichzeitig Bäume

    Beim Pflanzen jedes Baumes können die unabhängigen Schritte parallel von verschiedenen Personen durchgeführt werden

    Diese Art Diese Methode kann die Pflanzzeit für Bäume weiter verkürzen, da der erste Schritt des Grabens eines Lochs und der zweite Schritt des Sammelns der Setzlinge von zwei Personen parallel durchgeführt werden können, sodass jeder Baum nur 35 Minuten benötigt. Wie unten gezeigt:

    Wenn das Programm immer noch über 100 Hauptthreads verfügt, die die Methode plantTree gleichzeitig ausführen, dauert das Pflanzen von 100 Bäumen nur 35 Minuten. Hier müssen Sie auf jeden Thread achten, da es zwei Threads gibt, die die Schritte 1 und 2 gleichzeitig ausführen. Im tatsächlichen Betrieb werden 100 x 3 = 300 Threads an der Baumpflanzung beteiligt sein. Der Thread, der für die Schritte 1 und 2 verantwortlich ist, nimmt jedoch nur kurz teil und wird dann inaktiv.

    Wie baut Java Multithreading über CompletableFuture asynchrone Recheneinheiten zusammen?Diese Methode und die zweite Methode haben auch das Problem, dass eine große Anzahl von Threads erstellt wird. Es ist also einfach eine ideale Situation.

    Wenn nur 4 Leute Bäume pflanzen, ist jeder nur für seine eigenen Schritte verantwortlich

    Das sieht man Xiao Wang war zu Beginn mit dem Graben fertig. Nach der ersten Grube hatte Xiao Li zwei Setzlinge geborgen, aber erst jetzt konnte Xiao Zhang mit dem Pflanzen des ersten Setzlings beginnen. Von da an kann Xiao Zhang einen Setzling einzeln pflanzen, und wenn er einen Setzling pflanzt, kann Xiao Zhao ihn parallel gießen. Nach diesem Vorgang dauert das Pflanzen von 100 Setzlingen 10+20x100+5=2015 Minuten. Das ist viel besser als die 4000 Minuten eines einzelnen Threads, aber weitaus geringer als die Geschwindigkeit von 100 Threads, die gleichzeitig Bäume pflanzen. Vergessen Sie jedoch nicht, dass 100 Threads gleichzeitig nur eine ideale Situation sind und diese Methode nur 4 Threads verwendet.

    Wie baut Java Multithreading über CompletableFuture asynchrone Recheneinheiten zusammen? Nehmen wir einige Anpassungen an der Arbeitsteilung vor. Jeder erledigt nicht nur seine eigene Arbeit, sondern prüft, sobald seine eigene Arbeit erledigt ist, ob es noch andere Arbeiten gibt, die erledigt werden können. Nachdem Xiao Wang zum Beispiel ein Loch gegraben und herausgefunden hatte, dass er Setzlinge pflanzen konnte, pflanzte er Setzlinge. Nachdem Xiao Li mit dem Sammeln der Setzlinge fertig ist, kann er auch Löcher graben oder Setzlinge pflanzen. Auf diese Weise wird die Gesamteffizienz höher sein. Basierend auf dieser Idee unterteilen wir die Aufgaben tatsächlich in 4 Kategorien mit 100 Aufgaben in jeder Kategorie, also insgesamt 400 Aufgaben. Wenn alle 400 Aufgaben abgeschlossen sind, bedeutet dies, dass die gesamte Aufgabe abgeschlossen ist. Dann müssen die Teilnehmer der Aufgabe nur die Abhängigkeiten der Aufgabe kennen und erhalten dann kontinuierlich ausführbare Aufgaben zur Ausführung. Dieser Wirkungsgrad wird am höchsten sein.

    Wie bereits erwähnt, ist es für uns unmöglich, Aufgaben über 100 Threads gleichzeitig auszuführen. Daher verwenden wir unter normalen Umständen einen Thread-Pool, der mit der obigen Designidee übereinstimmt. Nach der Verwendung des Thread-Pools unterteilt die vierte Methode die Schritte in feinere Details und erhöht so die Möglichkeit der Parallelität. Daher ist die Geschwindigkeit höher als bei der zweiten Methode. Welcher ist im Vergleich zum dritten Typ schneller? Wenn die Anzahl der Threads unendlich sein kann, ist die minimale Zeit, die diese beiden Methoden erreichen können, gleich, nämlich 35 Minuten. Wenn jedoch Threads begrenzt sind, nutzt die vierte Methode Threads effizienter, da jeder Schritt parallel ausgeführt werden kann (Personen, die an der Baumpflanzung beteiligt sind, können anderen nach Abschluss ihrer Arbeit helfen), Thread Die Planung ist flexibler, sodass die Threads in Es ist schwierig, den Thread-Pool im Leerlauf zu halten und weiterzulaufen. Ja, niemand kann faul sein. Die dritte Methode kann nur gleichzeitig mit der plantTree-Methode, dem Graben von Löchern und der Gewinnung von Setzlingen, verwendet werden und ist daher nicht so flexibel wie die vierte Methode

    Ich habe oben so viel gesagt, hauptsächlich um die Gründe dafür zu erklären die Entstehung von CompletableFuture. Es wird verwendet, um komplexe Aufgaben in verbundene asynchrone Ausführungsschritte zu zerlegen und so die Gesamteffizienz zu verbessern. Kehren wir zum Thema des Abschnitts zurück: Niemand kann faul sein. Ja, das ist das Ziel von CompletableFuture. Durch die Abstraktion der Recheneinheit können Threads effizient und gleichzeitig an jedem Schritt teilnehmen. Synchroner Code kann durch CompletableFuture vollständig in asynchronen Code umgewandelt werden. Werfen wir einen Blick auf die Verwendung von CompletableFuture.

    CompletableFuture verwendet

    CompletableFuture, um die Future-Schnittstelle und die CompletionStage-Schnittstelle zu implementieren. Die Future-Schnittstelle ist uns bereits bekannt und die CompletionStage-Schnittstelle legt die Spezifikationen zwischen asynchronen Berechnungsschritten fest, um sicherzustellen, dass diese Schritt für Schritt verbunden werden können. CompletionStage definiert 38 öffentliche Methoden für die Verbindung zwischen asynchronen Berechnungsschritten. Als nächstes werden wir einige häufig verwendete und relativ häufig verwendete Methoden auswählen, um zu sehen, wie sie verwendet werden.

    Bekanntes Berechnungsergebnis

    Wenn Sie das Berechnungsergebnis von CompletableFuture bereits kennen, können Sie die statische Methode CompletedFuture verwenden. Übergeben Sie das Berechnungsergebnis und deklarieren Sie das CompletableFuture-Objekt. Beim Aufruf der get-Methode wird das eingehende Berechnungsergebnis sofort zurückgegeben, ohne blockiert zu werden, wie im folgenden Code gezeigt:

    public static void main(String[] args) throws Exception{
        CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello World");
        System.out.println("result is " + completableFuture.get());
    }
    // result is Hello World

    Denken Sie, dass diese Verwendung bedeutungslos ist? Nachdem Sie das Berechnungsergebnis kennen, können Sie es einfach direkt verwenden. Warum müssen Sie es mit CompletableFuture verpacken? Dies liegt daran, dass asynchrone Recheneinheiten über CompletableFuture verbunden werden müssen. Selbst wenn wir die Berechnungsergebnisse bereits kennen, müssen wir sie manchmal in CompletableFuture packen, um sie in den asynchronen Rechenprozess zu integrieren.

    Kapselung der asynchronen Berechnungslogik mit Rückgabewerten

    Dies ist unsere am häufigsten verwendete Methode. Kapseln Sie die Logik, die eine asynchrone Berechnung erfordert, in eine Berechnungseinheit und übergeben Sie sie zur Ausführung an CompletableFuture. Wie im folgenden Code:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成

    Hier verwenden wir die SupplyAsync-Methode von CompletableFuture und übergeben ihr eine Implementierung der Lieferantenschnittstelle in Form eines Lambda-Ausdrucks.

    Es ist ersichtlich, dass completableFuture.get() Das erhaltene Berechnungsergebnis der zurückgegebene Wert ist, nachdem die von Ihnen übergebene Funktion ausgeführt wurde. Wenn Sie dann über Logik verfügen, die eine asynchrone Berechnung erfordert, können Sie diese in den von SupplyAsync übergebenen Funktionskörper einfügen. Wie wird diese Funktion asynchron ausgeführt? Wenn Sie dem Code folgen, können Sie sehen, dass SupplyAsync diese Funktion tatsächlich über den Executor, den Thread-Pool, ausführt. completableFuture verwendet standardmäßig ForkJoinPool. Natürlich können Sie auch andere Excutoren für SupplyAsync angeben und diese über den zweiten Parameter an die SupplyAsync-Methode übergeben.

    supplyAsync Es gibt viele Verwendungsszenarien: Das Hauptprogramm muss die Schnittstellen mehrerer Microservices aufrufen, um Daten anzufordern. Anschließend kann es mehrere CompletableFutures starten und SupplyAsync aufrufen über verschiedene Die Aufruflogik der Schnittstelle. Auf diese Weise können verschiedene Schnittstellenanforderungen asynchron und gleichzeitig ausgeführt werden. Wenn schließlich alle Schnittstellen zurückkehren, wird die nachfolgende Logik ausgeführt.

    Asynchrone Berechnungslogik ohne Rückgabewert kapseln

    supplyAsync Die empfangene Funktion hat einen Rückgabewert. In einigen Fällen handelt es sich lediglich um einen Berechnungsprozess und es ist nicht erforderlich, einen Wert zurückzugeben. Dies ähnelt der run-Methode von Runnable, die keinen Wert zurückgibt. In diesem Fall können wir die runAsync-Methode verwenden, beispielsweise den folgenden Code:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println("挖坑完成"));
        completableFuture.get();
    }
    // 挖坑完成

    runAsync empfängt die Funktion der ausführbaren Schnittstelle. Es gibt also keinen Rückgabewert. Die Logik in Chestnut besteht lediglich darin, „Loch gegraben“ auszugeben.

    Verarbeiten Sie die asynchron zurückgegebenen Ergebnisse weiter und geben Sie neue Berechnungsergebnisse zurück.

    Wenn wir die asynchrone Berechnung über SupplyAsync abschließen, wird CompletableFuture zurückgegeben und wir können die zurückgegebenen Ergebnisse weiter verarbeiten. , wie zum Beispiel den folgenden Code:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenApply(s -> s + ", 并且归还铁锹")
                .thenApply(s -> s + ", 全部完成。");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且归还铁锹, 全部完成。

    Nachdem wir SupplyAsync aufgerufen haben, rufen wir die Methode thenApply zweimal in einer Kette auf. s ist das von SupplyAsync im vorherigen Schritt zurückgegebene Berechnungsergebnis. Wir haben das Berechnungsergebnis zweimal erneut verarbeitet. Wir können die Berechnungsergebnisse kontinuierlich über thenApply verarbeiten. Wenn Sie die Logik von thenApply asynchron ausführen möchten, können Sie thenApplyAsync verwenden. Die Verwendungsmethode ist dieselbe, sie wird jedoch asynchron über den Thread-Pool ausgeführt.

    进一步处理异步返回的结果,无返回

    这种场景你可以使用thenApply。这个方法可以让你处理上一步的返回结果,但无返回值。参照如下代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且归还铁锹"));
        completableFuture.get();
    }

    这里可以看到 thenAccept 接收的函数没有返回值,只有业务逻辑。处理后返回 CompletableFuture 类型对象。

    既不需要返回值,也不需要上一步计算结果,只想在执行结束后再执行一段代码

    此时你可以使用 thenRun 方法,他接收 Runnable 的函数,没有输入也没有输出,仅仅是在异步计算结束后回调一段逻辑,比如记录 log 等。参照下面代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且归还铁锹"))
                .thenRun(() -> System.out.println("挖坑工作已经全部完成"));
        completableFuture.get();
    }
    // 挖坑完成, 并且归还铁锹
    // 挖坑工作已经全部完成

    可以看到在 thenAccept 之后继续调用了 thenRun,仅仅是打印了日志而已

    组合 Future 处理逻辑

    我们可以把两个 CompletableFuture 组合起来使用,如下面的代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", 并且归还铁锹"));
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且归还铁锹

    thenApply 和 thenCompose 的关系就像 stream中的 map 和 flatmap。从上面的例子来看,thenApply 和thenCompose 都可以实现同样的功能。但是如果你使用一个第三方的库,有一个API返回的是CompletableFuture 类型,那么你就只能使用 thenCompose方法。

    组合Futurue结果

    如果你有两个异步操作互相没有依赖,但是第三步操作依赖前两部计算的结果,那么你可以使用 thenCombine 方法来实现,如下面代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCombine(CompletableFuture.supplyAsync(() -> ", 拿树苗完成"), (x, y) -> x + y + "植树完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 拿树苗完成植树完成

    挖坑和拿树苗可以同时进行,但是第三步植树则祖尧前两步完成后才能进行。

    可以看到符合我们的预期。使用场景之前也提到过。我们调用多个微服务的接口时,可以使用这种方式进行组合。处理接口调用间的依赖关系。 当你需要两个 Future 的结果,但是不需要再加工后向下游传递计算结果时,可以使用 thenAcceptBoth,用法一样,只不过接收的函数没有返回值。

    并行处理多个 Future

    假如我们对微服务接口的调用不止两个,并且还有一些其它可以异步执行的逻辑。主流程需要等待这些所有的异步操作都返回时,才能继续往下执行。此时我们可以使用 CompletableFuture.allOf 方法。它接收 n 个 CompletableFuture,返回一个 CompletableFuture。对其调用 get 方法后,只有所有的 CompletableFuture 全完成时才会继续后面的逻辑。我们看下面示例代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("挖坑完成");
        });
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取树苗完成");
        });
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取肥料完成");
        });
        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("植树准备工作完成!");
    }
    // 挖坑完成
    // 取肥料完成
    // 取树苗完成
    // 植树准备工作完成!

    异常处理

    在异步计算链中的异常处理可以采用 handle 方法,它接收两个参数,第一个参数是计算及过,第二个参数是异步计算链中抛出的异常。使用方法如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("Computation error");
            }
            return "挖坑完成";
        }).handle((result, throwable) -> {
            if (result == null) {
                return "挖坑异常";
            }
            return result;
        });
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑异常

    代码中会抛出一个 RuntimeException,抛出这个异常时 result 为 null,而 throwable 不为null。根据这些信息你可以在 handle 中进行处理,如果抛出的异常种类很多,你可以判断 throwable 的类型,来选择不同的处理逻辑。

    Das obige ist der detaillierte Inhalt vonWie baut Java Multithreading über CompletableFuture asynchrone Recheneinheiten zusammen?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen