首頁 >Java >java教程 >java線程池實作原理的源碼分析

java線程池實作原理的源碼分析

WBOY
WBOY轉載
2023-05-09 14:16:071527瀏覽

執行緒池的起源

背景: 隨著電腦硬體的升級換代,讓我們的軟體具備多執行緒執行任務的能力。當我們在進行多線程編程時,就需要創建線程,如果說程序並發很高的話,我們會創建大量的線程,而每個線程執行一個時間很短的任務就結束了,這樣頻繁創建線程,會極大的降低系統效能,增加伺服器開銷,因為建立執行緒和銷毀執行緒都需要額外的消耗。

這時我們就可以藉助池化技術,來優化這個缺陷,線程池就誕生了。

池化技術的本質是在高並發場景下,為了實現資源復用,減少資源創建銷毀等開銷,如果並發數很小沒有明顯優勢(資源一直佔用系統內存,沒有機會被使用)。

池化技術介紹: 什麼時池化技術呢?池化技術是一種編程技巧,當程序出現高並發時,能夠明顯的優化程序,降低系統頻繁創建銷毀連接等額外開銷。我們經常接觸到的池化技術有資料庫連線池、執行緒池、物件池等等。池化技術的特點是將一些高成本的資源維護在一個特定的池子(記憶體)中,規定其最小連線數、最大連線數、阻塞佇列,溢位規則等配置,方便統一管理。一般情況下也會附帶一些監控,強制回收等配套功能。

池化技術作為一種資源使用技術,典型的使用情形是:

  • 取得資源的成本較高的時候

  • 請求資源的頻率很高且使用資源總數較低的時候

  • #面對效能問題,涉及到處理時間延遲的時候

池化技術資源分類:

  • #系統呼叫的系統資源,如執行緒、進程、記憶體分配等

  • 網路通訊的遠端資源, 如資料庫連線、套接字連線等

#執行緒池的定義和使用

#線程池是我們為了規避創建線程,銷毀線程額外開銷而誕生的,所以說我們定義創建好線程池之後,就不需要自己來創建線程,而是使用線程池調用執行我們的任務。下面我們一起來看看如何定義並建立線程池。

方案一:Executors(僅做了解,建議使用方案二)

建立線程池可以使用Executors,其中提供了一系列工廠方法來建立線程池,傳回的線程池都實作了ExecutorService介面。

ExecutorService 介面是Executor介面的子類別接口,使用更為廣泛,其提供了線程池生命週期管理的方法,傳回 Future 物件

也就是說我們透過Executors建立執行緒池,得到ExecutorService,透過ExecutorService執行非同步任務(實作Runnable介面)

Executors 可以建立幾個類型的執行緒池:

  • #newCachedThreadPool 建立一個可快取執行緒池,如果執行緒池執行緒數量過剩,會在60秒後回收多餘執行緒資源,當任務書增加,執行緒不夠用,則會新建執行緒。

  • newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大並發數,超出的執行緒會在佇列中等待。

  • newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。

  • newSingleThreadExecutor 建立一個單一執行緒的執行緒池,只使用唯一的執行緒來執行任務,可以保證任務會依照提交順序來完成。

方案二:ThreadPoolExecutor

在阿里巴巴開發規範中,規定執行緒池不允許透過Executors創建,而是透過ThreadPoolExecutor創建。

好處:讓寫的同學可以更明確線程池的運作規則,並規避資源耗盡的風險。

ThreadPoolExecutor的七大參數:

(1)corePoolSize 核心執行緒數量,核心執行緒會一直保留,不會被銷毀。

(2)maximumPoolSize 最大執行緒數,當核心執行緒無法滿足任務需求時,系統就會建立新的執行緒來執行任務。

(3)keepAliveTime 存活時間,核心執行緒以外的執行緒空閒多久就會被銷毀。

(4)timeUnit 代表執行緒存活的時間單位。

(5)BlockingQueue 阻塞佇列

  • #如果正在執行的任務超過了最大執行緒數,可以存放在佇列中,當執行緒池中有空閒資源就可以從佇列中取出任務繼續執行。

  • 佇列類型有以下幾種類型:LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue。

(6)threadFactory 線程工廠,用來創建線程的,可以自訂線程,例如我們可以定義線程組名稱,在jstack問題排查時,非常有幫助。

(7)rejectedExecutionHandler 拒絕策略,

當所有執行緒(最大執行緒數)都在忙,且任務佇列處於滿任務的狀態,則會執行拒絕策略。

