예를 들어 아래 코드 조각은 사람들이 시를 읽는 것처럼 읽게 만듭니다. 그러나 제대로 사용하지 않으면 치명적일 수 있습니다.
List<Integer> transactionsIds = widgets.stream() .filter(b -> b.getColor() == RED) .sorted((x,y) -> x.getWeight() - y.getWeight()) .mapToInt(Widget::getWeight) .sum();
이 코드에는 스트림이라는 핵심 기능이 있습니다. 이를 사용하면 일반 목록을 스트림으로 변환한 다음 파이프라인을 사용하여 목록을 처리할 수 있습니다. 대체로 내가 사용해 본 모든 것이 좋다고 말합니다.
이러한 기능이 너무 익숙하지 않으신가요? 다음을 참조하세요. "Map과 flatMap은 어디에나 있는데, 그게 무슨 뜻인가요?"
질문이 다가오고 있습니다
stream을 ParallelStream으로 바꾸면 어떻게 될까요?
에 따르면 문자 그대로의 의미는 스트림이 직렬에서 병렬로 변경된다는 의미입니다.
병렬 상황이라는 점을 고려하면 스레드 안전성 문제가 있을 것이 분명합니다. 그러나 여기서 논의하는 내용은 스레드로부터 안전한 컬렉션을 사용할 필요가 없다는 것입니다. 이 주제는 너무 기본적이기 때문입니다. 오늘날에는 스레드가 안전하지 않은 상황에서 스레드로부터 안전한 컬렉션을 사용하는 방법을 배우는 것이 필수적인 기술이 되었습니다.
이번 함정은 병렬 스트리밍의 성능 문제입니다.
우리는 코드가 말하도록 합니다.
다음 코드는 동시에 8개의 스레드를 활성화하며 모든 스레드는 데이터 계산을 위해 병렬 스트림을 사용합니다. 실행 로직에서는 각 작업을 1초 동안 휴면 상태로 두어 일부 I/O 요청의 시간 소모적인 대기를 시뮬레이션할 수 있습니다.
스트림을 사용하면 프로그램이 30초 후에 반환되지만 프로그램은 병렬 스트림이고 이 제목에 걸맞기 때문에 1초 이상 후에 반환될 것으로 예상됩니다.
테스트 결과 작업이 완료되기까지 오랜 시간 기다린 것으로 나타났습니다.
static void paralleTest() { List<Integer> numbers = Arrays.asList( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ); final long begin = System.currentTimeMillis(); numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList()); } public static void main(String[] args) { // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); }
pit
사실 이 코드는 다른 컴퓨터에서 실행될 때 다른 시간이 걸립니다.
병렬이기 때문에 어느 정도의 병렬성이 있어야 합니다. 병렬도가 너무 낮으면 성능을 발휘할 수 없고, 병렬도가 너무 높으면 컨텍스트 전환 시간이 낭비됩니다. 저는 스레드 풀의 다양한 매개변수와 모든 종류의 튜닝을 외우고 있는 많은 시니어 개발자들이 감히 눈을 감고 I/O 집약적인 비즈니스에서 ParallelStream을 사용한다는 사실을 알고 매우 좌절했습니다.
이 정도의 병렬성을 이해하려면 구체적인 구축 방법을 살펴봐야 합니다. ForkJoinPool 클래스에서 이와 같은 코드를 찾으세요.
try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { } if (fac == null) { if (System.getSecurityManager() == null) fac = defaultForkJoinWorkerThreadFactory; else // use security-managed default fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP;
다음 매개변수에 의해 병렬도가 제어되는 것을 볼 수 있습니다. 이 매개변수를 얻을 수 없는 경우 기본적으로 CPU 수 - 1의 병렬 처리가 사용됩니다.
이 기능은 컴퓨팅 집약적인 비즈니스를 위해 설계되었음을 알 수 있습니다. 너무 많은 작업을 할당하면 병렬 실행이 직렬과 유사한 효과로 저하됩니다.
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N을 사용하여 초기 크기를 설정하더라도 여전히 문제가 있습니다.
한번 설정되면 병렬성 변수는 final로 설정되며 수정이 금지됩니다. 즉, 위의 매개변수는 한 번만 적용됩니다.
Zhang San은 다음 코드를 사용하여 병렬 처리 크기를 20으로 설정할 수 있습니다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
John Doe도 같은 방식으로 이 값을 30으로 설정했을 수 있습니다. 프로젝트에 어떤 값이 사용되는지 확인하려면 JVM에 클래스 정보를 로드하는 방법을 문의해야 합니다.
이 방법은 그다지 신뢰할 수 없습니다.
솔루션
외부 포크조인 풀 제공, 즉 제출 방법 변경을 통해 다양한 유형의 작업 분리를 달성할 수 있습니다.
코드는 명시적 코드 제출을 통해 작업 분리가 가능합니다.
ForkJoinPool pool = new ForkJoinPool(30); final long begin = System.currentTimeMillis(); try { pool.submit(() -> numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList())).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
이런 식으로 시나리오마다 병렬 처리 수준이 다를 수 있습니다. 이러한 방식의 수동 리소스 관리와 CountDownLatch 간에는 유사점이 있으며 동일한 목적을 가지고 있습니다.
위 내용은 ParallelStream 사용의 함정을 해결하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!