搜索
首页Javajava教程一文搞懂Java线程池实现原理

本篇文章给大家带来了关于java的相关知识,其中主要介绍了关于线程池实现原理的相关内容,包括了为什么要使用线程池以及线程池使用的相关内容,下面一起来看一下,希望对大家有帮助。

一文搞懂Java线程池实现原理

推荐学习:《java视频教程

1. 为什么要使用线程池

使用线程池通常由以下两个原因:

  1. 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程。

  2. 使用线程池可以更容易管理线程,线程池可以动态管理线程个数、具有阻塞队列、定时周期执行任务、环境隔离等。

2. 线程池的使用

/**
 * @author 一灯架构
 * @apiNote 线程池示例
 **/
public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 1. 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                3,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
      
        // 2. 往线程池中提交3个任务
        for (int i = 0; i < 3; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构");
            });
        }
      
        // 3. 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

输出结果:

pool-1-thread-2 关注公众号:一灯架构
pool-1-thread-1 关注公众号:一灯架构
pool-1-thread-3 关注公众号:一灯架构

线程池的使用非常简单:

  • 调用new ThreadPoolExecutor()构造方法,指定核心参数,创建线程池。

  • 调用execute()方法提交Runnable任务

  • 使用结束后,调用shutdown()方法,关闭线程池。

再看一下线程池构造方法中核心参数的作用。

3. 线程池核心参数

线程池共有七大核心参数:

参数名称 参数含义
int corePoolSize 核心线程数
int maximumPoolSize 最大线程数
long keepAliveTime 线程存活时间
TimeUnit unit 时间单位
BlockingQueue workQueue 阻塞队列
ThreadFactory threadFactory 线程创建工厂
RejectedExecutionHandler handler 拒绝策略
  • corePoolSize 核心线程数

    当往线程池中提交任务,会创建线程去处理任务,直到线程数达到corePoolSize,才会往阻塞队列中添加任务。默认情况下,空闲的核心线程并不会被回收,除非配置了allowCoreThreadTimeOut=true。

  • maximumPoolSize 最大线程数

    当线程池中的线程数达到corePoolSize,阻塞队列又满了之后,才会继续创建线程,直到达到maximumPoolSize,另外空闲的非核心线程会被回收。

  • keepAliveTime 线程存活时间

    非核心线程的空闲时间达到了keepAliveTime,将会被回收。

  • TimeUnit 时间单位

    线程存活时间的单位,默认是TimeUnit.MILLISECONDS(毫秒),可选择的有:

    TimeUnit.NANOSECONDS(纳秒) TimeUnit.MICROSECONDS(微秒) TimeUnit.MILLISECONDS(毫秒) TimeUnit.SECONDS(秒) TimeUnit.MINUTES(分钟) TimeUnit.HOURS(小时) TimeUnit.DAYS(天)

  • workQueue 阻塞队列

    当线程池中的线程数达到corePoolSize,再提交的任务就会放到阻塞队列的等待,默认使用的是LinkedBlockingQueue,可选择的有:

    LinkedBlockingQueue(基于链表实现的阻塞队列)

    ArrayBlockingQueue(基于数组实现的阻塞队列)

    SynchronousQueue(只有一个元素的阻塞队列)

    PriorityBlockingQueue(实现了优先级的阻塞队列)

    DelayQueue(实现了延迟功能的阻塞队列)

  • threadFactory 线程创建工厂

    用来创建线程的工厂,默认的是Executors.defaultThreadFactory(),可选择的还有Executors.privilegedThreadFactory()实现了线程优先级。当然也可以自定义线程创建工厂,创建线程的时候最好指定线程名称,便于排查问题。

  • RejectedExecutionHandler 拒绝策略

    当线程池中的线程数达到maximumPoolSize,阻塞队列也满了之后,再往线程池中提交任务,就会触发执行拒绝策略,默认的是AbortPolicy(直接终止,抛出异常),可选择的有:

    AbortPolicy(直接终止,抛出异常)

    DiscardPolicy(默默丢弃,不抛出异常)

    DiscardOldestPolicy(丢弃队列中最旧的任务,执行当前任务)

    CallerRunsPolicy(返回给调用者执行)

4. 线程池工作原理

线程池的工作原理,简单理解如下:

流程.jpg

  • 当往线程池中提交任务的时候,会先判断线程池中线程数是否核心线程数,如果小于,会创建核心线程并执行任务。

  • 如果线程数大于核心线程数,会判断阻塞队列是否已满,如果没有满,会把任务添加到阻塞队列中等待调度执行。

  • 如果阻塞队列已满,会判断线程数是否小于最大线程数,如果小于,会继续创建最大线程数并执行任务。

  • 如果线程数大于最大线程数,会执行拒绝策略,然后结束。

5. 线程池源码剖析

5.1 线程池的属性

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 线程个数所占的位数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程池的最大容量,2^29-1,约5亿个线程
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // 独占锁,用来控制多线程下的并发操作
    private final ReentrantLock mainLock = new ReentrantLock();
    // 工作线程的集合
    private final HashSet<Worker> workers = new HashSet<>();
    // 等待条件,用来响应中断
    private final Condition termination = mainLock.newCondition();
    // 是否允许回收核心线程
    private volatile boolean allowCoreThreadTimeOut;
    // 线程数的历史峰值
    private int largestPoolSize;

    /**
     * 以下是线程池的七大核心参数
     */
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private volatile long keepAliveTime;
    private final BlockingQueue<Runnable> workQueue;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;

}

线程池的控制状态ctl用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。

设计者多聪明,用一个变量存储了两块内容。

5.2 线程池状态

