Rumah >Java >javaTutorial >Bagaimanakah java multi-threading memasang unit pengkomputeran tak segerak melalui CompletableFuture?

Bagaimanakah java multi-threading memasang unit pengkomputeran tak segerak melalui CompletableFuture?

王林
王林ke hadapan
2023-05-11 19:04:041212semak imbas

    Pengenalan CompletableFuture

    CompletableFuture ialah ciri baharu yang diperkenalkan dalam 1.8 Ia digunakan untuk beberapa senario pengkomputeran tak segerak yang lebih kompleks, terutamanya yang memerlukan berbilang pengkomputeran tak segerak. unit untuk disambungkan secara siri , anda boleh mempertimbangkan untuk menggunakan CompletableFuture untuk melaksanakan.

    Dalam dunia nyata, masalah kompleks yang perlu kita selesaikan terbahagi kepada beberapa langkah. Sama seperti kod kami, dalam kaedah logik yang kompleks, pelbagai kaedah akan dipanggil untuk melaksanakannya langkah demi langkah.

    Bayangkan senario berikut menanam pokok pada Hari Arbor dibahagikan kepada langkah berikut:

    • Gali lubang selama 10 minit

    • Tanam anak pokok selama 5 minit

    • Tanam anak pokok selama 20 minit

    • Siram selama 5 minit

    Antaranya 1 dan 2 boleh dilakukan secara selari Hanya apabila 1 dan 2 selesai, anda boleh meneruskan ke langkah 3, dan kemudian meneruskan ke langkah 4.

    Kami mempunyai kaedah pelaksanaan berikut:

    Hanya seorang sahaja yang menanam pokok

    Jika hanya ada seorang sahaja yang menanam pokok sekarang dan 100 pokok perlu ditanam, maka perkara berikut pesanan hanya boleh diikuti Perlaksanaan:

    Bagaimanakah java multi-threading memasang unit pengkomputeran tak segerak melalui CompletableFuture?

    Hanya tiga pokok ditunjukkan dalam gambar. Anda dapat melihat bahawa dalam pelaksanaan bersiri, anda hanya boleh menanam satu demi satu pokok, jadi ia memerlukan 40 * 100 = 4000 分钟 untuk menanam 100 pokok. Kaedah ini sepadan dengan atur cara, yang merupakan pelaksanaan segerak satu benang.

    Tiga orang tanam pokok serentak, setiap orang bertanggungjawab menanam pokok

    Bagaimana nak pendekkan masa tanam pokok? Anda pasti berfikir bahawa ini tidak mudah untuk dikendalikan Setelah mempelajari concurrency sekian lama, ini pastinya tidak menjadi masalah bagi saya. Tidakkah anda mahu menanam 100 pokok? Kemudian saya akan mencari 100 orang untuk menanam bersama, dan setiap orang akan menanam pokok. Kemudian hanya mengambil masa 40 minit untuk menanam 100 pokok.

    Ya, jika program anda mempunyai kaedah yang dipanggil plantTree, yang mengandungi empat bahagian di atas, maka anda hanya memerlukan 100 utas. Walau bagaimanapun, sila ambil perhatian bahawa penciptaan dan pemusnahan 100 utas menggunakan banyak sumber sistem. Dan mencipta dan memusnahkan benang memerlukan masa. Di samping itu, bilangan teras CPU tidak boleh menyokong 100 utas secara serentak. Bagaimana jika kita ingin menanam 10,000 pokok? Anda tidak boleh mempunyai 10,000 utas, bukan?

    Jadi ini hanyalah situasi yang ideal Kami biasanya melaksanakannya melalui kumpulan benang dan sebenarnya tidak akan memulakan 100 utas.

    Berbilang orang menanam pokok pada masa yang sama

    Apabila menanam setiap pokok, langkah bebas boleh dilakukan oleh orang yang berbeza secara selari

    Kaedah ini boleh memendekkan lagi masa penanaman pokok. Oleh kerana langkah pertama menggali lubang dan langkah kedua mendapatkan anak pokok boleh dilakukan oleh dua orang secara selari, setiap pokok hanya mengambil masa 35 minit. Seperti yang ditunjukkan di bawah:

    Bagaimanakah java multi-threading memasang unit pengkomputeran tak segerak melalui CompletableFuture?

    Jika program masih mempunyai 100 utas utama yang menjalankan kaedah plantTree secara serentak, maka hanya mengambil masa 35 minit untuk menanam 100 pokok. Di sini anda perlu memberi perhatian kepada setiap utas, kerana terdapat dua utas untuk melakukan langkah 1 dan 2 secara serentak. Dalam operasi sebenar, 100 x 3 = 300 utas akan mengambil bahagian dalam penanaman pokok. Tetapi urutan yang bertanggungjawab untuk langkah 1 dan 2 hanya akan mengambil bahagian sebentar dan kemudian menjadi terbiar.

    Kaedah ini dan kaedah kedua juga mempunyai masalah untuk mencipta sejumlah besar benang. Jadi ia hanya satu keadaan yang ideal.

    Jika hanya ada 4 orang yang menanam pokok, setiap orang hanya bertanggungjawab atas langkahnya sendiri

    Bagaimanakah java multi-threading memasang unit pengkomputeran tak segerak melalui CompletableFuture?

    Anda boleh lihat selepas Xiao Wang menggali lubang pertama , Li telah mendapatkan dua anak pokok, tetapi kini Xiao Zhang boleh mula menanam anak pokok pertama. Sejak itu, Xiao Zhang boleh menanam anak pokok satu persatu, dan apabila dia menanam anak pokok, Xiao Zhao boleh menyiramnya secara selari. Berikutan proses ini, ia akan mengambil masa 10+20x100+5=2015 minit untuk menanam 100 anak pokok. Ia jauh lebih baik daripada 4000 minit satu utas, tetapi ia jauh lebih rendah daripada kelajuan 100 utas secara serentak menanam pokok. Tetapi jangan lupa bahawa 100 utas serentak hanyalah situasi yang ideal, dan kaedah ini hanya menggunakan 4 utas.

    Mari kita buat beberapa pelarasan pada pembahagian kerja. Setiap orang bukan sahaja membuat kerja sendiri, tetapi apabila kerja sendiri selesai, mereka melihat jika ada kerja lain yang boleh dilakukan. Sebagai contoh, selepas Xiao Wang menggali lubang dan mendapati bahawa dia boleh menanam anak pokok, dia akan menanam anak pokok. Selepas Xiao Li selesai mengumpul anak pokok, dia juga boleh menggali lubang atau menanam anak pokok. Dengan cara ini, kecekapan keseluruhan akan menjadi lebih tinggi. Jika berdasarkan idea ini, maka kita sebenarnya membahagikan tugas kepada 4 kategori, dengan 100 tugasan dalam setiap kategori, dengan jumlah 400 tugasan. Apabila semua 400 tugasan selesai, ini bermakna keseluruhan tugasan telah selesai. Kemudian peserta tugas hanya perlu mengetahui kebergantungan tugas, dan kemudian terus menerima tugas boleh laku untuk dilaksanakan. Kecekapan ini akan menjadi yang tertinggi.

    Seperti yang dinyatakan sebelum ini, adalah mustahil untuk kami melaksanakan tugasan melalui 100 utas secara serentak, jadi dalam keadaan biasa kami akan menggunakan kumpulan benang, yang bertepatan dengan idea reka bentuk di atas. Selepas menggunakan kumpulan benang, kaedah keempat memecahkan langkah-langkah kepada butiran yang lebih halus, meningkatkan kemungkinan konkurensi. Oleh itu kelajuan akan lebih cepat daripada kaedah kedua. Jadi berbanding dengan jenis ketiga, yang manakah lebih cepat? Jika bilangan utas boleh menjadi tidak terhingga, masa minimum yang boleh dicapai oleh kedua-dua kaedah ini adalah sama, 35 minit. Walau bagaimanapun, apabila benang terhad, kaedah keempat akan menggunakan benang dengan lebih cekap, kerana setiap langkah boleh dilaksanakan secara selari (orang yang terlibat dalam penanaman pokok boleh membantu orang lain selepas menyelesaikan kerja mereka), benang Penjadualan adalah lebih fleksibel, jadi benang dalam kolam benang sukar untuk melahu dan terus berjalan. Ya, tiada siapa yang boleh malas. Kaedah ketiga hanya boleh digunakan serentak dengan kaedah plantTree, menggali lubang dan mendapatkan anak pokok, jadi ia tidak fleksibel seperti kaedah keempat

    Saya telah mengatakan begitu banyak di atas, terutamanya untuk menerangkan sebab-sebab kemunculan daripada CompletableFuture. Ia digunakan untuk memecahkan tugas yang kompleks kepada langkah pelaksanaan tak segerak yang disambungkan, dengan itu meningkatkan kecekapan keseluruhan. Mari kita kembali kepada topik bahagian: Tiada siapa yang boleh malas. Ya, inilah yang bertujuan untuk dicapai oleh CompletableFuture Dengan mengabstraksi unit pengkomputeran, benang boleh mengambil bahagian dalam setiap langkah dengan cekap dan serentak. Kod segerak boleh ditukar sepenuhnya kepada kod tak segerak melalui CompletableFuture. Mari kita lihat cara menggunakan CompletableFuture.

    CompletableFuture menggunakan

    CompletableFuture untuk melaksanakan antara muka Masa Depan dan melaksanakan antara muka CompletionStage. Kami sudah biasa dengan antara muka Masa Depan, dan antara muka CompletionStage menetapkan spesifikasi antara langkah pengiraan tak segerak untuk memastikan ia boleh disambungkan langkah demi langkah. CompletionStage mentakrifkan 38 kaedah awam untuk sambungan antara langkah pengiraan tak segerak. Seterusnya, kami akan memilih beberapa kaedah yang biasa digunakan dan agak kerap digunakan untuk melihat cara menggunakannya.

    Hasil pengiraan yang diketahui

    Jika anda sudah mengetahui hasil pengiraan CompletableFuture, anda boleh menggunakan kaedah statik completedFuture. Lulus dalam hasil pengiraan dan isytiharkan objek CompletableFuture. Apabila memanggil kaedah dapatkan, hasil pengiraan masuk akan dikembalikan serta-merta tanpa disekat, seperti yang ditunjukkan dalam kod berikut:

    public static void main(String[] args) throws Exception{
        CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello World");
        System.out.println("result is " + completableFuture.get());
    }
    // result is Hello World

    Adakah anda fikir penggunaan ini tidak bermakna? Setelah anda mengetahui hasil pengiraan, anda boleh menggunakannya secara terus Mengapa anda perlu membungkusnya dengan CompletableFuture? Ini kerana unit pengkomputeran tak segerak perlu disambungkan melalui CompletableFuture, jadi kadangkala walaupun kita sudah mengetahui keputusan pengiraan, kita perlu membungkusnya ke dalam CompletableFuture untuk disepadukan ke dalam proses pengkomputeran tak segerak.

    Enkapsulasi logik pengiraan tak segerak dengan nilai pulangan

    Ini ialah kaedah yang paling biasa kami gunakan. Bungkus logik yang memerlukan pengiraan tak segerak ke dalam unit pengiraan dan serahkan kepada CompletableFuture untuk dijalankan. Seperti dalam kod berikut:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成

    Di sini kami menggunakan kaedah supplyAsync CompletableFuture dan memberikannya pelaksanaan antara muka pembekal dalam bentuk ungkapan lambda.

    Dapat dilihat bahawa hasil pengiraan yang diperolehi oleh completableFuture.get() ialah nilai yang dikembalikan selepas fungsi yang anda lalui dilaksanakan. Kemudian jika anda mempunyai logik yang memerlukan pengiraan tak segerak, anda boleh memasukkannya ke dalam badan fungsi yang dihantar oleh supplyAsync. Bagaimanakah fungsi ini dilaksanakan secara tak segerak? Jika anda mengikut kod, anda boleh melihat bahawa supplyAsync sebenarnya menjalankan fungsi ini melalui Pelaksana, iaitu kumpulan benang. completableFuture menggunakan ForkJoinPool secara lalai Sudah tentu, anda juga boleh menentukan Excutor lain untuk supplyAsync dan menghantarnya ke kaedah supplyAsync melalui parameter kedua.

    supplyAsync digunakan dalam banyak senario Untuk contoh mudah, program utama perlu memanggil antara muka berbilang perkhidmatan mikro untuk meminta data Kemudian ia boleh memulakan berbilang CompletableFutures dan memanggil supplyAsync Logik panggilan. Dengan cara ini, permintaan antara muka yang berbeza boleh dijalankan secara tidak segerak dan serentak, dan akhirnya apabila semua antara muka kembali, logik seterusnya akan dilaksanakan.

    Merangkum logik pengiraan tak segerak tanpa nilai pulangan

    Fungsi yang diterima oleh supplyAsync mempunyai nilai pulangan. Dalam sesetengah kes, kami hanyalah proses pengiraan dan tidak perlu mengembalikan nilai. Ini seperti kaedah larian Runnable, yang tidak mengembalikan nilai. Dalam kes ini kita boleh menggunakan kaedah runAsync, seperti kod berikut:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println("挖坑完成"));
        completableFuture.get();
    }
    // 挖坑完成

    runAsync menerima fungsi antara muka boleh jalan. Jadi tiada nilai pulangan. Logik dalam Chestnut hanya mencetak "Penggalian selesai".

    Proses selanjutnya hasil yang dikembalikan secara tak segerak dan kembalikan hasil pengiraan baharu

    Apabila kami melengkapkan pengiraan tak segerak melalui supplyAsync, CompletableFuture dikembalikan pada masa ini, kami boleh terus memproses hasil yang dikembalikan. seperti yang ditunjukkan di bawah Kod:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenApply(s -> s + ", 并且归还铁锹")
                .thenApply(s -> s + ", 全部完成。");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且归还铁锹, 全部完成。

    Selepas memanggil supplyAsync, kami memanggil kaedah thenApply dua kali. s ialah hasil pengiraan yang dikembalikan oleh supplyAsync dalam langkah sebelumnya Kami memproses semula hasil pengiraan dua kali. Kami boleh terus memproses keputusan pengiraan melalui thenApply. Jika anda ingin menjalankan logik thenApply secara tidak segerak, anda boleh menggunakan thenApplyAsync. Kaedah penggunaan adalah sama, tetapi ia akan berjalan secara tidak segerak melalui kumpulan benang.

    进一步处理异步返回的结果,无返回

    这种场景你可以使用thenApply。这个方法可以让你处理上一步的返回结果,但无返回值。参照如下代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且归还铁锹"));
        completableFuture.get();
    }

    这里可以看到 thenAccept 接收的函数没有返回值,只有业务逻辑。处理后返回 CompletableFuture 类型对象。

    既不需要返回值,也不需要上一步计算结果,只想在执行结束后再执行一段代码

    此时你可以使用 thenRun 方法,他接收 Runnable 的函数,没有输入也没有输出,仅仅是在异步计算结束后回调一段逻辑,比如记录 log 等。参照下面代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且归还铁锹"))
                .thenRun(() -> System.out.println("挖坑工作已经全部完成"));
        completableFuture.get();
    }
    // 挖坑完成, 并且归还铁锹
    // 挖坑工作已经全部完成

    可以看到在 thenAccept 之后继续调用了 thenRun,仅仅是打印了日志而已

    组合 Future 处理逻辑

    我们可以把两个 CompletableFuture 组合起来使用,如下面的代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", 并且归还铁锹"));
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且归还铁锹

    thenApply 和 thenCompose 的关系就像 stream中的 map 和 flatmap。从上面的例子来看,thenApply 和thenCompose 都可以实现同样的功能。但是如果你使用一个第三方的库,有一个API返回的是CompletableFuture 类型,那么你就只能使用 thenCompose方法。

    组合Futurue结果

    如果你有两个异步操作互相没有依赖,但是第三步操作依赖前两部计算的结果,那么你可以使用 thenCombine 方法来实现,如下面代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCombine(CompletableFuture.supplyAsync(() -> ", 拿树苗完成"), (x, y) -> x + y + "植树完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 拿树苗完成植树完成

    挖坑和拿树苗可以同时进行,但是第三步植树则祖尧前两步完成后才能进行。

    可以看到符合我们的预期。使用场景之前也提到过。我们调用多个微服务的接口时,可以使用这种方式进行组合。处理接口调用间的依赖关系。 当你需要两个 Future 的结果,但是不需要再加工后向下游传递计算结果时,可以使用 thenAcceptBoth,用法一样,只不过接收的函数没有返回值。

    并行处理多个 Future

    假如我们对微服务接口的调用不止两个,并且还有一些其它可以异步执行的逻辑。主流程需要等待这些所有的异步操作都返回时,才能继续往下执行。此时我们可以使用 CompletableFuture.allOf 方法。它接收 n 个 CompletableFuture,返回一个 CompletableFuture。对其调用 get 方法后,只有所有的 CompletableFuture 全完成时才会继续后面的逻辑。我们看下面示例代码:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("挖坑完成");
        });
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取树苗完成");
        });
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取肥料完成");
        });
        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("植树准备工作完成!");
    }
    // 挖坑完成
    // 取肥料完成
    // 取树苗完成
    // 植树准备工作完成!

    异常处理

    在异步计算链中的异常处理可以采用 handle 方法,它接收两个参数,第一个参数是计算及过,第二个参数是异步计算链中抛出的异常。使用方法如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("Computation error");
            }
            return "挖坑完成";
        }).handle((result, throwable) -> {
            if (result == null) {
                return "挖坑异常";
            }
            return result;
        });
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑异常

    代码中会抛出一个 RuntimeException,抛出这个异常时 result 为 null,而 throwable 不为null。根据这些信息你可以在 handle 中进行处理,如果抛出的异常种类很多,你可以判断 throwable 的类型,来选择不同的处理逻辑。

    Atas ialah kandungan terperinci Bagaimanakah java multi-threading memasang unit pengkomputeran tak segerak melalui CompletableFuture?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam