ホームページ  >  記事  >  運用・保守  >  ParallelStream 使用の落とし穴を解決する方法

ParallelStream 使用の落とし穴を解決する方法

PHPz
PHPz転載
2023-05-21 21:08:211825ブラウズ

たとえば、次のコード スニペットを使用すると、人々は詩を読むように読むことができます。しかし、使い方を誤ると致命傷になる可能性があります。

List<Integer> transactionsIds = widgets.stream()              .filter(b -> b.getColor() == RED)              .sorted((x,y) -> x.getWeight() - y.getWeight())              .mapToInt(Widget::getWeight)              .sum();

このコードには、ストリームという重要な機能があります。これを使用すると、通常のリストをストリームに変換し、パイプラインを使用してリストを処理できます。全体として、私が使用したものはすべて良いと言っています。

これらの関数にあまり詳しくありませんか?「Map と flatMap はどこにでもありますが、それらは何を意味しますか?」を参照してください。

ここで質問があります

If ストリームをParallelStreamに置き換えるとどうなりますか?

文字通りの意味によれば、ストリームはシリアルからパラレルに変わります。

これが並行状況であることを考慮すると、スレッドの安全性の問題が発生することは明らかです。ただし、このトピックは基本すぎるため、ここで議論しているのはスレッドセーフなコレクションを使用する必要性ではありません。今日では、スレッドセーフでない状況でスレッドセーフなコレクションを使用する方法を学ぶことは、必須のスキルとなっています。

今回の落とし穴は、並列ストリーミングのパフォーマンスの問題です。

コードに語らせます。

次のコードは、同時に 8 つのスレッドを有効にし、すべてのスレッドがデータ計算に並列ストリームを使用します。実行ロジックでは、各タスクを 1 秒間スリープさせて、時間のかかる一部の I/O リクエストの待機をシミュレートできるようにします。

ストリームを使用すると、プログラムは 30 秒後に戻りますが、これは並列ストリームであり、このタイトルにふさわしいため、プログラムは 1 秒以上で戻ることが予想されます。

テストの結果、タスクが完了するまでに長時間待機していることが判明しました。

static void paralleTest() {     List<Integer> numbers = Arrays.asList(             0, 1, 2, 3, 4, 5, 6, 7, 8, 9,             10, 11, 12, 13, 14, 15, 16, 17, 18, 19,             20, 21, 22, 23, 24, 25, 26, 27, 28, 29     );     final long begin = System.currentTimeMillis();     numbers.parallelStream().map(k -> {         try {             Thread.sleep(1000);             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());         } catch (InterruptedException e) {             e.printStackTrace();         }         return k;     }).collect(Collectors.toList()); }  public static void main(String[] args) { //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start(); }

pit

実際、このコードは異なるマシンで実行すると、実行にかかる時間も異なります。

並列処理であるため、ある程度の並列処理が必要です。並列度が低すぎるとその能力が発揮されず、並列度が高すぎるとコンテキスト切り替え時間が無駄になります。私は、スレッド プールのさまざまなパラメータやさまざまなチューニングを暗記している多くの上級開発者が、敢えて見て見ぬふりをして、I/O 集中型のビジネスでParallelStream を使用していることに気づき、非常にイライラしました。

この程度の並列性を理解するには、具体的な構築方法を確認する必要があります。 ForkJoinPool クラスで次のようなコードを見つけます。

try {  // ignore exceptions in accessing/parsing properties     String pp = System.getProperty         ("java.util.concurrent.ForkJoinPool.common.parallelism");     if (pp != null)         parallelism = Integer.parseInt(pp);     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.threadFactory");     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { }  if (fac == null) {     if (System.getSecurityManager() == null)         fac = defaultForkJoinWorkerThreadFactory;     else // use security-managed default         fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;

ご覧のとおり、並列度は次のパラメータによって制御されます。このパラメータが取得できない場合は、デフォルトで CPU 数 - 1 の並列処理が使用されます。

ご覧のとおり、この機能はコンピューティング集約型のビジネス向けに設計されています。割り当てたタスクが多すぎると、並列実行がシリアルのような効果に低下します。

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

-Djava.util.concurrent.ForkJoinPool.common.Parallelism=N を使用して初期サイズを設定した場合でも、依然として問題が発生します。

一度設定されると、並列処理変数は Final に設定され、変更が禁止されます。つまり、上記のパラメータは 1 回だけ有効になります。

Zhang San は次のコードを使用して、並列処理サイズを 20 に設定します。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Li Si も同様にこの値を 30 に設定した可能性があります。プロジェクトでどの値が使用されているかを判断するには、クラス情報をロードする方法を JVM に問い合わせる必要があります。

この方法はあまり信頼性がありません。

#解決策

外部の forkjoinpool を提供する、つまり送信方法を変更することで、さまざまなタイプのタスク分離を実現できます。

コードは次のとおりです。タスクの分離は、明示的にコードを送信することで実現できます。

ForkJoinPool pool = new ForkJoinPool(30);  final long begin = System.currentTimeMillis(); try {     pool.submit(() ->             numbers.parallelStream().map(k -> {                 try {                     Thread.sleep(1000);                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 return k;             }).collect(Collectors.toList())).get(); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }
このように、シナリオごとに異なる並列度を持たせることができます。この方法による手動リソース管理と CountDownLatch には類似点があり、同じ目的を持っています。

以上がParallelStream 使用の落とし穴を解決する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。