Heim >Java >javaLernprogramm >So verwenden Sie die Java-Thread-Pool-Methodeexecute()
* Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each {@code ThreadPoolExecutor} also maintains some basic * statistics, such as the number of completed tasks.
Der Thread-Pool behandelt zwei verschiedene Probleme, indem er den Overhead reduziert, bevor der Thread offiziell aufgerufen wird Bietet eine Reihe von Möglichkeiten zum Binden von Verwaltungsaufgaben-Threads. Jeder Thread-Pool enthält einige grundlegende Informationen, beispielsweise die Anzahl der intern erledigten Aufgaben.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl der ThreadPoolExecutor-Klasse an, die den Status darstellen. Als AtomicInteger-Klasse speichert sie zwei Arten von Informationen in der Klasse. Unter anderem speichern die oberen 3 Bits den Status des Thread-Pools , und die letzten 29 Bits speichern zu diesem Zeitpunkt die Anzahl der Woker-Klassen-Threads im Thread-Pool (es ist ersichtlich, dass die maximal akzeptable Anzahl von Threads im Thread-Pool etwa 500 Millionen beträgt). Es ist ersichtlich, dass die bereitgestellten Methoden runStateOf() und workerCountOf() jeweils Methoden zum Anzeigen des Thread-Status und der Anzahl der Threads bereitstellen.
Schauen wir uns die Kommentare des Autors an
* RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed
Der Zustand RUNNING
kann neue eingehende Aufgaben annehmen und auch Aufgaben in der Warteschlange ausführen. Der Status RUNNING
状态可以接受新进来的任务,同时也会执行队列里的任务。
SHUTDOWN
状态已经不会再接受新任务,但仍旧会处理队列中的任务。
STOP
状态在之前的基础上,不会处理队列中的人物,在执行的任务也会直接被打断。
TIDYING
状态在之前的基础上,所有任务都已经终止,池中的Worker线程都已经为0,也就是stop状态在清理完所有工作线程之后就会进入该状态,同时在shutdown状态在队列空以及工作线程清理完毕之后也会直接进入这个阶段,这一阶段会循环执行terminated()方法。
TERMINATED
状态作为最后的状态,在之前的基础上terminated()方法也业已执行完毕,才会从上个状态进入这个状态,代表线程池已经完全停止。
由于线程池的状态都是通过AtomicInteger来保存的,可以通过比较的方式简单的得到当前线程状态。
private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize;
corePoolSize
表示线程池中允许存活最少的工作线程数量,但值得注意的是如果allowCoreThreadTimeOut一旦设置true(默认false),每个线程的存活时间只有keepAliveTime也就是说在allowCoreThreadTimeOut为true的时候,该线程池最小的工作线程数量为0;maximumPoolSize代表线程池中最大的工作线程数量。
keepAliveTime
为线程池中工作线程数量大于corePoolSize时,每个工作线程的在等待工作时最长的等待时间。
workQueue
作为线程池的任务等待队列,这个将在接下来的execute()里详细解释。
Workers
作为存放线程池中存放工作线程的容器。
largestPoolSize
用来记录线程池中存在过的最大的工作线程数量。
completedTaskCount
用来记录线程池完成的任务的总数。
Handler
SHUTDOWN
nimmt keine neuen Aufgaben mehr an, Aufgaben in der Warteschlange werden jedoch weiterhin verarbeitet.
STOP
-Status verarbeitet die Zeichen in der Warteschlange nicht und die ausgeführten Aufgaben werden direkt unterbrochen.
TIDYING
Der Status basiert auf dem vorherigen. Alle Aufgaben wurden beendet und die Worker-Threads im Pool waren alle 0. Das heißt, der Stoppstatus wechselt danach in diesen Status Bereinigen aller Worker-Threads, und gleichzeitig wird im Shutdown-Status direkt in diese Phase eingetreten, nachdem die Warteschlange leer ist und der Worker-Thread bereinigt wurde zyklisch ausgeführt.
TERMINATED
Der Status ist der letzte Status. Basierend auf dem vorherigen wurde auch die Methode „terminated()“ ausgeführt und wechselt dann vom vorherigen Status in diesen Status dass der Thread-Pool vollständig gestoppt wurde.
Da der Status des Thread-Pools über AtomicInteger gespeichert wird, kann der aktuelle Thread-Status einfach durch Vergleich ermittelt werden. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }Als nächstes folgen mehrere Variablen, die sich auf Arbeitsthreads im Thread-Pool beziehen Wird einmal auf „true“ gesetzt (Standardwert „false“), beträgt die Überlebenszeit jedes Threads nur „keepAliveTime“. Das heißt, wenn „allowCoreThreadTimeOut“ „true“ ist, beträgt die minimale Anzahl der Arbeitsthreads im Thread-Pool 0; Threads im Thread-Pool.
keepAliveTime
ist die längste Wartezeit für jeden Arbeitsthread beim Warten auf Arbeit, wenn die Anzahl der Arbeitsthreads im Thread-Pool größer als corePoolSize ist. 🎜🎜🎜🎜workQueue
dient als Aufgabenwarteschlange des Thread-Pools. Dies wird im Folgenden im Detail erläutert. 🎜🎜🎜🎜Workers
dient als Container zum Speichern von Worker-Threads im Thread-Pool. 🎜🎜🎜🎜largestPoolSize
wird verwendet, um die größte Anzahl von Arbeitsthreads aufzuzeichnen, die jemals im Thread-Pool vorhanden waren. 🎜🎜🎜🎜completedTaskCount
wird verwendet, um die Gesamtzahl der vom Thread-Pool abgeschlossenen Aufgaben aufzuzeichnen. 🎜🎜🎜🎜Handler
ist eine Ablehnungsstrategie, wenn der Thread-Pool keine Aufgaben annehmen kann. Wir können unsere eigene Ablehnungsstrategie unter der Voraussetzung implementieren, dass die RejectedExecutionHandler-Schnittstelle implementiert wird. Das Folgende ist die Standard-Ablehnungsrichtlinie des Thread-Pools: 🎜🎜🎜public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }🎜threadFactory wird als Factory-Klasse für Thread-Pool-Produktionsthreads verwendet🎜🎜Das Folgende ist die Thread-Produktionsmethode der Standard-Thread-Factory des Thread-Pools🎜
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }🎜Wir können zuerst einen Blick darauf werfen Bei unserer am häufigsten aufgerufenen Methode „execute()“ ist die interne Aufruflogik von „execute()“ sehr klar. 🎜🎜Wenn die Anzahl der Arbeitsthreads im aktuellen Thread-Pool kleiner als corePoolSize ist, rufen Sie addWoker() direkt auf, um Arbeitsthreads hinzuzufügen. 🎜🎜Das Folgende ist die spezifische Methode von addWorker()🎜
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }🎜Diese Methode ist relativ lang, aber die Gesamtlogik ist immer noch klar. 🎜🎜Bestimmen Sie zunächst den Status des aktuellen Thread-Pools. Wenn der Status nicht heruntergefahren oder ausgeführt wird oder er heruntergefahren ist, aber die Arbeitswarteschlange leer ist, wird zu diesem Zeitpunkt direkt auf den Fehler beim Hinzufügen von Arbeit zurückgegriffen. Der nächste Schritt besteht darin, die Anzahl der Threads im Thread-Pool anhand des Kernwerts zum Zeitpunkt des Aufrufs zu beurteilen und zu beurteilen, ob er auf corePoolSize oder maximumPoolSize basiert. 🎜🎜Nachdem Sie den Thread-Pool-Status und die Anzahl der Worker-Threads im Thread-Pool bestätigt haben, können Sie tatsächlich mit dem Hinzufügen von Worker-Threads beginnen. 🎜🎜Erstellen Sie eine neue Worker-Klasse (eine interne Klasse des Thread-Pools, einen bestimmten Arbeitsthread), übergeben Sie den auszuführenden spezifischen Thread als Parameter in der Konstruktionsmethode und fügen Sie ihn dann den Worker-Thread-Container-Workern des Threads hinzu Poolen und aktualisieren Sie die maximale Anzahl von Arbeitsthreads und schließlich den Aufruf der start()-Methode des Arbeitsthreads, um die Erstellung und den Start des Arbeitsthreads abzuschließen. 🎜🎜Kehren wir zur Methode „execute()“ zurück, wenn die Anzahl der Threads, die wir zu Beginn haben, größer ist als corePoolSize, oder wenn beim Aufrufen der Methode „addworker()“ ein Problem auftritt und die Anzahl der Arbeitsthreads nicht hinzugefügt werden kann , dann werden wir weiterhin die Next-Down-Logik ausführen. 🎜
在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。
但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。
如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。
如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。
在之前的过程中我们建立了工作线程Worker()类,那么我们现在看看worker类的内部实现,也可以说是线程池的核心部分。
接下来是Worker()类的成员
final Thread thread; Runnable firstTask; volatile long completedTasks;
thread
作为worker的工作线程空间,由线程池中所设置的线程工厂生成。
firstTask
则是worker在构造方法中所接受到的所要执行的任务。
completedTasks
作为该worker类所执行完毕的任务总数。
接下来我们可以看最重要的,也就是我们之前建立完Worker类之后立马调用的run()方法了
public void run() { runWorker(this); }
我们可以继续追踪下去
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。
但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。
接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。
在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。
private Runnable getTask() { boolean timedOut = false; retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOut和corePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。
调用了processWorkerExist()方法来执行工作线程的回收。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。
在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。
Das obige ist der detaillierte Inhalt vonSo verwenden Sie die Java-Thread-Pool-Methodeexecute(). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!