Rumah  >  Artikel  >  Operasi dan penyelenggaraan  >  Bagaimana untuk menyelesaikan masalah menggunakan ParallelStream

Bagaimana untuk menyelesaikan masalah menggunakan ParallelStream

PHPz
PHPzke hadapan
2023-05-21 21:08:211865semak imbas

Contohnya, coretan kod di bawah membuatkan orang membacanya seperti membaca puisi. Tetapi jika ia tidak digunakan dengan betul, ia boleh membawa maut.

List<Integer> transactionsIds = widgets.stream()              .filter(b -> b.getColor() == RED)              .sorted((x,y) -> x.getWeight() - y.getWeight())              .mapToInt(Widget::getWeight)              .sum();

Kod ini mempunyai fungsi utama, iaitu strim. Menggunakannya, kita boleh menukar senarai biasa kepada strim, dan kemudian menggunakan saluran paip untuk memproses senarai. Secara keseluruhannya, semua yang saya gunakan mengatakan ia bagus.

Tidak terlalu biasa dengan fungsi ini? Anda boleh merujuk kepada: "Terdapat peta dan Peta rata di mana-mana, apakah maksudnya

Persoalannya akan datang

Bagaimana jika Apakah yang akan berlaku jika kita menggantikan strim dengan parallelStream?

Mengikut maksud literal, strim akan berubah daripada bersiri kepada selari.

Memandangkan bahawa ini adalah situasi selari, jelas terdapat isu keselamatan rangkaian. Walau bagaimanapun, apa yang kita bincangkan di sini bukanlah keperluan untuk menggunakan koleksi selamat benang, kerana topik ini terlalu asas. Pada zaman ini, belajar menggunakan koleksi selamat benang dalam situasi tidak selamat benang telah menjadi kemahiran penting.

Perangkap kali ini ialah isu prestasi penstriman selari.

Kami membiarkan kod bercakap.

Kod berikut mendayakan 8 utas pada masa yang sama dan semua utas menggunakan aliran selari untuk pengiraan data. Dalam logik pelaksanaan, kami membiarkan setiap tugasan tidur selama 1 saat, supaya kami boleh mensimulasikan menunggu yang memakan masa beberapa permintaan I/O.

Menggunakan strim, program akan kembali selepas 30 saat, tetapi kami menjangkakan program itu akan kembali dalam masa lebih daripada 1 saat kerana ia adalah aliran selari dan layak mendapat gelaran ini.

Ujian mendapati kami menunggu lama sebelum tugasan selesai.

static void paralleTest() {     List<Integer> numbers = Arrays.asList(             0, 1, 2, 3, 4, 5, 6, 7, 8, 9,             10, 11, 12, 13, 14, 15, 16, 17, 18, 19,             20, 21, 22, 23, 24, 25, 26, 27, 28, 29     );     final long begin = System.currentTimeMillis();     numbers.parallelStream().map(k -> {         try {             Thread.sleep(1000);             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());         } catch (InterruptedException e) {             e.printStackTrace();         }         return k;     }).collect(Collectors.toList()); }  public static void main(String[] args) { //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start(); }

Pits

Sebenarnya, kod ini mengambil masa yang berbeza apabila dilaksanakan pada mesin yang berbeza.

Oleh kerana ia selari, mesti ada tahap keselarian. Jika tahap keselarian terlalu rendah, keupayaannya tidak akan digunakan; jika tahap keselarian terlalu tinggi, masa penukaran konteks akan terbuang. Saya sangat kecewa apabila mendapati ramai pembangun kanan, yang mengetahui pelbagai parameter kumpulan benang mengikut hati, dan semua jenis penalaan, berani menutup mata dan menggunakan parallelStream dalam perniagaan intensif I/O.

Untuk memahami tahap paralelisme ini, kita perlu melihat kaedah pembinaan khusus. Cari kod seperti ini dalam kelas ForkJoinPool.

try {  // ignore exceptions in accessing/parsing properties     String pp = System.getProperty         ("java.util.concurrent.ForkJoinPool.common.parallelism");     if (pp != null)         parallelism = Integer.parseInt(pp);     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.threadFactory");     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { }  if (fac == null) {     if (System.getSecurityManager() == null)         fac = defaultForkJoinWorkerThreadFactory;     else // use security-managed default         fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;

Seperti yang anda lihat, tahap keselarian dikawal oleh parameter berikut. Jika parameter ini tidak dapat diperoleh, keselarian bilangan CPU - 1 akan digunakan secara lalai.

Seperti yang anda lihat, fungsi ini direka untuk perniagaan intensif pengkomputeran. Apabila anda memberikan terlalu banyak tugas kepadanya, pelaksanaan selarinya merosot kepada kesan seperti bersiri.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

Walaupun anda menetapkan saiz awal menggunakan -Djava.util.concurrent.ForkJoinPool.common.parallelism=N, ia masih menghadapi masalah.

Setelah ditetapkan, pembolehubah selari ditetapkan kepada muktamad dan pengubahsuaian adalah dilarang. Dalam erti kata lain, parameter di atas hanya akan berkuat kuasa sekali.

Zhang San boleh menggunakan kod berikut dan menetapkan saiz selari kepada 20.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

John Doe mungkin telah menetapkan nilai ini kepada 30 dengan cara yang sama. Untuk menentukan nilai yang digunakan dalam projek anda, anda perlu bertanya kepada JVM cara memuatkan maklumat kelas.

Kaedah ini tidak begitu boleh dipercayai.

Penyelesaian

Kami boleh mencapai pelbagai jenis pengasingan tugas dengan menyediakan forkjoinpool luaran, iaitu menukar kaedah penyerahan.

Kod adalah seperti berikut Melalui penyerahan kod yang jelas, pengasingan tugas boleh dicapai.

ForkJoinPool pool = new ForkJoinPool(30);  final long begin = System.currentTimeMillis(); try {     pool.submit(() ->             numbers.parallelStream().map(k -> {                 try {                     Thread.sleep(1000);                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 return k;             }).collect(Collectors.toList())).get(); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }

Dengan cara ini, senario yang berbeza boleh mempunyai darjah selari yang berbeza. Terdapat persamaan antara pengurusan sumber manual dengan cara ini dan CountDownLatch, mereka mempunyai tujuan yang sama.

Atas ialah kandungan terperinci Bagaimana untuk menyelesaikan masalah menggunakan ParallelStream. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam