Heim  >  Artikel  >  Java  >  Analyse des Kerncodes des Java-Thread-Pool-Frameworks

Analyse des Kerncodes des Java-Thread-Pool-Frameworks

伊谢尔伦
伊谢尔伦Original
2016-12-05 11:44:591295Durchsuche

Bei der Multithread-Programmierung ist es unrealistisch, jeder Aufgabe einen Thread zuzuweisen. Der Aufwand für die Thread-Erstellung und der Ressourcenverbrauch sind sehr hoch. Der Thread-Pool ist im Laufe der Zeit entstanden und hat sich für uns zu einem leistungsstarken Tool zur Thread-Verwaltung entwickelt. Java bietet eine Standardmethode zum Entkoppeln des Aufgabenübermittlungsprozesses und des Ausführungsprozesses über die Executor-Schnittstelle und verwendet Runnable zur Darstellung der Aufgabe.

Als nächstes analysieren wir ThreadPoolExecutor, die Implementierung des Java-Thread-Pool-Frameworks.

Die folgende Analyse basiert auf dem Lebenszyklus von JDK1.7

ThreadPoolExecutor, wobei die oberen 3 Bits von CAPACITY verwendet werden, um jeweils den Betriebsstatus darzustellen:

LÄUFT: Neue Aufgaben empfangen und Aufgaben in der Aufgabenwarteschlange verarbeiten
HERUNTERFAHREN: Keine neuen Aufgaben empfangen, sondern Aufgaben in der Aufgabenwarteschlange verarbeiten
STOP: Keine neuen Aufgaben empfangen, nicht herauskommen der Aufgabenwarteschlange und unterbricht alle laufenden Aufgaben. Aufgabe
AUFRÄUMEN: Alle Aufgaben wurden beendet und die Anzahl der Arbeitsthreads beträgt 0. Wenn dieser Status erreicht ist, wird beendet () ausgeführt.
TERMINATED: beendet () wurde ausgeführt

Analyse des Kerncodes des Java-Thread-Pool-Frameworks

Atomklassen werden in ThreadPoolExecutor verwendet, um Statusbits darzustellen

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

Thread-Pool-Modell

Kernparameter

corePoolSize: die minimale Anzahl überlebender Arbeitsthreads (wenn AllowCoreThreadTimeOut festgelegt ist, dann ist der Wert 0)
maximumPoolSize: die maximale Anzahl von Threads, begrenzt durch CAPACITY
keepAliveTime: die Überlebenszeit der entsprechenden Thread, die Zeiteinheit wird durch TimeUnit angegeben
workQueue: Arbeitswarteschlange, speichert auszuführende Aufgaben
RejectExecutionHandler: Ablehnungsstrategie, wird ausgelöst, wenn der Thread-Pool voll ist
Maximale Kapazität des Thread-Pools: Die Die ersten drei Ziffern in CAPACITY werden als Flag-Bits verwendet, was bedeutet, dass die maximale Kapazität des Arbeitsthreads (2^29)-1 beträgt

Vier Modelle

CachedThreadPool: Ein zwischenspeicherbarer Thread-Pool. Wenn die aktuelle Größe des Thread-Pools den Verarbeitungsbedarf übersteigt, werden inaktive Threads recycelt. Wenn der Bedarf steigt, können neue Threads hinzugefügt werden, es gibt keine Begrenzung für die Größe des Thread-Pools.
FixedThreadPool: Ein Thread-Pool mit fester Größe. Wenn eine Aufgabe übermittelt wird, wird ein Thread erstellt, bis die maximale Anzahl von Thread-Pools erreicht ist. Zu diesem Zeitpunkt ändert sich die Größe des Thread-Pools nicht mehr.
SingleThreadPool: Ein Thread-Pool mit nur einem Thread. Er kann sicherstellen, dass Aufgaben seriell in der Reihenfolge in der Warteschlange ausgeführt werden. Wenn dieser Thread abnormal endet, wird ein neuer Thread erstellt die Aufgabe ausführen.
ScheduledThreadPool: Ein Thread-Pool fester Größe, der Aufgaben verzögert oder geplant ausführt, ähnlich wie Timer.
Aufgabe ausführen

