The Executor framework decouples task submission from the task execution policy.
A thread pool delivers its best performance only when the tasks are of the same type and independent of each other. If long-running and short-running tasks share one pool, the long tasks can clog it and delay the short ones unless the pool is very large; if tasks depend on other tasks in the same pool, deadlock and similar problems can occur unless the pool is large enough.
For example: submit two tasks to a single-threaded pool, where one depends on the other. If the running task waits for the queued task, a deadlock occurs; the symptom is that the pool does not have enough threads.
Definition: whenever a task must wait for the result of another task in the same pool, thread starvation deadlock may occur.
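The starvation deadlock described above can be reproduced in a few lines. The following is a minimal sketch, not code from the original article: the class name StarvationDeadlockDemo and the 500 ms timeout are assumptions chosen for illustration. An outer task submits an inner task to the same pool and waits for it; with one thread the outer task never finishes, with two it does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StarvationDeadlockDemo {

    /**
     * Runs an outer task that submits an inner task to the same pool and waits for it.
     * Returns true if the outer task finished, false if it was still stuck after the timeout.
     */
    static boolean outerTaskCompletes(int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = pool.submit(() -> {
                // The inner task needs a free worker thread in the same pool.
                Future<String> inner = pool.submit(() -> "inner result");
                return "outer got: " + inner.get(); // blocks until the inner task has run
            });
            try {
                outer.get(500, TimeUnit.MILLISECONDS); // illustrative timeout
                return true;
            } catch (TimeoutException e) {
                return false; // the outer task is waiting for a task that can never start
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        } finally {
            pool.shutdownNow(); // interrupt the stuck worker so the JVM can exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool of 1 completes: " + outerTaskCompletes(1));
        System.out.println("pool of 2 completes: " + outerTaskCompletes(2));
    }
}
```

The timeout only exists so the demo can report the hang; in real code the usual fix is to avoid waiting on tasks in the same bounded pool, or to size the pool for the dependency.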
Note: the size of the thread pool is also constrained by other resources, such as other resource pools, for example a database connection pool.
If each task needs a connection, then the effective size of the thread pool is limited by the size of the database connection pool.
Creating a thread pool:
1. Use one of the default implementations returned by the factory methods of Executors
2. Customize it by instantiating ThreadPoolExecutor(.....)
The thread pool's queue:
1. Unbounded queue: when a task arrives and all pool threads are busy, the task waits in the queue. If tasks keep arriving faster than they are processed, the queue grows without bound
For example: the single-threaded and fixed-size thread pools (newSingleThreadExecutor, newFixedThreadPool) use this
2. Bounded queue: if a new task arrives and the queue is full, the saturation strategy is applied
3. Synchronous handoff: if the thread pool is very large, putting a task into a queue and then handing it over adds delay, and a very fast task producer still causes tasks to queue up. A SynchronousQueue hands the task directly to a worker thread
Mechanism: when a task is submitted, a thread must already be waiting to accept it. If none is waiting and the pool is below its maximum size, a new thread is added; if the pool is already at its maximum, the task is rejected
For example: newCachedThreadPool uses this strategy
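The three queueing strategies above map onto three kinds of BlockingQueue passed to the ThreadPoolExecutor constructor. The sketch below is only an illustration: the class QueueChoices, its method names, and the 60-second keep-alive are assumptions, not part of the original article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoices {

    // Unbounded queue: a fixed number of threads; extra tasks wait in a queue with no limit.
    static ThreadPoolExecutor unbounded(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    // Bounded queue: at most 'capacity' tasks wait; beyond that the saturation strategy applies.
    static ThreadPoolExecutor bounded(int threads, int capacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(capacity));
    }

    // Synchronous handoff: no task is ever stored; each task goes straight to a waiting
    // or newly created thread, and is rejected once maxThreads are all busy.
    static ThreadPoolExecutor handoff(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }
}
```

A SynchronousQueue has no storage at all, which is why it only makes sense with a large or unbounded maximum pool size, or when rejecting tasks is acceptable.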
Saturation strategy:
Use setRejectedExecutionHandler to modify the saturation strategy
1. AbortPolicy (default): throws a RejectedExecutionException, which the caller must handle
2. DiscardPolicy: silently discards the newly submitted task
3. DiscardOldestPolicy: discards the oldest task (the one that would be executed next) and retries the new one. Note: with a priority queue, this discards the highest-priority task
4. CallerRunsPolicy: pushes the task back to the caller, which runs it in its own thread
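To make the caller-runs behavior concrete, here is a small sketch under stated assumptions: a pool with one thread and a queue of capacity one, both deliberately tiny so that the third submission saturates it. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns the name of the thread that ran the task submitted while the pool was saturated. */
    static String threadThatRanOverflowTask() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the queue (capacity 1)
            // The pool is now saturated, so the policy runs this task in the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work for a while, which throttles the producer without dropping tasks.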
Thread factory (ThreadFactory):
Whenever the thread pool creates a thread, it does so by calling the thread factory.
Custom thread factory: implement ThreadFactory. This lets you customize the threads the factory creates, for example by setting an UncaughtExceptionHandler.
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class MyAppThread extends Thread {
        public static final String DEFAULT_NAME = "MyAppThread";
        private static volatile boolean debugLifecycle = false;
        private static final AtomicInteger created = new AtomicInteger();
        private static final AtomicInteger alive = new AtomicInteger();
        private static final Logger log = Logger.getAnonymousLogger();

        public MyAppThread(Runnable r) {
            this(r, DEFAULT_NAME);
        }

        public MyAppThread(Runnable runnable, String name) {
            super(runnable, name + "-" + created.incrementAndGet());
            // Define how threads created by this thread factory handle uncaught exceptions
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
                }
            });
        }

        public void run() {
            // Copy debug flag to ensure consistent value throughout.
            boolean debug = debugLifecycle;
            if (debug) log.log(Level.FINE, "Created " + getName());
            try {
                alive.incrementAndGet();
                super.run();
            } finally {
                alive.decrementAndGet();
                if (debug) log.log(Level.FINE, "Exiting " + getName());
            }
        }

        public static int getThreadsCreated() { return created.get(); }

        public static int getThreadsAlive() { return alive.get(); }

        public static boolean getDebug() { return debugLifecycle; }

        public static void setDebug(boolean b) { debugLifecycle = b; }
    }
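The thread class above still needs a factory to hand it to the pool. The article does not show one, so the following is a self-contained sketch under assumptions: NamedThreadFactory and nameOfWorker are hypothetical names, and a plain Thread stands in for MyAppThread so the example compiles on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        // The pool calls this method every time it needs a new worker thread.
        Thread t = new Thread(r, poolName + "-" + counter.incrementAndGet());
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("UNCAUGHT in thread " + thread.getName() + ": " + e));
        return t;
    }

    /** Usage: plug the factory into a pool and report which thread ran the task. */
    static String nameOfWorker() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, new NamedThreadFactory("worker"));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the uncaught-exception handler only fires for tasks started with execute; a task passed to submit has its exception captured in the returned Future instead.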
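Extending ThreadPoolExecutor through its hook methods:
1. afterExecute: runs after the task has finished; if beforeExecute throws a RuntimeException, this method will not be executed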
2. beforeExecute: runs before the task starts; if it throws a RuntimeException, the task will not be executed
3. terminated: called when the thread pool has finished shutting down; it can be used to release resources, etc.
1. Loops
If the iterations of a loop are independent of each other, each one can be submitted to an Executor and run in parallel
    // Sequential
    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }

    // Parallel
    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }
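processInParallel returns as soon as the tasks have been submitted, not when they have finished. If the caller needs the results, one option is ExecutorService.invokeAll, which blocks until every task has completed. The sketch below is an illustration only: ParallelLoop and the squaring process method are hypothetical stand-ins for the article's Element and process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class ParallelLoop {

    // Hypothetical stand-in for process(e): squares its input.
    static int process(int e) {
        return e * e;
    }

    /** Runs process on every element in parallel and waits for all results. */
    static List<Integer> processInParallelAndWait(ExecutorService exec, List<Integer> elements)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (final Integer e : elements) {
            tasks.add(() -> process(e));
        }
        List<Integer> results = new ArrayList<>();
        // invokeAll returns the futures in the same order as the task list.
        for (Future<Integer> f : exec.invokeAll(tasks)) {
            results.add(f.get());
        }
        return results;
    }
}
```

Called with a four-thread pool and the list 1, 2, 3, 4, this returns 1, 4, 9, 16 in input order, regardless of which task finished first.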
2. Iteration
If the iterations are independent of each other, they can be executed in parallel. For example: a depth-first search algorithm. Note: the recursion (the traversal itself) is still sequential; only the computation for each node runs in parallel
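    import java.util.Collection;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Node<T> is not defined in this article; it is assumed to offer compute() and getChildren()

    // Sequential computation and sequential traversal
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    // Parallel computation and sequential traversal
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

    // Calling the parallel method and waiting for all results
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }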
Example:
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    // Puzzle, PuzzleNode and ValueLatch are not shown in this article
    public class ConcurrentPuzzleSolver <P, M> {
        private final Puzzle<P, M> puzzle;
        private final ExecutorService exec;
        private final ConcurrentMap<P, Boolean> seen;
        protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

        public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
            this.puzzle = puzzle;
            this.exec = initThreadPool();
            this.seen = new ConcurrentHashMap<P, Boolean>();
            if (exec instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
                tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            }
        }

        private ExecutorService initThreadPool() {
            return Executors.newCachedThreadPool();
        }

        public List<M> solve() throws InterruptedException {
            try {
                P p = puzzle.initialPosition();
                exec.execute(newTask(p, null, null));
                // Block until the latch inside ValueLatch opens, which means a solution has been found
                PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
                return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
            } finally {
                exec.shutdown(); // finally, the main thread shuts down the thread pool
            }
        }

        protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
            return new SolverTask(p, m, n);
        }

        protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
            SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
                super(pos, move, prev);
            }

            public void run() {
                // If another thread has already found a solution, return; this is detected
                // through isSet on ValueLatch, which is implemented with a CountDownLatch.
                // To avoid an endless loop, positions already visited are recorded in the
                // seen map so that they are not scanned again.
                if (solution.isSet() || seen.putIfAbsent(pos, true) != null) {
                    return; // already solved or seen this position
                }
                if (puzzle.isGoal(pos)) {
                    solution.setValue(this);
                } else {
                    for (M m : puzzle.legalMoves(pos))
                        exec.execute(newTask(puzzle.move(pos, m), m, this));
                }
            }
        }
    }
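The solver relies on a ValueLatch class that the article does not show. Its comments say that isSet is implemented with a CountDownLatch, so the following is a minimal sketch under that assumption of what such a class could look like; it is not necessarily the author's implementation.

```java
import java.util.concurrent.CountDownLatch;

/** A result holder that can be set once; readers block until a value is available. */
class ValueLatch<T> {
    private T value = null; // guarded by "this"
    private final CountDownLatch done = new CountDownLatch(1);

    public boolean isSet() {
        return done.getCount() == 0;
    }

    public synchronized void setValue(T newValue) {
        // Only the first caller wins; later values are ignored.
        if (!isSet()) {
            value = newValue;
            done.countDown();
        }
    }

    public T getValue() throws InterruptedException {
        done.await(); // blocks until setValue has been called once
        synchronized (this) {
            return value;
        }
    }
}
```

With a latch like this, getValue blocks until some task calls setValue, so a solver built on it would wait forever when the puzzle has no solution unless that case is handled separately.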