
Analysis of Java Thread Pool Execution Principle

2017-02-23 10:42:44

The previous article analyzed how a thread pool is created: it offers both preset templates and a range of parameters for flexible customization.

This article focuses on the life cycle of the thread pool and walks through how it executes tasks.

Thread pool status

First, understand the two values that run through the thread pool code:

runState: thread pool running status

workerCount: The number of worker threads

The thread pool packs runState and workerCount into a single 32-bit int (ctl): the upper 3 bits hold runState and the remaining 29 bits hold workerCount. The code repeatedly calls runStateOf and workerCountOf to extract the two values.

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// Thread pool states
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// ctl operations
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

RUNNING: Accepts new tasks and processes tasks in the waiting queue

SHUTDOWN: Does not accept new tasks, but still processes tasks in the waiting queue

STOP: Does not accept new tasks, does not process tasks in the waiting queue, and interrupts tasks that are in progress

TIDYING: All tasks have terminated and workerCount is zero; the terminated() hook is about to run

TERMINATED: terminated() has completed

The thread pool starts in the RUNNING state and ends in the TERMINATED state. It does not have to pass through every intermediate state, but the state never moves backward. The figure below shows the possible transitions and the conditions that trigger them:

Figure 1: Thread pool state change path
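To make the bit packing concrete, here is a minimal standalone sketch that copies the constants and helpers shown above into a hypothetical CtlDemo class. The class name and main method are illustrative only and are not part of ThreadPoolExecutor. It shows that the two values can be read back independently and that the states are numerically ordered, which is why the pool can use comparisons such as rs >= SHUTDOWN.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set

    // Run states live in the upper 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));          // 5
        System.out.println(runStateOf(c) == RUNNING);  // true
        // RUNNING is negative, the other states are increasing non-negative values
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```

Because both values share one AtomicInteger, a single CAS can update the state and the count consistently.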

Creation of Worker

In the thread pool, tasks are executed by the Worker class. Worker extends AbstractQueuedSynchronizer (AQS), a core building block of the Java concurrency framework.

AQS is not discussed here. It is enough to know that a Worker wraps a Thread and uses it to run tasks.

Calling execute creates a Worker depending on the current situation of the thread pool. There are four possible situations:

Figure 2: The four possible situations for a Worker in the thread pool

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {   //1
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {   //2
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))   //3
            reject(command);
        else if (workerCountOf(recheck) == 0)   //4
            addWorker(null, false);
    }
    else if (!addWorker(command, false))   //5
        reject(command);   //6
}

Mark 1 corresponds to the first situation. Note the core argument passed to addWorker: core=true bounds the worker count by corePoolSize, and core=false bounds it by maximumPoolSize. When adding a worker, addWorker checks whether workerCount has reached the applicable maximum.

Mark 2 corresponds to the second situation: check that the thread pool is running, then add the task to the waiting queue. Mark 3 checks the thread pool state again. If the thread pool is no longer running, the task just added to the waiting queue is removed and handed to the RejectedExecutionHandler. Mark 4 covers the case where no worker is left, so it adds a worker with no initial task to pick up the queued task.

Mark 5 corresponds to the third situation: the waiting queue cannot accept any more tasks, so addWorker is called to add a worker to handle the task.

Mark 6 corresponds to the fourth situation: addWorker was called with core=false and returned false, which means workerCount has reached maximumPoolSize (or the pool is shutting down), so the task is handed to the RejectedExecutionHandler.
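The four situations can be observed from the outside with a deliberately tiny pool. The sketch below is illustrative only: the ExecutePathsDemo class, its helper methods, and the chosen sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1) are assumptions made for the demonstration. Each task blocks on a latch so that no worker becomes free while the four calls are made.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutePathsDemo {
    // "poolSize/queueSize" after each call to execute
    static String snapshot(ThreadPoolExecutor p) {
        return p.getPoolSize() + "/" + p.getQueue().size();
    }

    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();               // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker);             // situation 1: new core worker
            log.append(snapshot(pool)).append(' ');
            pool.execute(blocker);             // situation 2: task is queued
            log.append(snapshot(pool)).append(' ');
            pool.execute(blocker);             // situation 3: queue full, new non-core worker
            log.append(snapshot(pool)).append(' ');
            try {
                pool.execute(blocker);         // situation 4: rejected (default AbortPolicy)
                log.append("accepted");
            } catch (RejectedExecutionException e) {
                log.append("rejected");
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 1/0 1/1 2/1 rejected
    }
}
```

The default handler (AbortPolicy) throws RejectedExecutionException; a pool configured with a different RejectedExecutionHandler would behave differently in the last step.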

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:   //1
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;   //2
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

The first section of code, marked 1, has a simple goal: increment workerCount. It is long because the thread pool state can change at any moment, and the update must remain correct under concurrency. The outer loop checks the thread pool state, whether firstTask is null, and whether the queue is empty. The inner loop uses CAS (compare-and-swap) to make sure workerCount is incremented correctly. If you are not familiar with CAS, read up on it as a non-blocking synchronization mechanism. CAS is also used for later increments and decrements of workerCount.

The second section, marked 2, is more straightforward. It creates a new Worker and, while holding mainLock, adds it to workers (a Set). If the add succeeds, it starts the worker's thread. The finally block checks whether the thread was started; if it was not, addWorkerFailed is called.

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally { mainLock.unlock(); }
}

addWorkerFailed removes the worker from workers, decrements the workerCount that was already incremented, and calls tryTerminate to check whether the thread pool should now terminate.
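The CAS retry pattern used for workerCount can be sketched in isolation. The BoundedCounter class below is a hypothetical example, not code from ThreadPoolExecutor: it increments a counter only while it is below a limit, re-reading and retrying whenever another thread wins the race, which mirrors the inner loop of addWorker in simplified form.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) { this.limit = limit; }

    // Returns true if the count was incremented, false if the limit was reached
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;
            if (count.compareAndSet(c, c + 1))
                return true;
            // CAS failed because another thread changed the count; re-read and retry
        }
    }

    int get() { return count.get(); }

    // Starts the given number of threads that each try to increment once
    static int race(int limit, int threads) {
        BoundedCounter counter = new BoundedCounter(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (counter.tryIncrement())
                    successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(3, 8)); // 3
    }
}
```

No lock is taken: a thread that loses the race simply loops again with the fresh value, and the limit check is repeated on every attempt.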

Execution of Worker

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
public void run() {
    runWorker(this);
}

Worker uses the ThreadFactory to create its Thread in the constructor, and its run method calls runWorker, which is where tasks are actually executed.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {   //1
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||   //2
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();   //3
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;   //4
                w.unlock();
            }
        }
        completedAbruptly = false;       //5
    } finally {
        processWorkerExit(w, completedAbruptly);   //6
    }
}

Mark 1 enters the loop and keeps fetching tasks from getTask until it returns null. This is how thread reuse is achieved: one thread handles many tasks.

Mark 2 is a more complex check. It ensures that the worker thread is interrupted when the thread pool is in the STOP state or later, and that it is not interrupted otherwise. If you are not familiar with Java's interrupt mechanism, read up on how to stop a Java thread correctly.

Mark 3 calls the run method and actually executes the task. Two hook methods, beforeExecute and afterExecute, run before and after the task; they are empty by default and can be overridden by subclasses.

The completedTasks counter at mark 4 records how many tasks this worker has executed. It is later added to the pool's completedTaskCount, which is exposed through getCompletedTaskCount().

The completedAbruptly variable at mark 5 indicates whether the worker terminated abnormally. Reaching this line means the loop exited normally. processWorkerExit uses this variable.

Mark 6 ends by calling processWorkerExit, which is analyzed later.
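The hooks and the thread reuse described above can be observed with a small subclass. HookedPool below is a hypothetical class written for illustration: it overrides beforeExecute and afterExecute to count calls and records the name of every thread that ran a task. With a single worker and three tasks, the same thread is expected to run all of them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    final Set<String> threadNames = ConcurrentHashMap.newKeySet();

    HookedPool() {
        // One core thread, one maximum thread, unbounded queue
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        before.incrementAndGet();
        threadNames.add(t.getName());   // which thread is about to run the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        after.incrementAndGet();
    }

    // Runs three trivial tasks and waits for the pool to terminate
    static HookedPool runThree() {
        HookedPool pool = new HookedPool();
        for (int i = 0; i < 3; i++)
            pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        HookedPool pool = runThree();
        System.out.println("before=" + pool.before.get() + " after=" + pool.after.get());
        System.out.println("threads used=" + pool.threadNames.size());
        System.out.println("completed=" + pool.getCompletedTaskCount());
    }
}
```

getCompletedTaskCount() is the public accessor that includes the per-worker completedTasks values once workers have exited.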

Next, look at getTask, the method a worker uses to fetch a task from the waiting queue:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
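The timed flag in getTask decides whether an idle worker waits forever (take) or gives up after keepAliveTime (poll). The sketch below is an illustrative experiment, not part of the original analysis: the KeepAliveDemo class, the 50 ms keep-alive, and the one-second observation window are all assumptions. It compares a single-core pool with and without allowCoreThreadTimeOut.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns the pool size observed after the pool has been idle for up to one second
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });   // creates the single core worker
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            // Poll until the idle worker has been culled or the window closes
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // With core timeout the idle worker exits; without it the worker blocks in take()
        System.out.println(poolSizeAfterIdle(true));
        System.out.println(poolSizeAfterIdle(false));
    }
}
```

With allowCoreThreadTimeOut(true), the timed poll returns null after the keep-alive, the next loop iteration decrements workerCount, and getTask returns null so the worker exits.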





private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
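One visible consequence of processWorkerExit is that a worker killed by a task exception is replaced. The sketch below is a hypothetical experiment (the WorkerReplacementDemo class and its counting ThreadFactory are assumptions for illustration): the first task throws, which ends its worker abruptly, and the second task therefore runs on a newly created thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerReplacementDemo {
    // Returns how many threads the pool asked its ThreadFactory to create
    static int threadsCreated() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Swallow the task's exception so no stack trace is printed
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
        // With execute (unlike submit) the exception escapes and ends the worker
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(threadsCreated()); // 2
    }
}
```

Even though corePoolSize and maximumPoolSize are both 1, two threads are created over the lifetime of the pool: the original worker and its replacement.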










public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();   //1 security policy check
        advanceRunState(SHUTDOWN);   //2
        interruptIdleWorkers();   //3
        onShutdown(); //4 empty method, implemented by subclasses
    } finally {
        mainLock.unlock();
    }
    tryTerminate();   //5
}


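public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();  //1
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}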
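The practical difference between the two methods can be shown with one running task and two queued tasks. The ShutdownDemo class below is a hypothetical sketch written for illustration: shutdown lets the queued tasks run, while shutdownNow interrupts the running task and returns the queued tasks without running them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Returns { tasks handed back by shutdownNow (0 for shutdown), queued tasks that ran }
    static int[] run(boolean now) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        int drained = 0;
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();           // occupies the only worker
                } catch (InterruptedException e) {
                    // interrupted by shutdownNow: just finish the task
                }
            });
            pool.execute(ran::incrementAndGet); // queued
            pool.execute(ran::incrementAndGet); // queued
            started.await();                    // make sure the first task is running
            if (now) {
                drained = pool.shutdownNow().size();
            } else {
                pool.shutdown();
            }
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { drained, ran.get() };
    }

    public static void main(String[] args) {
        int[] graceful = run(false);
        int[] abrupt = run(true);
        System.out.println("shutdown:    drained=" + graceful[0] + " ran=" + graceful[1]);
        System.out.println("shutdownNow: drained=" + abrupt[0] + " ran=" + abrupt[1]);
    }
}
```

This matches the state definitions: in SHUTDOWN the waiting queue is still processed, while in STOP it is drained and handed back to the caller.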




final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}








public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
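The interplay of tryTerminate and awaitTermination can be seen from the caller's side. The TerminationDemo class below is a hypothetical sketch (the class name, the 100 ms and 5 s timeouts, and the returned flag array are assumptions): it overrides the terminated() hook, then checks that awaitTermination times out while a task is still running and succeeds once the last worker has exited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminationDemo {
    // Returns { timed out while busy, terminated in time, hook called, isTerminated }
    static boolean[] run() {
        AtomicBoolean hookCalled = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true);   // runs during the TIDYING state
            }
        };
        CountDownLatch release = new CountDownLatch(1);
        boolean timedOutWhileBusy = false;
        boolean finished = false;
        try {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.shutdown();
            // A worker is still busy, so the pool cannot reach TERMINATED yet
            timedOutWhileBusy = !pool.awaitTermination(100, TimeUnit.MILLISECONDS);
            release.countDown();
            finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {
            timedOutWhileBusy, finished, hookCalled.get(), pool.isTerminated()
        };
    }

    public static void main(String[] args) {
        for (boolean b : run())
            System.out.println(b);
    }
}
```

The waiting thread is woken by termination.signalAll() in tryTerminate, which runs only after the terminated() hook has returned and the state has been set to TERMINATED.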

