Maison  >  Article  >  Opération et maintenance  >  Comment résoudre les pièges liés à l'utilisation de ParallelStream

Comment résoudre les pièges liés à l'utilisation de ParallelStream

PHPz
PHPzavant
2023-05-21 21:08:211824parcourir

Par exemple, l'extrait de code ci-dessous incite les gens à le lire comme on lit de la poésie. Mais si elle est mal utilisée, elle peut être fatale.

List<Integer> transactionsIds = widgets.stream()              .filter(b -> b.getColor() == RED)              .sorted((x,y) -> x.getWeight() - y.getWeight())              .mapToInt(Widget::getWeight)              .sum();

Ce code a une fonction clé, qui est le flux. En l'utilisant, nous pouvons convertir une liste ordinaire en flux, puis utiliser des pipelines pour traiter la liste. Dans l’ensemble, tout ce que j’ai utilisé dit que c’est bon.

Vous ne connaissez pas trop ces fonctions ? Vous pouvez vous référer à : "Map et flatMap sont partout, que signifient-ils ?"

La question arrive

Que se passera-t-il si on remplace stream par parallelStream ?

Selon au sens littéral Signification, le flux passera de série à parallèle.

Considérant qu'il s'agit d'une situation parallèle, il est évident qu'il y aura des problèmes de sécurité des threads. Cependant, ce dont nous discutons ici n’est pas la nécessité d’utiliser des collections thread-safe, car ce sujet est trop basique. De nos jours, apprendre à utiliser des collections thread-safe dans des situations thread-safe est devenu une compétence essentielle.

Le piège cette fois-ci est le problème de performances du streaming parallèle.

Nous laissons le code parler.

Le code suivant active 8 threads en même temps, et tous les threads utilisent des flux parallèles pour le calcul des données. Dans la logique d'exécution, nous laissons chaque tâche dormir pendant 1 seconde, afin de pouvoir simuler l'attente chronophage de certaines requêtes d'E/S.

En utilisant le flux, le programme reviendra après 30 secondes, mais nous nous attendons à ce que le programme revienne dans plus d'une seconde, car il s'agit d'un flux parallèle et mérite ce titre.

Le test a révélé que nous avons attendu longtemps avant que la tâche ne soit terminée.

static void paralleTest() {     List<Integer> numbers = Arrays.asList(             0, 1, 2, 3, 4, 5, 6, 7, 8, 9,             10, 11, 12, 13, 14, 15, 16, 17, 18, 19,             20, 21, 22, 23, 24, 25, 26, 27, 28, 29     );     final long begin = System.currentTimeMillis();     numbers.parallelStream().map(k -> {         try {             Thread.sleep(1000);             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());         } catch (InterruptedException e) {             e.printStackTrace();         }         return k;     }).collect(Collectors.toList()); }  public static void main(String[] args) { //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start(); }

pit

En fait, ce code prend des temps différents lorsqu'il est exécuté sur différentes machines.

Puisqu'il est parallèle, il doit y avoir un certain degré de parallélisme. Si le degré de parallélisme est trop faible, ses capacités ne seront pas exercées ; si le degré de parallélisme est trop élevé, le temps de changement de contexte sera perdu. J'ai été très frustré de constater que de nombreux développeurs seniors, qui connaissent par cœur les différents paramètres du pool de threads et toutes sortes de réglages, osent fermer les yeux et utiliser parallelStream dans des activités à forte intensité d'E/S.

Pour comprendre ce degré de parallélisme, nous devons examiner la méthode de construction spécifique. Recherchez du code comme celui-ci dans la classe ForkJoinPool.

try {  // ignore exceptions in accessing/parsing properties     String pp = System.getProperty         ("java.util.concurrent.ForkJoinPool.common.parallelism");     if (pp != null)         parallelism = Integer.parseInt(pp);     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.threadFactory");     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { }  if (fac == null) {     if (System.getSecurityManager() == null)         fac = defaultForkJoinWorkerThreadFactory;     else // use security-managed default         fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;

Vous pouvez voir que le degré de parallélisme est contrôlé par les paramètres suivants. Si ce paramètre ne peut être obtenu, le parallélisme du nombre de CPU - 1 sera utilisé par défaut.

Vous pouvez constater que cette fonction est conçue pour les entreprises gourmandes en informatique. Lorsque vous lui attribuez trop de tâches, son exécution parallèle se dégrade en un effet de type série.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

Même si vous définissez une taille initiale en utilisant -Djava.util.concurrent.ForkJoinPool.common.parallelism=N, il y a toujours des problèmes.

Une fois définie, la variable de parallélisme est définie sur finale et la modification est interdite. Autrement dit, les paramètres ci-dessus ne prendront effet qu’une seule fois.

Zhang San peut utiliser le code suivant et définir la taille du parallélisme sur 20.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

John Doe a peut-être fixé cette valeur à 30 de la même manière. Pour déterminer quelle valeur est utilisée dans votre projet, vous devez demander à la JVM comment charger les informations de classe.

Cette méthode n'est pas très fiable.

Une solution

Nous pouvons réaliser différents types de séparation des tâches en fournissant un forkjoinpool externe, c'est-à-dire en changeant la méthode de soumission.

Le code est le suivant. Grâce à la soumission explicite du code, la séparation des tâches peut être obtenue.

ForkJoinPool pool = new ForkJoinPool(30);  final long begin = System.currentTimeMillis(); try {     pool.submit(() ->             numbers.parallelStream().map(k -> {                 try {                     Thread.sleep(1000);                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 return k;             }).collect(Collectors.toList())).get(); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }

De cette façon, différents scénarios peuvent avoir différents degrés de parallélisme. Il existe des similitudes entre la gestion manuelle des ressources de cette manière et CountDownLatch, ils ont le même objectif.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer