다음 편집기에서는 Java 동시성프로그래밍_스레드 풀 사용 방법(자세한 설명)에 대한 기사를 제공합니다. 에디터가 꽤 좋다고 생각해서 지금 공유해서 참고용으로 올려보겠습니다. 에디터를 따라 살펴보겠습니다
1. 작업과 실행 전략의 암묵적 결합
Executor는 작업 제출과 작업 실행 전략을 분리할 수 있습니다
작업이 동일한 유형이고 있을 때만 가능합니다. 실행 시간의 차이가 거의 없어도 최대 성능을 얻을 수 있습니다. 그렇지 않으면 오래 걸리는 작업과 짧은 작업이 동일한 스레드 풀에 배치되면 스레드 풀이 매우 크지 않으면 교착 상태와 같은 문제가 발생합니다
1 .Thread starvation 교착 상태
는 다음과 유사합니다. 두 작업을 단일 스레드 풀에 제출하고 두 작업이 서로 의존하며 한 작업이 다른 작업을 기다리면 성능이 저하됩니다. 풀이 충분하지 않다는 것
정의: 작업은 풀에 있는 다른 작업의 실행 결과를 기다려야 하며 기아 교착 상태가 발생할 수 있습니다
2. 스레드 풀 크기
참고: 크기 스레드 풀에는 다른 리소스 풀과 같은 다른 제한 사항도 적용됩니다: 데이터베이스 연결 풀
각 작업이 연결인 경우 스레드 풀의 크기는 데이터베이스 연결 풀의 크기에 따라 제한됩니다
3. ThreadPoolExecutor 스레드 풀을 구성합니다
인스턴스:
1. Executor의 팩토리 메서드를 통해 일부 기본 구현을 반환합니다.
2 ThreadPoolExecutor(....)를 인스턴스화하여 구현을 사용자 정의합니다. 작업이 도착하고 스레드 풀이 가득 차면 작업은 대기열에서 대기합니다. 작업이 무한히 도달하면 대기열은 무한히 확장됩니다
예: 이는 싱글톤 및 고정 크기 스레드 풀에 사용되는 것입니다2 .바운드 큐:
새 작업이 도착하고 큐가 가득 찬 경우 포화 전략
을 사용합니다. 3. 동기식 핸드오버:
스레드 풀이 큰 경우 넣기 후 핸드오버가 지연됩니다. 작업 생성자가 곧 작업을 대기열에 추가하게 되면 SynchronousQueue가 작업을 작업자 스레드에 직접 전달합니다메커니즘: 작업을 넣으면 수락 대기 중인 스레드가 있어야 합니다. 그렇지 않다면
New스레드, 스레드가 포화 상태이면 작업을 거부합니다예: CacheThreadPool이 사용된 전략입니다
setRejectedExecutionHan
dl1. 중단(기본값):예외 발생발신자에 의해 처리됨
2. DiscardOldest: 가장 오래된 작업을 삭제합니다. priority대기열은 가장 높은 작업
4.CallerRuns: 롤백 작업, 호출자 스레드가 자체적으로 처리합니다
4 Thread Factory ThreadFactoy
언제든지. 스레드가 생성됩니다: 실제로는 스레드 팩토리가
완료하는 데 사용됩니다. 사용자 정의 스레드 팩토리: implements ThreadFactory
스레드 팩토리의동작을 사용자 정의할 수 있습니다. UncaughtException과 같은 Handler 등
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
5. ThreadPoolExecutor 확장
사용자 정의 하위 클래스로 재정의할 수 있는 메서드:
1.afterExecute: 종료 후 RuntimeException이 발생하면 메서드가 실행되지 않습니다. 2.beforeExecute: 시작하기 전에 RuntimeException이 발생하면 작업이 실행되지 않습니다.3.terminating: 스레드 풀이 닫히면 리소스 해제 등에 사용할 수 있습니다.
2. recursive 알고리즘
의 병렬화
1.循环
在循环中,每次循环操作都是独立的
//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
2.迭代
如果每个迭代操作是彼此独立的,则可以串行执行
如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的
//串行 计算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 计算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //调用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
实例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
위 내용은 Java 동시 프로그래밍에서 스레드 풀을 사용하는 방법에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!