JDK為我們提供了四種拒絕策略,我們必須都得熟悉

  • #AbortPolicy: 丟棄任務,並拋出異常RejectedExecutionException。 預設

  • DiscardPolicy: 丟棄最新的任務,不拋例外。

  • DiscardOldestPolicy: 丟掉排隊時間最久的任務,也就是最舊的任務。

  • CallerRuns: 由呼叫者(提交非同步任務的執行緒)處理任務。

線程池的實作原理

想要實作一個執行緒池我們就需要關心ThreadPoolExecutor類,因為Executors建立執行緒池也是透過new ThreadPoolExecutor物件。

看一下ThreadPoolExecutor的類別繼承關係,可以看出為什麼透過Executors建立的執行緒池傳回結果是ExecutorService,因為ThreadPoolExecutor是ExecutorService介面的實作類別,而Executors創建線程池本質也是創建的ThreadPoolExecutor 物件。

java線程池實作原理的源碼分析

下面我們一起看一下ThreadPoolExecutor的原始碼,首先是ThreadPoolExecutor內定義的變數,常數:

    // 复合类型变量 是一个原子整数  控制状态(运行状态|线程池活跃线程数量)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 容量
    // 运行状态存储在高位3位
    private static final int RUNNING    = -1 << COUNT_BITS;  // 接受新任务,并处理队列任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 不接受新任务,但会处理队列任务
    private static final int STOP       =  1 << COUNT_BITS;  // 不接受新任务,不会处理队列任务,中断正在处理的任务
    private static final int TIDYING    =  2 << COUNT_BITS;  // 所有的任务已结束,活跃线程为0,线程过渡到TIDYING状       态,将会执行terminated()钩子方法
    private static final int TERMINATED =  3 << COUNT_BITS;  // terminated()方法已经完成
    // 设置 ctl 参数方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    /**
     * 阻塞队列
     */
    private final BlockingQueue<Runnable> workQueue;
    /**
     * Lock 锁.
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    /**
     * 工人们
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    /**
     * 等待条件支持等待终止
     */
    private final Condition termination = mainLock.newCondition();
    /**
     * 最大的池大小.
     */
    private int largestPoolSize;
    /**
     * 完成任务数
     */
    private long completedTaskCount;
    /**
     * 线程工厂
     */
    private volatile ThreadFactory threadFactory;
    /**
     * 拒绝策略
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 存活时间
     */
    private volatile long keepAliveTime;
    /**
     * 允许核心线程数
     */
    private volatile boolean allowCoreThreadTimeOut;
    /**
     * 核心线程数
     */
    private volatile int corePoolSize;
    /**
     * 最大线程数
     */
    private volatile int maximumPoolSize;
    /**
     * 默认拒绝策略
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    /**
     * shutdown and shutdownNow权限
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

建構器,,支援最少五種參數,最大七中參數的四種建構器:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

工人,在執行緒池中執行任務的,執行緒池就是透過這些工人進行工作的,有核心員工(核心線程)和臨時工(人手不夠的時候,臨時創建的,如果空閒時間廠,就會被裁員),

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 工人的本质就是个线程
        final Thread thread;
        // 第一件工作任务
        Runnable firstTask;
      volatile long completedTasks;
        /**
         * 构造器
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** 工作  */
        public void run() {
            runWorker(this);
        }
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

核心方法,通過執行緒池執行任務(這也是執行緒池的運作原理):

  • 檢驗任務

  • 取得目前執行緒池狀態

  • 判斷上班工人數是否小於核心員工數

  • 如果小於則招人,安排工作

  • #不小於則判斷等候區任務是否排滿

  • 如果沒有排滿則任務排入等候區

  • 如果排滿,看是否允許招人,允許招人則招臨時工

  • 如果都不行,該線程池無法接收新任務,開始按老闆約定的拒絕策略,執行拒絕策略

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

submit()方法是其抽象父類別定義的,這裡我們就可以明顯看到submit與execute的區別,透過submit調用,我們會創建RunnableFuture,並且會傳回Future,這裡我們可以將傳回值類型,告知submit方法,它就會透過泛型約束傳回值。

public abstract class AbstractExecutorService implements ExecutorService {
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    ...
}

addWorker()是招募人的一個方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 判断状态,及任务列表
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

取得任務的方法:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

#讓員工工作的方法,分配任務,運行任務:

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

以上是java線程池實作原理的源碼分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除