This article gives a detailed explanation of how to use thread pools in Java concurrent programming.
1. The implicit coupling between tasks and execution strategies
The Executor framework decouples the submission of a task from the strategy used to execute it.
A thread pool performs best only when its tasks are of the same type and take roughly the same time to run. Otherwise, for example when long-running and short-running tasks share one pool, the pool can clog, and tasks that depend on each other can deadlock, unless the pool is very large.
1. Thread starvation deadlock
Example: two tasks are submitted to a single-threaded pool, and one of them depends on the other. If the running task waits for the other task, a deadlock occurs, because the waiting task holds the only worker thread and the other task never gets to run. The symptom is a pool with too few threads.
Definition: whenever a task must wait for the result of another task running in the same pool, thread starvation deadlock may occur.
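The single-thread case described above can be reproduced with a small sketch. The class name StarvationDeadlockDemo and the 200 ms timeout are choices made for this illustration only. The timeout exists so that the demo terminates; without it, the outer task would wait forever.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StarvationDeadlockDemo {

    // Returns true if the inner task could not run while the outer task waited for it.
    static boolean innerTaskStarved() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = exec.submit(() -> {
                // The outer task occupies the only worker thread...
                Future<String> inner = exec.submit(() -> "inner result");
                try {
                    // ...so the inner task stays in the queue.
                    // Without the timeout this get() would block forever.
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException starved) {
                    return true;
                }
            });
            return outer.get();
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inner task starved: " + innerTaskStarved()); // prints true
    }
}
```

Giving the pool a second thread, or not waiting on tasks submitted to the same pool, avoids the problem in this sketch.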
2. Thread pool size
Note: the size of the thread pool is also constrained by other resource pools, such as a database connection pool.
If each task needs a connection, the effective size of the thread pool is limited by the size of the database connection pool.
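As a rough sketch of that constraint, the pool size can be capped at the number of available connections. The helper cappedPoolSize, the starting point of twice the CPU count, and the connection pool size of 10 are all assumptions made for illustration, not recommendations from the article.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSizing {

    // Hypothetical helper: never start more workers than there are connections to hand out.
    static int cappedPoolSize(int desiredThreads, int connectionPoolSize) {
        if (desiredThreads < 1 || connectionPoolSize < 1) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return Math.min(desiredThreads, connectionPoolSize);
    }

    public static void main(String[] args) {
        int desired = 2 * Runtime.getRuntime().availableProcessors(); // assumed starting point
        int connections = 10;                                         // assumed connection pool size
        int size = cappedPoolSize(desired, connections);
        ExecutorService exec = Executors.newFixedThreadPool(size);
        System.out.println("thread pool size: " + size);
        exec.shutdown();
    }
}
```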
3. Configure the ThreadPoolExecutor thread pool
Ways to create a pool:
1. Use one of the factory methods of Executors, which return some default implementations
2. Customize the pool by instantiating ThreadPoolExecutor(.....)
The thread pool's queue:
1. Unbounded queue: when a task arrives and all pool threads are busy, the task waits in the queue. If tasks keep arriving faster than they are processed, the queue grows without bound
For example: the single-thread and fixed-size thread pools use this
2. Bounded queue: if a new task arrives and the queue is full, the saturation strategy is applied
3. Synchronous handoff: if the thread pool is large, putting a task into a queue before handing it over adds latency, and if the producer submits tasks quickly they end up queuing anyway
SynchronousQueue hands the task over directly to a worker thread
Mechanism: to put a task in, a thread must already be waiting to accept it. If none is waiting, a new thread is added. If the pool is already at its maximum size, the task is rejected
For example: newCachedThreadPool uses this strategy
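The three queueing choices above map onto ThreadPoolExecutor constructor arguments roughly as follows. This is a sketch: the class QueueChoices, its method names, and the sizes used in main are made up for this example. The unbounded and handoff variants mirror the queue types that Executors.newFixedThreadPool and Executors.newCachedThreadPool use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoices {

    // 1. Unbounded queue: tasks wait in a LinkedBlockingQueue that can grow without limit.
    static ThreadPoolExecutor unbounded(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    // 2. Bounded queue: at most 'capacity' tasks wait; beyond that the saturation strategy applies.
    static ThreadPoolExecutor bounded(int threads, int capacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(capacity));
    }

    // 3. Synchronous handoff: no storage at all; a task goes straight to a waiting or new thread.
    static ThreadPoolExecutor handoff(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor[] pools = { unbounded(2), bounded(2, 5), handoff(4) };
        for (ThreadPoolExecutor pool : pools) {
            System.out.println(pool.getQueue().getClass().getSimpleName());
            pool.shutdown();
        }
    }
}
```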
Saturation strategy:
Call setRejectedExecutionHandler to change the saturation strategy
1. Abort (default): throws a RejectedExecutionException, which the caller must handle
2. Discard: silently drops the rejected task
3. DiscardOldest: drops the oldest queued task, that is, the one that would run next, and retries the new one. Note: with a priority queue this discards the highest-priority task
4. CallerRuns: the task is pushed back to the caller and runs on the thread that submitted it
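To make the CallerRuns behaviour concrete, the sketch below saturates a pool with one thread and a one-slot queue, then submits a third task. The class name CallerRunsDemo and the pool sizes are assumptions chosen to force saturation quickly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Returns the name of the thread that ran the task submitted while the pool was saturated.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the one-slot queue
            // Pool and queue are both full, so CallerRunsPolicy runs this task
            // synchronously on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which slows the producer down without dropping tasks.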
4. Thread factory (ThreadFactory)
Whenever the pool creates a thread, it does so by calling the thread factory
Custom thread factory: implement ThreadFactory
You can customize the threads the factory creates, for example by setting an UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        // Define how threads created by the factory handle uncaught exceptions
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    @Override
    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() { return created.get(); }

    public static int getThreadsAlive() { return alive.get(); }

    public static boolean getDebug() { return debugLifecycle; }

    public static void setDebug(boolean b) { debugLifecycle = b; }
}
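The article shows the custom thread class but not a factory that hands such threads to a pool. A minimal sketch of one follows. NamedThreadFactory and the pool name "worker" are hypothetical names introduced here. It creates plain Thread objects so that the example stands alone, and a factory for the class above would return new MyAppThread(r, poolName) instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name each thread poolName-1, poolName-2, ... so it is easy to spot in logs and dumps.
        Thread t = new Thread(r, poolName + "-" + counter.incrementAndGet());
        // Report exceptions that would otherwise end the thread silently.
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("UNCAUGHT in thread " + thread.getName() + ": " + e));
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(2, new NamedThreadFactory("worker"));
        exec.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        exec.shutdown();
        exec.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```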
5. Extending ThreadPoolExecutor
Methods that subclasses can override:
1. afterExecute: called after the task finishes, whether it returns normally or throws an exception. It is not called if beforeExecute threw a RuntimeException
2. beforeExecute: called before the task starts. If it throws a RuntimeException, the task is not executed
3. terminated: called when the thread pool has finished shutting down. It can be used to release resources, etc.
2. Parallelization of recursive algorithms
1. Loops
When every iteration of a loop is independent of the others, the iterations can be run in parallel
// Element and process(...) are placeholders for the application's own type and work.

// Sequential
void processSequentially(List<Element> elements) {
    for (Element e : elements)
        process(e);
}

// Parallel
void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
        exec.execute(() -> process(e));
}
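2. Recursion
If the work done in each recursive step is independent of the others, the steps can be executed in parallel
Example: a depth-first search. Note: the traversal itself is still sequential, but the computation for each node runs in parallel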
// Sequential compute and sequential traversal
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
    for (Node<T> n : nodes) {
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

// Parallel compute and sequential traversal
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes,
                                  final Collection<T> results) {
    for (final Node<T> n : nodes) {
        exec.execute(() -> results.add(n.compute()));
        parallelRecursive(exec, n.getChildren(), results);
    }
}

// Calling the parallel method and waiting for all results
public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
}
Example:
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ConcurrentPuzzleSolver<P, M> {
    private final Puzzle<P, M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentMap<P, Boolean> seen;
    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
        this.exec = initThreadPool();
        this.seen = new ConcurrentHashMap<P, Boolean>();
        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }

    private ExecutorService initThreadPool() {
        return Executors.newCachedThreadPool();
    }

    public List<M> solve() throws InterruptedException {
        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));
            // Block until the latch inside ValueLatch opens, which means a solution has been found
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown(); // the main thread shuts the pool down at the end
        }
    }

    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
        return new SolverTask(p, m, n);
    }

    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
            super(pos, move, prev);
        }

        public void run() {
            // Return if some thread has already found a solution; ValueLatch.isSet is
            // backed by a CountDownLatch.
            // Positions already visited are recorded in the seen map so that the search
            // does not revisit them and loop forever.
            if (solution.isSet() || seen.putIfAbsent(pos, true) != null) {
                return; // already solved or seen this position
            }
            if (puzzle.isGoal(pos)) {
                solution.setValue(this);
            } else {
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
}
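The solver relies on a ValueLatch class that the article does not show. Going by the comment that it is built on a CountDownLatch and by the three calls the solver makes (isSet, setValue, getValue), a minimal sketch could look like this. Treat it as one possible implementation rather than the author's exact code.

```java
import java.util.concurrent.CountDownLatch;

class ValueLatch<T> {
    private T value = null;
    private final CountDownLatch done = new CountDownLatch(1);

    // True once some thread has published a value.
    public boolean isSet() {
        return done.getCount() == 0;
    }

    // Only the first call stores a value; later calls are ignored.
    public synchronized void setValue(T newValue) {
        if (!isSet()) {
            value = newValue;
            done.countDown();
        }
    }

    // Blocks until a value has been set, then returns it.
    public T getValue() throws InterruptedException {
        done.await();
        synchronized (this) {
            return value;
        }
    }
}
```

With a latch like this, getValue never returns if no task ever calls setValue, so solve() in the class above would block forever on a puzzle that has no solution.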