例如下面的程式碼片段,讓人閱讀的時候就像是讀詩一樣。但是一旦用不好,也是會要命的。
List<Integer> transactionsIds = widgets.stream() .filter(b -> b.getColor() == RED) .sorted((x,y) -> x.getWeight() - y.getWeight()) .mapToInt(Widget::getWeight) .sum();
這段程式碼有一個關鍵的函數,那就是stream。利用它,我們可以將一個普通的列表轉換為流,進而利用管道的方式處理該列表。總之,用過的都說好。
對這些函數還不是太熟悉?可以參考:《到處是map、flatMap,啥意思?》
問題來了
假如我們把stream換成parallelStream,會發生什麼情況?
根據字面上的意思,流會從串列變成並行。
考慮到這是一個並行的情況,很明顯會存在線程安全性問題。然而,我們在此討論的並不是需要使用線程安全集合,因為這個主題太基礎了。在當今時代,學會在線程不安全的情況下使用線程安全的集合已成為一項基本技能。
這次踩坑的地方,是並行流的效能問題。
我們用程式碼來說話。
以下程式碼同時啟用了8個線程,所有線程都在使用並行流進行資料計算。在執行的邏輯中,我們讓每個任務都sleep 1秒鐘,這樣就能夠模擬一些I/O請求的耗時等待。
使用stream,程式會在30秒後返回,但我們期望程式能夠在1秒多返回,因為它是並行流,得對得起這個稱號。
測試發現,我們等了好久,任務才執行完畢。
static void paralleTest() { List<Integer> numbers = Arrays.asList( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ); final long begin = System.currentTimeMillis(); numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList()); } public static void main(String[] args) { // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); }
坑
實際上,在不同的機器上執行,這段程式碼花費的時間都不一樣。
既然是並行,那肯定得有個並行度。如果並行度太低,就無法發揮其能力;如果並行度太高,會浪費上下文切換的時間。我是很沮喪的發現,很多高級研發,將線程池的各種參數背的滾瓜爛熟,各種調優,竟然敢睜一隻眼閉一隻眼的在I/O密集型業務中用上parallelStream。
要了解這個並行度,我們需要查看具體的建構方法。在ForkJoinPool類別中找到這樣的程式碼。
try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { } if (fac == null) { if (System.getSecurityManager() == null) fac = defaultForkJoinWorkerThreadFactory; else // use security-managed default fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP;
可以看到,並行度到底是多少,是由下面的參數來控制的。如果無法取得這個參數,則預設使用 CPU個數-1 的並行度。
可以看到,這個函數是為了計算密集型業務去設計的。當你向它分配過多的任務時,它的並行執行會降級為類似於串行的效果。
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N設定了一個初始值大小,它依然有問題。
一旦設定,parallelism變數就被設為final,禁止修改。也就是說,上面的參數只會生效一次。
張三可能使用下面的程式碼,設定了平行度大小為20。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
李四可能用同樣的方式,設定了這個值為30。要確定專案中使用的是哪個值,需要詢問JVM如何載入類別資訊。
這種方式並不太非常可靠。
一種解決方式
我們可以透過提供外置的forkjoinpool,也就是改變提交方式,來實現不同類型的任務分離。
程式碼如下所示,透過明確的程式碼提交,即可實現任務分離。
ForkJoinPool pool = new ForkJoinPool(30); final long begin = System.currentTimeMillis(); try { pool.submit(() -> numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList())).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
這樣,不同的場景,就可以擁有不同的並行度。手動管理資源是在這種方式和CountDownLatch之間有相似之處,它們有異曲同工之妙。
以上是ParallelStream使用的坑洞怎麼解決的詳細內容。更多資訊請關注PHP中文網其他相關文章!