Kernlogik:

Aktuelle Anzahl von Threads Aktuelle Anzahl of threads> = corePoolSize, und die Aufgabe wurde erfolgreich zur Arbeitswarteschlange hinzugefügt
Überprüfen Sie, ob der aktuelle Status des Thread-Pools RUNNING ist
Wenn nicht, lehnen Sie die Aufgabe ab
Wenn ja, ermitteln Sie, ob die aktuelle Anzahl Anzahl der Threads ist 0. Wenn sie 0 ist, erhöhen Sie sie. Ein Arbeitsthread.
Ermöglichen Sie einem normalen Thread die Ausführung der Aufgabe addWorker(command, false) und lehnen Sie die Aufgabe ab, wenn sie nicht gestartet werden kann.
Aus der obigen Analyse können wir die vier Phasen des Thread-Pool-Betriebs zusammenfassen:

poolSize poolSize == corePoolSize Der Arbeitsthread erhält die Aufgabenausführung aus der Warteschlange. Zu diesem Zeitpunkt ist die Warteschlange weder leer noch voll.
poolSize == corePoolSize, und die Warteschlange ist voll, ein neuer Thread wird erstellt, um die übermittelten Aufgaben zu verarbeiten, aber poolSize poolSize == maxPoolSize, und die Warteschlange ist voll, die Ablehnungsrichtlinie gilt getriggert
Ablehnungsstrategie

Wir haben bereits erwähnt, dass Aufgaben, die nicht ausgeführt werden können, abgelehnt werden. RejectedExecutionHandler ist die Schnittstelle zur Bearbeitung abgelehnter Aufgaben. Hier sind vier Ablehnungsstrategien.

AbortPolicy: Standardrichtlinie, beendet die Aufgabe, wirft RejectedException
CallerRunsPolicy: Führt die aktuelle Aufgabe im Aufrufer-Thread aus, es wird keine Ausnahme geworfen
DiscardPolicy: Die Richtlinie aufgeben, die Aufgabe direkt verwerfen, nein Ausnahme wird ausgelöst
DiscardOldersPolicy: Verwerfen Sie die älteste Aufgabe und führen Sie die aktuelle Aufgabe aus, ohne eine Ausnahme auszulösen

Worker im Thread-Pool

Worker erbt AbstractQueuedSynchronizer und Runnable. Ersteres stellt die Sperrfunktion bereit an den Worker und letztere Die Hauptmethode zum Ausführen des Worker-Threads ist runWorker(Worker w) (Abrufen von Aufgaben aus der Aufgabenwarteschlange zur Ausführung). Die Worker-Referenz wird in der Workers-Sammlung gespeichert und durch mainLock geschützt.

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();

Kernfunktion runWorker

Das Folgende ist die vereinfachte Logik. Hinweis: Die Ausführung jedes Arbeitsthreads führt die folgende Funktion aus

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);        
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}

从getTask()中获取任务 
锁住 worker 
执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法 
运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。 
执行afterExecute(task, thrown); 
解锁 worker 
如果获取到的任务为 null,关闭 worker 
获取任务 getTask

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

private final BlockingQueue<Runnable> workQueue;

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

现有的线程数量超过最大线程数量 
线程池处于STOP状态 
线程池处于SHUTDOWN状态且工作队列为空 
线程等待任务超时,且线程数量超过保留线程数量 
核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

在以下两种情况下等待任务会超时:

允许核心线程等待超时,即allowCoreThreadTimeOut(true) 
当前线程是普通线程,此时wc > corePoolSize 
工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。

总结

ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。 
Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecute和afterExecute方法来定制线程池任务执行过程。 
使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。 
拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn