Java Thread Pool
最近、プロジェクトの同時実行機能を改善してきましたが、開発は凸凹していました。たくさんの情報を読んで、ようやく理解が深まりました。そこで、一緒にソースコードを確認して、同時プログラミングの原理をまとめてみようと思いました。
最もよく使用されるスレッド プールから開始し、作成、実行、シャットダウンを中心としたスレッド プールのライフ サイクル全体の実装原則を理解する準備をしてください。後で、アトミック変数、同時コンテナ、ブロックキュー、同期ツール、ロックなどのトピックについて学習します。 java.util.concurrent の同時実行ツールは使用するのが難しくありませんが、ただ使用するだけではだめです。ソース コードを読まなければなりません (笑)。ちなみに使用しているJDKは1.8です。
Executor フレームワーク
Executor は、インターフェイス内に Runnable タスクを実行するメソッドが 1 つだけあります。 ExecutorService インターフェイスは Executor を拡張し、スレッドのライフサイクル管理を追加し、タスクの終了やタスクの結果の返しなどのメソッドを提供します。 AbstractExecutorService は ExecutorService を実装し、submit メソッドなどのデフォルトの実装ロジックを提供します。
それでは、今日のトピックである ThreadPoolExecutor は、AbstractExecutorService を継承し、スレッド プールの特定の実装を提供します。
構築メソッド
以下は、最大 7 つのパラメータを持つ ThreadPoolExecutor の最も一般的なコンストラクターです。特定のコードは投稿しません。パラメーターの確認と設定に関するいくつかのステートメントのみを掲載します。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize は、スレッド プールのターゲット サイズです。これは、スレッド プールが作成されたばかりで、実行するタスクがないときのサイズです。 MaximumPoolSize は、スレッド プールの最大上限です。 keepAliveTime はスレッドの生存時間です。スレッド プール内のスレッドの数が corePoolSize よりも大きい場合、生存時間を超えたアイドル状態のスレッドがリサイクルされます。ユニットは言うまでもなく、残りの 3 つのパラメータは後で分析されます。
デフォルトのカスタマイズされたスレッド プール
ThreadPoolExecutor は、Executor のファクトリ メソッドによって作成されたいくつかのカスタマイズされたスレッド プールをプリセットします。 newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool の作成パラメーターを分析してみましょう。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool の corePoolSize と minimumPoolSize は両方とも受信固定数に設定され、keepAliveTim は 0 に設定されます。スレッド プールの作成後、スレッドの数は固定されるため、スレッドの安定性が必要な状況に適しています。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor は、スレッド数が 1 に固定されている newFixedThreadPool のバージョンで、プール内のタスクのシリアル化を保証します。 FinalizableDelegatedExecutorService が返されることに注意してください:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizableDelegatedExecutorService は DelegatedExecutorService を継承し、gc 中にスレッド プールを閉じる操作のみを追加します。 DelegatedExecutorService のソース コードを見てみましょう:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }
DelegatedExecutorService は ExecutorService をラップし、ExecutorService メソッドのみを公開するため、スレッド プールのパラメーターを構成できなくなります。本来、スレッドプールが作成するパラメータは調整可能であり、ThreadPoolExecutor が set メソッドを提供します。 newSingleThreadExecutor を使用する目的は、シングルスレッドのシリアル スレッド プールを生成することです。スレッド プールのサイズも構成できたら退屈でしょう。
Executors は、通常のスレッド プールを構成不可能なスレッド プールにラップする unconfigurableExecutorService メソッドも提供します。将来の未知の世代によってスレッド プールが変更されることを望まない場合は、このメソッドを呼び出すことができます。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool は、キャッシュされたスレッド プールを生成します。スレッド数の範囲は 0 から Integer.MAX_VALUE で、タイムアウトは 1 分です。スレッド プールを使用すると、アイドル状態のスレッドがあればそのスレッドが再利用され、アイドル状態のスレッドが 1 分以上続くと新しいスレッドが作成されます。リサイクルされた。
newScheduledThreadPool
newScheduledThreadPoolは、タスクを定期的に実行できるスレッドプールを作成します。これについてはこの記事では説明する予定はありません。後で別の記事で詳しく説明します。
待機キュー
newCachedThreadPool のスレッド制限はほぼ無制限ですが、システム リソースには限りがあり、タスクの処理速度はタスクの送信速度ほど速くない場合があります。したがって、スレッド不足により待機している実行可能タスクを保存するために、ThreadPoolExecutor にブロッキング キューを提供できます。これが BlockingQueue です。
JDK は、BlockingQueue の実装メソッドをいくつか提供しています。 一般的に使用されるものは次のとおりです。
ArrayBlockingQueue: 配列構造のブロッキング キュー
LinkedBlockingQueue: リンク リスト構造のブロッキング キュー
PriorityBlockingQueue: 優先ブロッキング キュー
SynchronousQueue: なし店要素
newFixedThreadPool および newSingleThreadExecutor は、デフォルトで無制限の LinkedBlockingQueue を使用します。タスクが常に送信されても、スレッド プールが時間内にタスクを処理できない場合、待機キューは無限に長くなり、システム リソースが使い果たされる瞬間が必ず発生することに注意してください。したがって、リソースの枯渇を避けるために、制限された待機キューを使用することをお勧めします。しかし、1 つの問題を解決すると、新しい問題が発生します。キューがいっぱいになり、新しいタスクが来た後、この時点で何をすべきでしょうか?キューの飽和に対処する方法は後で紹介します。
newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。
饱和策略
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。
ThreadFactory
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。
不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
更多Java 线程池详解及创建简单实例相关文章请关注PHP中文网!