Heim  >  Artikel  >  Betrieb und Instandhaltung  >  So lösen Sie die Fallstricke bei der Verwendung von ParallelStream

So lösen Sie die Fallstricke bei der Verwendung von ParallelStream

PHPz
PHPznach vorne
2023-05-21 21:08:211871Durchsuche

Zum Beispiel sorgt das Code-Snippet unten dafür, dass die Leute es lesen, als würden sie Gedichte lesen. Doch bei falscher Anwendung kann es tödlich sein.

List<Integer> transactionsIds = widgets.stream()              .filter(b -> b.getColor() == RED)              .sorted((x,y) -> x.getWeight() - y.getWeight())              .mapToInt(Widget::getWeight)              .sum();

Dieser Code hat eine Schlüsselfunktion, nämlich Stream. Damit können wir eine gewöhnliche Liste in einen Stream umwandeln und dann Pipelines verwenden, um die Liste zu verarbeiten. Alles in allem sagt alles, was ich verwendet habe, dass es gut ist.

Sie sind mit diesen Funktionen nicht so vertraut? Sie können sich auf Folgendes beziehen: „Karte und FlatMap sind überall, was bedeuten sie?“ 🎜🎜## 🎜🎜#

Was passiert, wenn wir Stream in ParallelStream ändern?

Entsprechend der wörtlichen Bedeutung ändert sich der Stream von seriell zu parallel.

Angesichts der Tatsache, dass dies eine parallele Situation ist, ist es offensichtlich, dass es Probleme mit der Thread-Sicherheit geben wird. Allerdings sprechen wir hier nicht über die Notwendigkeit, threadsichere Sammlungen zu verwenden, da dieses Thema zu grundlegend ist. Heutzutage ist das Erlernen der Verwendung threadsicherer Sammlungen in Thread-unsicheren Situationen zu einer wesentlichen Fähigkeit geworden.

Die Falle ist dieses Mal das Leistungsproblem des parallelen Streamings.

Wir lassen Code sprechen.

Der folgende Code aktiviert 8 Threads gleichzeitig, und alle Threads verwenden parallele Streams zur Datenberechnung. In der Ausführungslogik lassen wir jede Aufgabe 1 Sekunde lang ruhen, um das zeitaufwändige Warten einiger E/A-Anfragen zu simulieren.

Bei Verwendung von Stream kehrt das Programm nach 30 Sekunden zurück. Wir gehen jedoch davon aus, dass das Programm in mehr als 1 Sekunde zurückkehrt, da es sich um einen parallelen Stream handelt und diesen Titel verdient.

Der Test ergab, dass wir lange gewartet haben, bis die Aufgabe abgeschlossen war.

static void paralleTest() {     List<Integer> numbers = Arrays.asList(             0, 1, 2, 3, 4, 5, 6, 7, 8, 9,             10, 11, 12, 13, 14, 15, 16, 17, 18, 19,             20, 21, 22, 23, 24, 25, 26, 27, 28, 29     );     final long begin = System.currentTimeMillis();     numbers.parallelStream().map(k -> {         try {             Thread.sleep(1000);             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());         } catch (InterruptedException e) {             e.printStackTrace();         }         return k;     }).collect(Collectors.toList()); }  public static void main(String[] args) { //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start(); }

pit

Tatsächlich dauert dieser Code unterschiedlich lange, wenn er auf verschiedenen Maschinen ausgeführt wird.

Da es parallel ist, muss ein gewisses Maß an Parallelität vorhanden sein. Wenn der Grad der Parallelität zu niedrig ist, können seine Fähigkeiten nicht genutzt werden; ist der Grad der Parallelität zu hoch, wird Zeit für den Kontextwechsel verschwendet. Ich war sehr frustriert, als ich feststellte, dass viele erfahrene Entwickler, die die verschiedenen Parameter des Thread-Pools und alle Arten von Optimierungen auswendig kennen, es wagen, ein Auge zuzudrücken und parallelStream in E/A-intensiven Unternehmen zu verwenden.

Um diesen Grad der Parallelität zu verstehen, müssen wir uns die spezifische Konstruktionsmethode ansehen. Finden Sie Code wie diesen in der ForkJoinPool-Klasse.

try {  // ignore exceptions in accessing/parsing properties     String pp = System.getProperty         ("java.util.concurrent.ForkJoinPool.common.parallelism");     if (pp != null)         parallelism = Integer.parseInt(pp);     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.threadFactory");     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { }  if (fac == null) {     if (System.getSecurityManager() == null)         fac = defaultForkJoinWorkerThreadFactory;     else // use security-managed default         fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;
Sie können sehen, dass der Grad der Parallelität durch die folgenden Parameter gesteuert wird. Wenn dieser Parameter nicht abgerufen werden kann, wird standardmäßig die Parallelität der Anzahl der CPUs - 1 verwendet.

Wie Sie sehen, ist diese Funktion für rechenintensive Unternehmen konzipiert. Wenn Sie ihm zu viele Aufgaben zuweisen, verschlechtert sich seine parallele Ausführung zu einem seriellen Effekt.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
Auch wenn Sie mit -Djava.util.concurrent.ForkJoinPool.common.parallelism=N eine Anfangsgröße festlegen, treten immer noch Probleme auf.

Sobald die Parallelitätsvariable festgelegt ist, ist sie auf endgültig gesetzt und Änderungen sind nicht möglich. Mit anderen Worten: Die oben genannten Parameter werden nur einmal wirksam.

Zhang San kann den folgenden Code verwenden und die Parallelitätsgröße auf 20 festlegen.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Li Si hat diesen Wert möglicherweise auf die gleiche Weise auf 30 gesetzt. Um zu bestimmen, welcher Wert in Ihrem Projekt verwendet wird, müssen Sie die JVM fragen, wie die Klasseninformationen geladen werden sollen.

Diese Methode ist nicht sehr zuverlässig.

Eine Lösung

Wir können verschiedene Arten der Aufgabentrennung erreichen, indem wir einen externen Forkjoinpool bereitstellen, also die Übermittlungsmethode ändern.

Der Code ist wie unten gezeigt und kann durch explizite Codeübermittlung erreicht werden.

ForkJoinPool pool = new ForkJoinPool(30);  final long begin = System.currentTimeMillis(); try {     pool.submit(() ->             numbers.parallelStream().map(k -> {                 try {                     Thread.sleep(1000);                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 return k;             }).collect(Collectors.toList())).get(); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }
Auf diese Weise können verschiedene Szenarien unterschiedliche Parallelitätsgrade aufweisen. Es gibt Ähnlichkeiten zwischen der manuellen Ressourcenverwaltung auf diese Weise und CountDownLatch, sie haben den gleichen Zweck.

Das obige ist der detaillierte Inhalt vonSo lösen Sie die Fallstricke bei der Verwendung von ParallelStream. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen