搜尋
首頁Javajava教程Java執行緒池execute()方法怎麼用

先理解線程池到底有什麼作用

* Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.

線程池處理了兩個不同的問題,線程池通過減少線程正式調用之前的開銷來給大量異步任務更優秀的表現,與此同時給出了一系列綁定管理任務執行緒的一種手段。每個執行緒池都包含了一些基本訊息,例如內部完成的任務數量。

先看ThreadPoolExecutor類別的一系列代表狀態的

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl作為AtomicInteger類別存放了類別中的兩種信息,在其中由高3位元來保存線程池的狀態,後29位元來保存此時執行緒池中的Woker類別執行緒數量(由此可知,執行緒池中的執行緒數量最高可以接受大約在五億左右)。由此可見給出的runStateOf()和workerCountOf()方法分別給出了查看執行緒狀態和執行緒數量的方法。

該類別一共給了五種狀態

讓我們看作者給的註解

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don&#39;t accept new tasks, but process queued tasks
*   STOP:     Don&#39;t accept new tasks, don&#39;t process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
  • RUNNING狀態可以接受新進的任務,同時也會執行佇列裡的任務。

  • SHUTDOWN 狀態已經不會再接受新任務,但仍舊會處理佇列中的任務。

  • STOP狀態在先前的基礎上,不會處理佇列中的人物,在執行的任務也會直接被打斷。

  • TIDYING狀態在先前的基礎上,所有任務都已經終止,池中的Worker執行緒都已經為0,也就是stop狀態在清理完所有工作執行緒之後就會進入該狀態,同時在shutdown狀態在佇列空以及工作執行緒清理完畢之後也會直接進入這個階段,這階段會循環執行terminated()方法。

  • TERMINATED 狀態作為最後的狀態,在先前的基礎上terminated()方法也業已執行完畢,才會從上個狀態進入這個狀態,代表線程池已經完全停止。

由於執行緒池的狀態都是透過AtomicInteger來保存的,可以透過比較的方式簡單的得到當前執行緒狀態。

private final BlockingQueue<Runnable> workQueue; 
private final ReentrantLock mainLock = new ReentrantLock(); 
private final HashSet<Worker> workers = new HashSet<Worker>(); 
private final Condition termination = mainLock.newCondition(); 
private int largestPoolSize; 
private long completedTaskCount; 
private volatile ThreadFactory threadFactory; 
private volatile RejectedExecutionHandler handler; 
private volatile long keepAliveTime; 
private volatile boolean allowCoreThreadTimeOut; 
private volatile int corePoolSize; 
private volatile int maximumPoolSize;

接下來是線程池的幾個有關工作線程的變數

  • #corePoolSize表示在執行緒池中允許存活最少的工作執行緒數量,但值得注意的是如果allowCoreThreadTimeOut一旦設定true(預設false),每個執行緒的存活時間只有keepAliveTime也就是說在allowCoreThreadTimeOut為true的時候,該執行緒池最小的工作執行緒數量為0;maximumPoolSize代表執行緒池中最大的工作執行緒數量。

  • keepAliveTime當執行緒池中工作執行緒數大於corePoolSize時,每個工作執行緒的在等待工作時最長的等待時間。

  • workQueue作為執行緒池的任務等待佇列,這個會在接下來的execute()中詳細解釋。

  • Workers作為存放執行緒池中存放工作執行緒的容器。

  • largestPoolSize用來記錄執行緒池中存在過的最大的工作執行緒數量。

  • completedTaskCount用來記錄執行緒池完成的任務的總數。

  • Handler作為執行緒池中在不能接受任務的時候的拒絕策略,我們可以實作自己的拒絕策略,在實作了RejectedExecutionHandler介面的前提下。下面是執行緒池的預設拒絕策略,

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

threadFactory作為執行緒池生產執行緒的工廠類別

下面是執行緒池預設的執行緒工廠的生產執行緒方法

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

我們可以先看我們最常呼叫的execute()方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute()內部的呼叫邏輯非常清晰。

如果目前執行緒池的工作執行緒數量小於corePoolSize,那麼直接呼叫addWoker(),來新增工作執行緒。

下面是addWorker()的具體方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
           throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這段方法比較長,但整體的邏輯還是清晰的。

先判斷目前執行緒池的狀態,如果已經狀態不是shutdown或running,或是已經為shutdown但是工作佇列已經為空,那麼這個時候就直接回傳新增工作失敗。接下來是對線程池線程數量的判斷,根據呼叫時的core的值來判斷是跟corePoolSize還是 maximumPoolSize判斷。

在確認了執行緒池狀態以及執行緒池中工作執行緒數量之後,才真正開始新增工作執行緒。

新建立一個worker類別(執行緒池的內部類,具體的工作執行緒),將要執行的具體執行緒做為建構方法中的參數傳遞進去,接下來將其加入執行緒池的工作執行緒容器workers,而且更新工作執行緒最大量,最後呼叫worker工作執行緒的start()方法,就完成了工作執行緒的建立與啟動。

