Heim >Java >javaLernprogramm >Verstehen Sie das Implementierungsprinzip des Java-Thread-Pools in einem Artikel
Dieser Artikel vermittelt Ihnen relevantes Wissen über Java. Er stellt hauptsächlich die relevanten Inhalte zum Implementierungsprinzip des Thread-Pools vor, einschließlich der Gründe für die Verwendung des Thread-Pools und verwandter Inhalte zur Thread-Pool-Nutzung hilft allen.
Empfohlene Studie: „Java-Video-Tutorial“
Threads werden häufig erstellt und zerstört verbraucht Systemressourcen, Threads können mithilfe von Thread-Pools wiederverwendet werden.
/** * @author 一灯架构 * @apiNote 线程池示例 **/ public class ThreadPoolDemo { public static void main(String[] args) { // 1. 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // 2. 往线程池中提交3个任务 for (int i = 0; i { System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构"); }); } // 3. 关闭线程池 threadPoolExecutor.shutdown(); } }
pool-1-thread-2 关注公众号:一灯架构 pool-1-thread-1 关注公众号:一灯架构 pool-1-thread-3 关注公众号:一灯架构
int MaximumPoolSize | |||||||||||||||||||
long keepAliveTime | |||||||||||||||||||
TimeUnit-Einheit | |||||||||||||||||||
BlockingQueue.workQueue | |||||||||||||||||||
ThreadFactory threadFactory | |||||||||||||||||||
RejectedExecutionHandler-Handler | |||||||||||||||||||
状态名称 | 状态含义 | 状态作用 |
---|---|---|
RUNNING | 运行中 | 线程池创建后默认状态,接收新任务,并处理阻塞队列中的任务。 |
SHUTDOWN | 已关闭 | 调用shutdown方法后处于该状态,不再接收新任务,处理阻塞队列中任务。 |
STOP | 已停止 | 调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。 |
TIDYING | 处理中 | 所有任务已完成,所有工作线程都已回收,等待调用terminated方法。 |
TERMINATED | 已终止 | 调用terminated方法后处于该状态,线程池的最终状态。 |
看一下往线程池中提交任务的源码,这是线程池的核心逻辑:
// 往线程池中提交任务 public void execute(Runnable command) { // 1. 判断提交的任务是否为null if (command == null) throw new NullPointerException(); int c = ctl.get(); // 2. 判断线程数是否小于核心线程数 if (workerCountOf(c) <p>execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:</p><pre class="brush:php;toolbar:false">// 添加worker private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 1. 检查是否允许提交任务 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 2. 使用死循环保证添加线程成功 for (; ; ) { int wc = workerCountOf(c); // 3. 校验线程数是否超过容量限制 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 4. 使用CAS修改线程数 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 5. 如果线程池状态变了,则从头再来 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 6. 把任务和新线程包装成一个worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 7. 加锁,控制并发 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 8. 再次校验线程池状态是否异常 int rs = runStateOf(ctl.get()); if (rs largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 12. 启动线程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。
再看一下worker类的结构:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工作线程 final Thread thread; // 任务 Runnable firstTask; // 创建worker,并创建一个新线程(用来执行任务) Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } }
再看一下run方法的源码:
// 线程执行入口 public void run() { runWorker(this); } // 线程运行核心方法 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { // 加锁,保证thread不被其他线程中断(除非线程池被中断) w.lock(); // 2. 校验线程池状态,是否需要中断当前线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 3. 执行run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 解锁 w.unlock(); } } completedAbruptly = false; } finally { // 4. 从worker集合删除当前worker processWorkerExit(w, completedAbruptly); } }
runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。
再看一下从阻塞队列中拉取任务的逻辑:
// 从阻塞队列中拉取任务 private Runnable getTask() { boolean timedOut = false; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 2. 再次判断是否需要回收线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 3. 从阻塞队列中拉取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
推荐学习:《java视频教程》
Das obige ist der detaillierte Inhalt vonVerstehen Sie das Implementierungsprinzip des Java-Thread-Pools in einem Artikel. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!