线程池共有5种状态:

状态名称 状态含义 状态作用
RUNNING 运行中 线程池创建后默认状态,接收新任务,并处理阻塞队列中的任务。
SHUTDOWN 已关闭 调用shutdown方法后处于该状态,不再接收新任务,处理阻塞队列中任务。
STOP 已停止 调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。
TIDYING 处理中 所有任务已完成,所有工作线程都已回收,等待调用terminated方法。
TERMINATED 已终止 调用terminated方法后处于该状态,线程池的最终状态。

image-20221031192041121.png

5.3 execute源码

看一下往线程池中提交任务的源码,这是线程池的核心逻辑:

// 往线程池中提交任务
public void execute(Runnable command) {
    // 1. 判断提交的任务是否为null
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 2. 判断线程数是否小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 3. 把任务包装成worker,添加到worker集合中
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 7. 如果添加阻塞队列失败,就创建一个Worker
    else if (!addWorker(command, false))
        // 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略
        reject(command);
}

execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:

// 添加worker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. 检查是否允许提交任务
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;
        // 2. 使用死循环保证添加线程成功
        for (; ; ) {
            int wc = workerCountOf(c);
            // 3. 校验线程数是否超过容量限制
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 4. 使用CAS修改线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // 5. 如果线程池状态变了,则从头再来
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 6. 把任务和新线程包装成一个worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 7. 加锁,控制并发
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 8. 再次校验线程池状态是否异常
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    // 9. 如果线程已经启动,就抛出异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 10. 添加到worker集合中
                    workers.add(w);
                    int s = workers.size();
                    // 11. 记录线程数历史峰值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 12. 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。

5.4 worker源码

再看一下worker类的结构:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    // 工作线程
    final Thread thread;
    // 任务
    Runnable firstTask;

    // 创建worker,并创建一个新线程(用来执行任务)
    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

5.5 runWorker源码

再看一下run方法的源码:

// 线程执行入口
public void run() {
    runWorker(this);
}

// 线程运行核心方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            // 加锁,保证thread不被其他线程中断(除非线程池被中断)
            w.lock();
            // 2. 校验线程池状态,是否需要中断当前线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 3. 执行run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 4. 从worker集合删除当前worker
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。

再看一下从阻塞队列中拉取任务的逻辑:

// 从阻塞队列中拉取任务
private Runnable getTask() {
    boolean timedOut = false;
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 2. 再次判断是否需要回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 3. 从阻塞队列中拉取任务
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

推荐学习:《java视频教程

以上是一文搞懂Java线程池实现原理的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:掘金。如有侵权,请联系admin@php.cn删除
浅析server安装宝塔后出现不能远程的问题浅析server安装宝塔后出现不能远程的问题Nov 23, 2022 pm 04:56 PM

本文由宝塔面板教程栏目给大家介绍关于server2022安装宝塔后出现不能远程的问题,不知道大家有没有遇到这样的问题呢?下面就带大家一起来看看我是怎么处理的吧!

图文详解Node V8引擎的内存和GC图文详解Node V8引擎的内存和GCMar 29, 2023 pm 06:02 PM

本篇文章带大家深入了解NodeJS V8引擎的内存和垃圾回收器(GC),希望对大家有所帮助!

一文浅析Golang中的闭包一文浅析Golang中的闭包Nov 21, 2022 pm 08:36 PM

闭包(closure)是一个函数以及其捆绑的周边环境状态(lexical environment,词法环境)的引用的组合。 换而言之,闭包让开发者可以从内部函数访问外部函数的作用域。 闭包会随着函数的创建而被同时创建。

MySQL中怎么进行大文本存储压缩MySQL中怎么进行大文本存储压缩Feb 02, 2023 pm 08:23 PM

对MySQL大文本数据存储进行简单的调研,通过牺牲部分CPU资源对数据压缩,使数据占用更小的空间,从而减少磁盘I/O和网络I/O

count(*)为什么很慢?原因分析count(*)为什么很慢?原因分析Jan 05, 2023 pm 09:21 PM

count(*)为什么很慢?下面本篇文章就来给大家分析一下原因,并聊聊count(*)的执行过程,希望对大家有所帮助!

了解一下Golang中的unsafe包了解一下Golang中的unsafe包Apr 02, 2023 am 08:30 AM

在一些底层的库中, 经常会看到使用 unsafe 包的地方。本篇文章就来带大家了解一下Golang中的unsafe包,介绍一下unsafe 包的作用和Pointer的使用方式,希望对大家有所帮助!

用三行代码使你的git提交记录变干净用三行代码使你的git提交记录变干净Feb 28, 2023 pm 04:19 PM

本篇文章给大家带来了关于git的相关知识,其中主要跟大家聊一聊怎么让你的git记录保持整洁,感兴趣的朋友下面一起来看一下吧,希望对大家有帮助。

Node学习之如何最小化堆分配和防止内存泄漏Node学习之如何最小化堆分配和防止内存泄漏Jan 11, 2023 pm 08:25 PM

Node.js如何查看内存泄漏?下面本篇文章带大家了解Nodejs堆分配,介绍一下如何最小化堆分配和防止内存泄漏,希望对大家有所帮助!

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
2 周前By尊渡假赌尊渡假赌尊渡假赌
仓库:如何复兴队友
4 周前By尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒险:如何获得巨型种子
4 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

mPDF

mPDF

mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

EditPlus 中文破解版

EditPlus 中文破解版

体积小,语法高亮,不支持代码提示功能

VSCode Windows 64位 下载

VSCode Windows 64位 下载

微软推出的免费、功能强大的一款IDE编辑器