たとえば、次のコード スニペットを使用すると、人々は詩を読むように読むことができます。しかし、使い方を誤ると致命傷になる可能性があります。
List<Integer> transactionsIds = widgets.stream() .filter(b -> b.getColor() == RED) .sorted((x,y) -> x.getWeight() - y.getWeight()) .mapToInt(Widget::getWeight) .sum();
このコードには、ストリームという重要な機能があります。これを使用すると、通常のリストをストリームに変換し、パイプラインを使用してリストを処理できます。全体として、私が使用したものはすべて良いと言っています。
これらの関数にあまり詳しくありませんか?「Map と flatMap はどこにでもありますが、それらは何を意味しますか?」を参照してください。
ここで質問があります
If ストリームをParallelStreamに置き換えるとどうなりますか?
文字通りの意味によれば、ストリームはシリアルからパラレルに変わります。
これが並行状況であることを考慮すると、スレッドの安全性の問題が発生することは明らかです。ただし、このトピックは基本すぎるため、ここで議論しているのはスレッドセーフなコレクションを使用する必要性ではありません。今日では、スレッドセーフでない状況でスレッドセーフなコレクションを使用する方法を学ぶことは、必須のスキルとなっています。
今回の落とし穴は、並列ストリーミングのパフォーマンスの問題です。
コードに語らせます。
次のコードは、同時に 8 つのスレッドを有効にし、すべてのスレッドがデータ計算に並列ストリームを使用します。実行ロジックでは、各タスクを 1 秒間スリープさせて、時間のかかる一部の I/O リクエストの待機をシミュレートできるようにします。
ストリームを使用すると、プログラムは 30 秒後に戻りますが、これは並列ストリームであり、このタイトルにふさわしいため、プログラムは 1 秒以上で戻ることが予想されます。
テストの結果、タスクが完了するまでに長時間待機していることが判明しました。
static void paralleTest() { List<Integer> numbers = Arrays.asList( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ); final long begin = System.currentTimeMillis(); numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList()); } public static void main(String[] args) { // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); }
pit
実際、このコードは異なるマシンで実行すると、実行にかかる時間も異なります。
並列処理であるため、ある程度の並列処理が必要です。並列度が低すぎるとその能力が発揮されず、並列度が高すぎるとコンテキスト切り替え時間が無駄になります。私は、スレッド プールのさまざまなパラメータやさまざまなチューニングを暗記している多くの上級開発者が、敢えて見て見ぬふりをして、I/O 集中型のビジネスでParallelStream を使用していることに気づき、非常にイライラしました。
この程度の並列性を理解するには、具体的な構築方法を確認する必要があります。 ForkJoinPool クラスで次のようなコードを見つけます。
try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { } if (fac == null) { if (System.getSecurityManager() == null) fac = defaultForkJoinWorkerThreadFactory; else // use security-managed default fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP;
ご覧のとおり、並列度は次のパラメータによって制御されます。このパラメータが取得できない場合は、デフォルトで CPU 数 - 1 の並列処理が使用されます。
ご覧のとおり、この機能はコンピューティング集約型のビジネス向けに設計されています。割り当てたタスクが多すぎると、並列実行がシリアルのような効果に低下します。
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
-Djava.util.concurrent.ForkJoinPool.common.Parallelism=N を使用して初期サイズを設定した場合でも、依然として問題が発生します。
一度設定されると、並列処理変数は Final に設定され、変更が禁止されます。つまり、上記のパラメータは 1 回だけ有効になります。
Zhang San は次のコードを使用して、並列処理サイズを 20 に設定します。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Li Si も同様にこの値を 30 に設定した可能性があります。プロジェクトでどの値が使用されているかを判断するには、クラス情報をロードする方法を JVM に問い合わせる必要があります。
この方法はあまり信頼性がありません。
#解決策
外部の forkjoinpool を提供する、つまり送信方法を変更することで、さまざまなタイプのタスク分離を実現できます。 コードは次のとおりです。タスクの分離は、明示的にコードを送信することで実現できます。ForkJoinPool pool = new ForkJoinPool(30); final long begin = System.currentTimeMillis(); try { pool.submit(() -> numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList())).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }このように、シナリオごとに異なる並列度を持たせることができます。この方法による手動リソース管理と CountDownLatch には類似点があり、同じ目的を持っています。
以上がParallelStream 使用の落とし穴を解決する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。