
Use of java thread pool

大家讲道理 (Original)
2017-05-28 11:33:17

1. Implicit coupling between tasks and execution strategies

Executor decouples task submission from task execution, but not every task suits every execution policy.

A thread pool performs best when its tasks are homogeneous (the same type) and independent of one another. If long-running and short-running tasks share one pool, the long tasks can clog the pool and delay the short ones unless the pool is very large. If tasks depend on other tasks in the same pool, they can deadlock unless the pool is unbounded.

1. Thread starvation deadlock

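Example: submit two tasks to a single-threaded pool, where one task depends on the other. If the first task blocks waiting for the second, the second can never start because the only worker thread is occupied, so the pool deadlocks. The same thing can happen in a larger pool when every worker is blocked waiting for tasks that are still in the queue; the symptom is a pool that never has enough threads.

Definition: whenever a task waits for the result of another task in the same pool, thread starvation deadlock can occur.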
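The starvation scenario described above can be reproduced with a small sketch. The class name `StarvationDeadlockDemo`, the helper `innerTaskStarves`, and the 500 ms timeout are assumptions made for this illustration; the timeout exists only so that the demo terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationDeadlockDemo {

    /**
     * Runs an outer task that submits an inner task to the same pool and waits
     * for its result. Returns true if the inner task could not run within the
     * timeout, i.e. the outer task starved it of a worker thread.
     */
    public static boolean innerTaskStarves(int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner result");
                try {
                    // A plain inner.get() would block forever in a one-thread pool.
                    inner.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return outer.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool of 1 starves: " + innerTaskStarves(1));
        System.out.println("pool of 2 starves: " + innerTaskStarves(2));
    }
}
```

With one thread the outer task occupies the only worker, so the inner task stays in the queue. With two threads the inner task gets its own worker and completes.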

2. Thread pool size

 

Note: the size of the thread pool is also constrained by other resource pools, such as a database connection pool.

If each task needs a connection, the effective size of the thread pool is limited by the size of the database connection pool.
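The article does not give a sizing formula. One widely cited heuristic, from the book Java Concurrency in Practice, is threads = CPUs x target utilization x (1 + wait time / compute time). The sketch below applies it and then caps the result by the connection pool size, as the note above suggests. The class name `PoolSizing`, the method `poolSize`, and all the numbers are hypothetical.

```java
public class PoolSizing {

    /**
     * Heuristic pool size: cpus * targetUtilization * (1 + wait/compute),
     * capped by the scarcest external resource (here a connection pool).
     */
    public static int poolSize(int cpus, double targetUtilization,
                               double waitToComputeRatio, int maxDbConnections) {
        int byCpu = (int) Math.ceil(cpus * targetUtilization * (1 + waitToComputeRatio));
        return Math.max(1, Math.min(byCpu, maxDbConnections));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Assumed workload: tasks wait about 4x as long as they compute,
        // and the database pool offers 10 connections.
        System.out.println("suggested pool size: " + poolSize(cpus, 1.0, 4.0, 10));
    }
}
```

The result is only a starting point; measuring the real workload is the only reliable way to tune it.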

3. Configuring ThreadPoolExecutor

Creating an instance:

1. Use the factory methods of Executors, which return some default configurations

2. Customize the pool by instantiating ThreadPoolExecutor(.....) directly

The thread pool's queue:

1. Unbounded queue: when a task arrives and all pool threads are busy, the task waits in the queue. If tasks keep arriving faster than they can be processed, the queue grows without bound

For example: newSingleThreadExecutor and newFixedThreadPool use this kind of queue

2. Bounded queue: if a new task arrives and the queue is full, the saturation policy is applied

3. Synchronous handoff: for very large or unbounded pools, putting a task into a queue only adds latency to the handoff, and a fast producer would still cause tasks to pile up in the queue

SynchronousQueue instead hands the task directly to a worker thread

Mechanism: when a task is offered, a thread must already be waiting to accept it. If none is waiting and the pool is below its maximum size, a new thread is created. If the pool is saturated, the task is rejected

For example: newCachedThreadPool uses this strategy
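The three queueing choices above can be written out as explicit ThreadPoolExecutor constructions. This is a minimal sketch: the class name `QueueChoices` and its helper methods are assumptions made for this illustration, and the pool sizes are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueChoices {

    // Unbounded queue: comparable to what Executors.newFixedThreadPool(n) builds.
    public static ThreadPoolExecutor unbounded(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    // Bounded queue: once 'capacity' tasks are waiting, the saturation policy applies.
    public static ThreadPoolExecutor bounded(int n, int capacity) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(capacity));
    }

    // Synchronous handoff: no queueing, tasks go straight to a thread.
    public static ThreadPoolExecutor handoff(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    /** Shows that a saturated handoff pool rejects the next task. */
    public static boolean handoffRejectsWhenSaturated() {
        ThreadPoolExecutor pool = handoff(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {          // occupies the only allowed thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });      // no idle thread and no room to grow
            return false;
        } catch (RejectedExecutionException e) {
            return true;                  // default saturation policy: abort
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handoff pool rejected: " + handoffRejectsWhenSaturated());
    }
}
```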

Saturation policy:

Call setRejectedExecutionHandler to change the saturation policy.

1. Abort (default): throws RejectedExecutionException, which the caller must handle

2. Discard: silently drops the newly submitted task

3. DiscardOldest: drops the oldest queued task and retries the new one. Note: with a priority queue, the head of the queue is the highest-priority task, so that is the task that gets discarded

4. CallerRuns: pushes the task back to the caller; the thread that called execute runs the task itself
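As an illustration of the CallerRuns policy, the sketch below saturates a one-thread pool with a one-slot queue and records which thread ends up running the overflow task. The class name `CallerRunsDemo` and its helper method are assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Returns the name of the thread that ran the task submitted to a saturated pool. */
    public static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> {          // taken by the single worker thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });      // fills the one-slot queue
            // The pool is now saturated, so this task runs in the calling thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work for a while, which throttles the producer without dropping tasks.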

4. Thread factory: ThreadFactory

Whenever the pool needs to create a thread, it does so by calling the thread factory.

Custom thread factory: implement ThreadFactory

You can customize the behavior of the threads the factory creates, for example by setting an UncaughtExceptionHandler.

 


    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class MyAppThread extends Thread {
        public static final String DEFAULT_NAME = "MyAppThread";
        private static volatile boolean debugLifecycle = false;
        private static final AtomicInteger created = new AtomicInteger();
        private static final AtomicInteger alive = new AtomicInteger();
        private static final Logger log = Logger.getAnonymousLogger();

        public MyAppThread(Runnable r) {
            this(r, DEFAULT_NAME);
        }

        public MyAppThread(Runnable runnable, String name) {
            super(runnable, name + "-" + created.incrementAndGet());
            // Define how threads created by this factory handle uncaught exceptions
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
                }
            });
        }

        public void run() {
            // Copy debug flag to ensure consistent value throughout.
            boolean debug = debugLifecycle;
            if (debug) log.log(Level.FINE, "Created " + getName());
            try {
                alive.incrementAndGet();
                super.run();
            } finally {
                alive.decrementAndGet();
                if (debug) log.log(Level.FINE, "Exiting " + getName());
            }
        }

        public static int getThreadsCreated() {
            return created.get();
        }

        public static int getThreadsAlive() {
            return alive.get();
        }

        public static boolean getDebug() {
            return debugLifecycle;
        }

        public static void setDebug(boolean b) {
            debugLifecycle = b;
        }
    }
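The listing above defines the thread class but not the factory that hands such threads to a pool. The sketch below shows one possible factory. To stay self-contained it creates plain Thread objects; with MyAppThread on the classpath, newThread could return `new MyAppThread(r, poolName)` instead. The class name `NamedThreadFactory` and its helper method are assumptions made for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger();

    public NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Give every thread a pool-specific name and an uncaught-exception handler.
        Thread t = new Thread(r, poolName + "-" + counter.incrementAndGet());
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("UNCAUGHT in thread " + thread.getName() + ": " + e));
        return t;
    }

    /** Runs one task in a pool built with this factory and returns the worker's name. */
    public static String firstWorkerName(String poolName) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamedThreadFactory(poolName));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstWorkerName("worker"));
    }
}
```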


5. Extending ThreadPoolExecutor

Methods that a custom subclass can override:

1. afterExecute: called after a task finishes, whether it returned normally or threw an exception. It is not called if beforeExecute threw a RuntimeException

2. beforeExecute: called before a task starts. If it throws a RuntimeException, the task is not executed

3. terminated: called once the thread pool has shut down; it can be used to release resources, etc.
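The three hooks can be seen together in a small subclass. This is a sketch only: the class name `CountingThreadPool`, its counters, and the helper `runAndSummarize` are assumptions made for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingThreadPool extends ThreadPoolExecutor {
    private final AtomicInteger started = new AtomicInteger();
    private final AtomicInteger finished = new AtomicInteger();
    private volatile boolean terminatedCalled = false;

    public CountingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();        // runs in the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        try {
            finished.incrementAndGet();   // runs in the worker thread, after the task
        } finally {
            super.afterExecute(r, thrown);
        }
    }

    @Override
    protected void terminated() {
        try {
            terminatedCalled = true;      // a real pool might release resources here
        } finally {
            super.terminated();
        }
    }

    /** Runs the given number of empty tasks and reports what the hooks observed. */
    public static String runAndSummarize(int tasks) throws InterruptedException {
        CountingThreadPool pool = new CountingThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return "started=" + pool.started.get()
                + " finished=" + pool.finished.get()
                + " terminated=" + pool.terminatedCalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndSummarize(3));
    }
}
```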

2. Parallelizing recursive algorithms

1. Loops

If every iteration of a loop is independent of the others, the iterations can be run in parallel.


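    // Requires java.util.List and java.util.concurrent.Executor.
    // Element and process(...) are not shown in this listing.

    // Sequential version
    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }

    // Parallel version: each iteration becomes a task
    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }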


2. Recursion

The same technique applies to recursive algorithms when the work done at each step is independent of the other steps.

For example, a depth-first search: the traversal itself is still sequential, but the computation for each node runs in parallel.

 


    // Requires java.util.* and java.util.concurrent.*.
    // Node<T>, with compute() and getChildren(), is not shown in this listing.

    // Sequential computation, sequential traversal
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    // Parallel computation, sequential traversal
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes,
                                      final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

    // Caller of the parallel method: waits until all submitted tasks have finished
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }


 

Example:

  


    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    // Puzzle, PuzzleNode and ValueLatch are supporting types not shown in this listing.
    public class ConcurrentPuzzleSolver<P, M> {
        private final Puzzle<P, M> puzzle;
        private final ExecutorService exec;
        private final ConcurrentMap<P, Boolean> seen;
        protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

        public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
            this.puzzle = puzzle;
            this.exec = initThreadPool();
            this.seen = new ConcurrentHashMap<P, Boolean>();
            if (exec instanceof ThreadPoolExecutor) {
                // Tasks submitted after shutdown are rejected; discard them silently
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
                tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            }
        }

        private ExecutorService initThreadPool() {
            return Executors.newCachedThreadPool();
        }

        public List<M> solve() throws InterruptedException {
            try {
                P p = puzzle.initialPosition();
                exec.execute(newTask(p, null, null));
                // Block until the ValueLatch is released, which means a solution was found
                PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
                return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
            } finally {
                exec.shutdown(); // the main thread finally shuts down the pool
            }
        }

        protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
            return new SolverTask(p, m, n);
        }

        protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
            SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
                super(pos, move, prev);
            }

            public void run() {
                // If another thread has already found a solution, return; isSet is
                // backed by the CountDownLatch inside ValueLatch.
                // Positions already visited are recorded in 'seen' so they are not
                // explored again, which would otherwise loop forever.
                if (solution.isSet() || seen.putIfAbsent(pos, true) != null) {
                    return; // already solved or seen this position
                }
                if (puzzle.isGoal(pos)) {
                    solution.setValue(this);
                } else {
                    for (M m : puzzle.legalMoves(pos))
                        exec.execute(newTask(puzzle.move(pos, m), m, this));
                }
            }
        }
    }
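The solver relies on a ValueLatch that the listing does not show. Its comments say the latch is built on a CountDownLatch, so a minimal sketch along those lines might look like the following. This is an assumption about its shape, not the author's actual class: the first value set wins, and getValue blocks until a value is available.

```java
import java.util.concurrent.CountDownLatch;

public class ValueLatch<T> {
    private T value = null;                                   // guarded by 'this'
    private final CountDownLatch done = new CountDownLatch(1);

    /** True once a value has been set. */
    public boolean isSet() {
        return done.getCount() == 0;
    }

    /** Sets the value only the first time it is called; later calls are ignored. */
    public synchronized void setValue(T newValue) {
        if (!isSet()) {
            value = newValue;
            done.countDown();
        }
    }

    /** Blocks until a value has been set, then returns it. */
    public T getValue() throws InterruptedException {
        done.await();
        synchronized (this) {
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ValueLatch<String> latch = new ValueLatch<>();
        latch.setValue("first");
        latch.setValue("second");   // ignored, a value is already set
        System.out.println(latch.getValue());
    }
}
```

Note that with such a latch solve() would wait forever if the puzzle has no solution; handling that case is outside the scope of this sketch.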


 

  

 