讓我們回到execute()方法,如果我們在一開始的執行緒數量就大於corePoolSize,或者我們在呼叫addworker()方法的過程中出現了問題導致新增工作執行緒數量失敗,那麼我們會繼續執行接下來的邏輯。

在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。

但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。

如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。

如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。

在之前的过程中我们建立了工作线程Worker()类,那么我们现在看看worker类的内部实现,也可以说是线程池的核心部分。

Worker类作为线程池的内部类

接下来是Worker()类的成员

final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
  • thread作为worker的工作线程空间,由线程池中所设置的线程工厂生成。

  • firstTask则是worker在构造方法中所接受到的所要执行的任务。

  • completedTasks作为该worker类所执行完毕的任务总数。

接下来我们可以看最重要的,也就是我们之前建立完Worker类之后立马调用的run()方法了

public void run() {
    runWorker(this);
}

run()方法实现的很简单

我们可以继续追踪下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。

但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。

接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。

在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。

下面是getTask()的实现

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOut和corePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。

在runWorker()方法的最后

调用了processWorkerExist()方法来执行工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。

在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。

以上是Java執行緒池execute()方法怎麼用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:亿速云。如有侵權,請聯絡admin@php.cn刪除
說明JVM如何充當Java代碼和基礎操作系統之間的中介。說明JVM如何充當Java代碼和基礎操作系統之間的中介。Apr 29, 2025 am 12:23 AM

JVM的工作原理是將Java代碼轉換為機器碼並管理資源。 1)類加載:加載.class文件到內存。 2)運行時數據區:管理內存區域。 3)執行引擎:解釋或編譯執行字節碼。 4)本地方法接口:通過JNI與操作系統交互。

解釋Java虛擬機(JVM)在Java平台獨立性中的作用。解釋Java虛擬機(JVM)在Java平台獨立性中的作用。Apr 29, 2025 am 12:21 AM

JVM使Java實現跨平台運行。 1)JVM加載、驗證和執行字節碼。 2)JVM的工作包括類加載、字節碼驗證、解釋執行和內存管理。 3)JVM支持高級功能如動態類加載和反射。

您將採取哪些步驟來確保Java應用程序在不同的操作系統上正確運行?您將採取哪些步驟來確保Java應用程序在不同的操作系統上正確運行?Apr 29, 2025 am 12:11 AM

Java應用可通過以下步驟在不同操作系統上運行:1)使用File或Paths類處理文件路徑;2)通過System.getenv()設置和獲取環境變量;3)利用Maven或Gradle管理依賴並測試。 Java的跨平台能力依賴於JVM的抽象層,但仍需手動處理某些操作系統特定的功能。

Java是否需要特定於平台的配置或調整區域?Java是否需要特定於平台的配置或調整區域?Apr 29, 2025 am 12:11 AM

Java在不同平台上需要進行特定配置和調優。 1)調整JVM參數,如-Xms和-Xmx設置堆大小。 2)選擇合適的垃圾回收策略,如ParallelGC或G1GC。 3)配置Native庫以適應不同平台,這些措施能讓Java應用在各種環境中發揮最佳性能。

哪些工具或庫可以幫助您解決Java開發中特定於平台的挑戰?哪些工具或庫可以幫助您解決Java開發中特定於平台的挑戰?Apr 29, 2025 am 12:01 AM

Osgi,Apachecommonslang,JNA和JvMoptionsareeForhandlingForhandlingPlatform-specificchallengesinjava.1)osgimanagesdeppedendendencenciesandisolatescomponents.2)apachecommonslangprovidesitorityfunctions.3)

JVM如何在不同平台上管理垃圾收集?JVM如何在不同平台上管理垃圾收集?Apr 28, 2025 am 12:23 AM

JVMmanagesgarbagecollectionacrossplatformseffectivelybyusingagenerationalapproachandadaptingtoOSandhardwaredifferences.ItemploysvariouscollectorslikeSerial,Parallel,CMS,andG1,eachsuitedfordifferentscenarios.Performancecanbetunedwithflagslike-XX:NewRa

為什麼Java代碼可以在不同的操作系統上運行,而無需修改?為什麼Java代碼可以在不同的操作系統上運行,而無需修改?Apr 28, 2025 am 12:14 AM

Java代碼可以在不同操作系統上無需修改即可運行,這是因為Java的“一次編寫,到處運行”哲學,由Java虛擬機(JVM)實現。 JVM作為編譯後的Java字節碼與操作系統之間的中介,將字節碼翻譯成特定機器指令,確保程序在任何安裝了JVM的平台上都能獨立運行。

描述編譯和執行Java程序的過程,突出平台獨立性。描述編譯和執行Java程序的過程,突出平台獨立性。Apr 28, 2025 am 12:08 AM

Java程序的編譯和執行通過字節碼和JVM實現平台獨立性。 1)編寫Java源碼並編譯成字節碼。 2)使用JVM在任何平台上執行字節碼,確保代碼的跨平台運行。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

EditPlus 中文破解版

EditPlus 中文破解版

體積小,語法高亮,不支援程式碼提示功能

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強大的PHP整合開發環境