ホームページ  >  記事  >  Java  >  Java スレッド プール フレームワークのコア コード分析

Java スレッド プール フレームワークのコア コード分析

伊谢尔伦
伊谢尔伦オリジナル
2016-12-05 11:44:591253ブラウズ

マルチスレッド プログラミングでは、各タスクにスレッドを割り当てることは非現実的であり、スレッド作成のオーバーヘッドとリソース消費が非常に高くなります。スレッド プールは時代の要求に応じて登場し、スレッドを管理するための強力なツールになりました。 Java は、Executor インターフェイスを通じてタスク送信プロセスと実行プロセスを分離する標準メソッドを提供し、Runnable を使用してタスクを表します。

それでは、Java スレッド プール フレームワークの実装である ThreadPoolExecutor を分析してみましょう。

次の分析は JDK1.7 に基づいています

ライフサイクル

ThreadPoolExecutor では、CAPACITY の上位 3 ビットが実行ステータスを表すために使用されます。

RUNNING: 新しいタスクを受信し、タスク内のタスクを処理します。キュー
SHUTDOWN: 新しいタスクを受信しませんが、タスクキュー内のタスクを処理します
STOP: 新しいタスクを受信せず、タスクキューから出ず、進行中のすべてのタスクを中断します
TIDYING: すべてのタスクが終了しました。ワーカースレッドの数が 0 の場合、この状態に達すると、terminated() が実行されます
TERMINATED:terminated() の実行が完了しました

Java スレッド プール フレームワークのコア コード分析

ThreadPoolExecutor は、アトミッククラスを使用してステータスビットを表します

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

スレッドプールモデル

コアパラメータ

corePoolSize : 存続するワーカー スレッドの最小数 (allowCoreThreadTimeOut が設定されている場合、値は 0)
maximumPoolSize: スレッドの最大数、CAPACITY によって制限される
keepAliveTime: 対応するスレッドの生存時間、時間単位は TimeUnit で指定される
workQueue: 実行するタスクを保存する作業キュー
RejectExecutionHandler: 拒否ポリシー、スレッド プールがいっぱいになるとトリガーされます
スレッド プールの最大容量: CAPACITY の最初の 3 桁はフラグ ビットとして使用されます。ワーカー スレッドの最大容量は (2^29)-1 です

4 つのモデル

CachedThreadPool: 1 つのキャッシュ可能なスレッド プール。スレッド プールの現在のサイズが処理需要を超える場合、需要が増加するとアイドル状態のスレッドがリサイクルされます。 、新しいスレッドを追加できます。スレッド プールのサイズに制限はありません。
FixedThreadPool: 固定サイズのスレッド プール。タスクが送信されると、スレッド プールの最大数に達するまでスレッドが作成され、最大数に達するとスレッド プールのサイズは変更されなくなります。
SingleThreadPool: シングルスレッドのスレッド プール。タスクを実行するためのワーカー スレッドが 1 つだけあり、このスレッドが異常終了した場合、タスクを実行するための新しいスレッドが作成されます。タスク。
ScheduledThreadPool: タイマーと同様に、タスクを遅延またはスケジュールされた方法で実行する固定サイズのスレッド プール。
タスクの実行execute

コアロジック:

現在のスレッド数 現在のスレッド数 >= corePoolSize、およびタスクがワークキューに正常に追加されました
プールの現在のステータスが RUNNING かどうかを確認します
そうでない場合はタスクを拒否します
YES の場合は、現在のスレッド数が 0 かどうかを確認し、0 の場合はワーカースレッドを追加します。
通常のスレッド実行タスク addWorker(command, false) を有効にし、開始に失敗した場合はタスクを拒否します。
上記の分析から、スレッド プールの操作の 4 つの段階を要約できます。が空の場合、この時点で新しいスレッドが作成され、送信されたタスク

poolSize == corePoolSize が作成されます。この時点で、送信されたタスクはワーク キューに入り、ワーカー スレッドはキューからタスクの実行を取得します。現時点では、キューは空でもいっぱいでもありません。

poolSize == corePoolSize、キューがいっぱいの場合、送信されたタスクを処理するために新しいスレッドが作成されますが、poolSize poolSize == maxPoolSize、キューがいっぱいの場合、この時点で拒否ポリシーがトリガーされます
拒否ポリシー

前に、実行できないタスクは拒否されると述べました。 RejectedExecutionHandler は、拒否されたタスクを処理するためのインターフェイスです。ここでは4つの拒否戦略を紹介します。

AbortPolicy: デフォルトのポリシー、タスクを終了し、RejectedException をスローします

CallerRunsPolicy: 呼び出し側スレッドで現在のタスクを実行します、例外はスローされません

DiscardPolicy: ポリシーを破棄します、タスクを直接破棄します、例外はスローされません
DiscardOldersPolicy: 最も古いものを破棄しますtask、execute 現在のタスクは例外をスローしません

スレッドプール内のWorker

WorkerはAbstractQueuedSynchronizerとRunnableを継承し、前者はWorkerにロック関数を提供し、後者はワーカースレッドのmainメソッドを実行しますrunWorker(Worker w ) (タスクキューからタスクの実行を取得します)。 Worker 参照はワーカー コレクションに保存され、mainLock によって保護されます。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();

コア関数 runWorker

以下は、簡略化されたロジックです。 注: 各ワーカー スレッドの実行により、次の関数が実行されます

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);        
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}

从getTask()中获取任务 
锁住 worker 
执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法 
运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。 
执行afterExecute(task, thrown); 
解锁 worker 
如果获取到的任务为 null,关闭 worker 
获取任务 getTask

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

private final BlockingQueue<Runnable> workQueue;

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

现有的线程数量超过最大线程数量 
线程池处于STOP状态 
线程池处于SHUTDOWN状态且工作队列为空 
线程等待任务超时,且线程数量超过保留线程数量 
核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

在以下两种情况下等待任务会超时:

允许核心线程等待超时,即allowCoreThreadTimeOut(true) 
当前线程是普通线程,此时wc > corePoolSize 
工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。

总结

ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。 
Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecute和afterExecute方法来定制线程池任务执行过程。 
使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。 
拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。