실행자는 작업 제출과 작업 실행 전략을 분리할 수 있습니다.
동일한 유형이고 실행 시간의 차이가 거의 없는 작업만 최대 성능을 얻을 수 있습니다. -스레드 풀에서 작업을 소비하고 시간이 짧은 작업을 수행하는 경우 스레드 풀이 매우 크지 않으면 교착 상태 및 기타 문제가 발생합니다
다음과 유사합니다. 단일 스레드 풀이 주어지면, 두 작업은 서로 종속되어 있습니다. 한 작업이 다른 작업을 기다리면 교착 상태가 발생합니다.
정의: 작업은 풀에 있는 다른 작업의 실행 결과를 기다려야 합니다. 기아 상태가 발생합니다. 교착 상태가 발생할 수 있습니다
참고: 스레드 풀의 크기에는 다른 리소스 풀과 같은 다른 제한 사항도 적용됩니다. 데이터베이스 연결 풀
각 작업이 연결인 경우 thread 풀의 크기는 데이터베이스 연결 풀의 크기에 따라 달라집니다
Instance:
1. Executors의 팩토리 메소드를 통해 일부 기본 구현을 반환합니다.
2. By ThreadPoolExecutor 인스턴스화 (.. ...)
스레드 풀의 대기열
사용자 정의 구현 1. 무제한 대기열: 작업이 도착하고 스레드 풀이 가득 차면 작업은 대기열에서 대기합니다. 무한히, 큐는 무한히 확장됩니다
예: 싱글톤 및 고정 크기 스레드 풀이 사용하는 것입니다
2. 제한된 큐: 새 작업이 도착하고 큐가 가득 차면 포화 전략
을 사용합니다.3. 동기식 핸드오버: 스레드 풀이 큰 경우 작업을 대기열에 넣은 후 핸드오버가 지연됩니다. 작업 생산자가 작업을 빠르게 대기열에 추가하면
SynchronousQueue가 직접 작업을 작업자 스레드에 넘겨주세요
메커니즘: 작업을 넣습니다. 수락 대기 중인 스레드가 있어야 합니다. 그렇지 않은 경우 추가스레드, 스레드가 포화되면 작업을 거부합니다
예: CacheThreadPool은 사용된 전략입니다
포화 전략:
setRejectedExecutionHandler로 포화 전략을 수정
1. 종료 Abort(기본값): 예외 발생 처리자 호출자
2. 폐기 Discard
3. 폐기 DiscardOldest: 폐기 가장 오래된 작업, 참고: priority인 경우 대기열은 우선 순위가 가장 높은 작업
4.CallerRuns을 포기합니다. : 작업을 롤백하면 호출자 스레드가 자체적으로 작업을 처리합니다
스레드가 생성될 때마다 실제로 스레드 팩토리를 호출하여 완료합니다.
사용자 정의 스레드 팩토리: ThreadFactory를 구현합니다.
사용자 정의할 수 있습니다. 스레드 팩토리의 동작: Uncaught Exception Handler 등
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
time
Exception이 발생하면 메소드가 실행되지 않음2. before
Execute: 시작 전 RuntimeException이 발생하면 작업이 실행되지 않음3.terminating : 스레드 풀이 닫히면 리소스 해제 등에 사용할 수 있습니다.
2.recursive
알고리즘//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
2. Iteration
각 반복 작업이 서로 독립적인 경우 직렬 실행이 가능합니다
예: 깊이 우선search
알고리즘; 여전히 직렬이지만 각 노드의 계산은 병렬입니다//串行 计算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 计算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //调用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
实例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
위 내용은 Java 스레드 풀 사용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!