背景: 隨著電腦硬體的升級換代,讓我們的軟體具備多執行緒執行任務的能力。當我們在進行多線程編程時,就需要創建線程,如果說程序並發很高的話,我們會創建大量的線程,而每個線程執行一個時間很短的任務就結束了,這樣頻繁創建線程,會極大的降低系統效能,增加伺服器開銷,因為建立執行緒和銷毀執行緒都需要額外的消耗。
這時我們就可以藉助池化技術,來優化這個缺陷,線程池就誕生了。
池化技術的本質是在高並發場景下,為了實現資源復用,減少資源創建銷毀等開銷,如果並發數很小沒有明顯優勢(資源一直佔用系統內存,沒有機會被使用)。
池化技術介紹: 什麼時池化技術呢?池化技術是一種編程技巧,當程序出現高並發時,能夠明顯的優化程序,降低系統頻繁創建銷毀連接等額外開銷。我們經常接觸到的池化技術有資料庫連線池、執行緒池、物件池等等。池化技術的特點是將一些高成本的資源維護在一個特定的池子(記憶體)中,規定其最小連線數、最大連線數、阻塞佇列,溢位規則等配置,方便統一管理。一般情況下也會附帶一些監控,強制回收等配套功能。
池化技術作為一種資源使用技術,典型的使用情形是:
取得資源的成本較高的時候
請求資源的頻率很高且使用資源總數較低的時候
#面對效能問題,涉及到處理時間延遲的時候
池化技術資源分類:
#系統呼叫的系統資源,如執行緒、進程、記憶體分配等
網路通訊的遠端資源, 如資料庫連線、套接字連線等
#線程池是我們為了規避創建線程,銷毀線程額外開銷而誕生的,所以說我們定義創建好線程池之後,就不需要自己來創建線程,而是使用線程池調用執行我們的任務。下面我們一起來看看如何定義並建立線程池。
建立線程池可以使用Executors,其中提供了一系列工廠方法來建立線程池,傳回的線程池都實作了ExecutorService介面。
ExecutorService 介面是Executor介面的子類別接口,使用更為廣泛,其提供了線程池生命週期管理的方法,傳回 Future 物件。
也就是說我們透過Executors建立執行緒池,得到ExecutorService
,透過ExecutorService
執行非同步任務(實作Runnable介面)
Executors 可以建立幾個類型的執行緒池:
#newCachedThreadPool
建立一個可快取執行緒池,如果執行緒池執行緒數量過剩,會在60秒後回收多餘執行緒資源,當任務書增加,執行緒不夠用,則會新建執行緒。
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大並發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor
建立一個單一執行緒的執行緒池,只使用唯一的執行緒來執行任務,可以保證任務會依照提交順序來完成。
在阿里巴巴開發規範中,規定執行緒池不允許透過Executors創建,而是透過ThreadPoolExecutor創建。
好處:讓寫的同學可以更明確線程池的運作規則,並規避資源耗盡的風險。
ThreadPoolExecutor的七大參數:
(1)corePoolSize
核心執行緒數量,核心執行緒會一直保留,不會被銷毀。
(2)maximumPoolSize
最大執行緒數,當核心執行緒無法滿足任務需求時,系統就會建立新的執行緒來執行任務。
(3)keepAliveTime
存活時間,核心執行緒以外的執行緒空閒多久就會被銷毀。
(4)timeUnit
代表執行緒存活的時間單位。
(5)BlockingQueue
阻塞佇列
#如果正在執行的任務超過了最大執行緒數,可以存放在佇列中,當執行緒池中有空閒資源就可以從佇列中取出任務繼續執行。
佇列類型有以下幾種類型:LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue。
(6)threadFactory
線程工廠,用來創建線程的,可以自訂線程,例如我們可以定義線程組名稱,在jstack問題排查時,非常有幫助。
(7)rejectedExecutionHandler
拒絕策略,
當所有執行緒(最大執行緒數)都在忙,且任務佇列處於滿任務的狀態,則會執行拒絕策略。
JDK為我們提供了四種拒絕策略,我們必須都得熟悉
#AbortPolicy: 丟棄任務,並拋出異常RejectedExecutionException。 預設
DiscardPolicy: 丟棄最新的任務,不拋例外。
DiscardOldestPolicy: 丟掉排隊時間最久的任務,也就是最舊的任務。
CallerRuns: 由呼叫者(提交非同步任務的執行緒)處理任務。
想要實作一個執行緒池我們就需要關心ThreadPoolExecutor類,因為Executors建立執行緒池也是透過new ThreadPoolExecutor物件。
看一下ThreadPoolExecutor
的類別繼承關係,可以看出為什麼透過Executors
建立的執行緒池傳回結果是ExecutorService,因為ThreadPoolExecutor是ExecutorService介面的實作類別,而Executors創建線程池本質也是創建的ThreadPoolExecutor 物件。
下面我們一起看一下ThreadPoolExecutor
的原始碼,首先是ThreadPoolExecutor
內定義的變數,常數:
// 复合类型变量 是一个原子整数 控制状态(运行状态|线程池活跃线程数量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 容量 // 运行状态存储在高位3位 private static final int RUNNING = -1 << COUNT_BITS; // 接受新任务,并处理队列任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,但会处理队列任务 private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,不会处理队列任务,中断正在处理的任务 private static final int TIDYING = 2 << COUNT_BITS; // 所有的任务已结束,活跃线程为0,线程过渡到TIDYING状 态,将会执行terminated()钩子方法 private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已经完成 // 设置 ctl 参数方法 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } /** * 阻塞队列 */ private final BlockingQueue<Runnable> workQueue; /** * Lock 锁. */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 工人们 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 等待条件支持等待终止 */ private final Condition termination = mainLock.newCondition(); /** * 最大的池大小. */ private int largestPoolSize; /** * 完成任务数 */ private long completedTaskCount; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 拒绝策略 */ private volatile RejectedExecutionHandler handler; /** * 存活时间 */ private volatile long keepAliveTime; /** * 允许核心线程数 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数 */ private volatile int corePoolSize; /** * 最大线程数 */ private volatile int maximumPoolSize; /** * 默认拒绝策略 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** * shutdown and shutdownNow权限 */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
建構器,,支援最少五種參數,最大七中參數的四種建構器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
工人,在執行緒池中執行任務的,執行緒池就是透過這些工人進行工作的,有核心員工(核心線程)和臨時工(人手不夠的時候,臨時創建的,如果空閒時間廠,就會被裁員),
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 工人的本质就是个线程 final Thread thread; // 第一件工作任务 Runnable firstTask; volatile long completedTasks; /** * 构造器 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 工作 */ public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
核心方法,通過執行緒池執行任務(這也是執行緒池的運作原理):
檢驗任務
取得目前執行緒池狀態
判斷上班工人數是否小於核心員工數
如果小於則招人,安排工作
#不小於則判斷等候區任務是否排滿
如果沒有排滿則任務排入等候區
如果排滿,看是否允許招人,允許招人則招臨時工
如果都不行,該線程池無法接收新任務,開始按老闆約定的拒絕策略,執行拒絕策略
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
submit()
方法是其抽象父類別定義的,這裡我們就可以明顯看到submit與execute的區別,透過submit調用,我們會創建RunnableFuture
,並且會傳回Future,這裡我們可以將傳回值類型,告知submit方法,它就會透過泛型約束傳回值。
public abstract class AbstractExecutorService implements ExecutorService { public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } ... }
addWorker()是招募人的一個方法:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断状态,及任务列表 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
取得任務的方法:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
#讓員工工作的方法,分配任務,運行任務:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
以上是java線程池實作原理的源碼分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!