ホームページ >Java >&#&チュートリアル >実稼働ブロッキングをサポートする Java スレッド プール
一般的に、生産タスクの速度は消費の速度よりも高速です。詳細は、キューの長さと、生産と消費の速度をどのように一致させるかです。
典型的なプロデューサー/コンシューマー モデルは次のとおりです:
J.U.C が提供する Queue 実装を並行環境で使用すると、生産時と消費時のスレッドの安全性を簡単に確保できます。ここで注意する必要があるのは、プロデューサーがあまりにも早く生成してキューの長さが急増し、最終的に OutOfMemory がトリガーされるのを防ぐために、キューで初期容量を設定する必要があるということです。
生産が消費よりも速い一般的な状況向け。キューがいっぱいの場合、タスクが無視されたり実行されたりすることは望ましくありません。このとき、プロデューサーはタスクを送信する前にしばらく待機することができます。より良い方法は、タスクを送信するメソッドでプロデューサーをブロックすることです。を選択し、キューがいっぱいになっていないときにタスクの送信を続行するため、無駄なアイドル時間が発生しません。ブロック化も非常に簡単です。ArrayBlockingQueue と LinkedBlockingQueue は両方とも、キューが実際に操作されるときに、各ロックが取得された後の容量を決定します。
さらに、キューが空の場合、コンシューマはタスクを取得できず、再度取得するためにしばらく待つことができます。タスクが存在する場合は、BlockingQueue の take メソッドを使用してブロックして待機します。すぐに実行できます。タイムアウト パラメーターを指定して take のオーバーロードされたメソッドを呼び出すことをお勧めします。タイムアウト後にスレッドが終了します。このようにして、生産者が実際に生産を停止しても、消費者は無期限に待たされることはありません。
そのため、ブロッキングをサポートする効率的な生産および消費モデルが実装されます。
ちょっと待ってください。J.U.C はすでにスレッド プールの実装を支援してくれているのに、なぜまだこのセットを使用する必要があるのでしょうか? ExecutorService を直接使用したほうが便利ではないでしょうか?
ThreadPoolExecutor の基本構造を見てみましょう:
ThreadPoolExecutor では、BlockingQueue 部分と Consumer 部分が実装されており、スレッド プールを直接使用することには次のような多くの利点があることがわかります。スレッド数の動的調整など。
しかし、問題は、ThreadPoolExecutor を構築するときにキュー実装として BlockingQueue を手動で指定したとしても、実際にはキューがいっぱいの場合、その理由は ThreadPoolExecutor が非ブロッキング メソッドを呼び出すためです。 BlockingQueue の offer メソッド:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
この時点で、結果を達成するために何かを行う必要があります。プロデューサーがタスクを送信し、キューがいっぱいになった場合、プロデューサーはブロックされ、タスクが消費されるのを待つことができます。
重要なのは、同時実行環境では、プロデューサーはキューがいっぱいかどうかを判断できず、キューがいっぱいかどうかを判断するために ThreadPoolExecutor.getQueue().size() を呼び出すことができないということです。
スレッドプールの実装では、キューがいっぱいになると、構築中に渡されたRejectedExecutionHandlerが呼び出され、タスクの処理が拒否されます。デフォルトの実装は AbortPolicy で、RejectedExecutionException を直接スローします。
ここではいくつかの拒否戦略については説明しません。私たちのニーズに近いのは CallerRunsPolicy です。この戦略では、キューがいっぱいになったときにタスクを送信したスレッドがタスクを実行できるようになります。これは、プロデューサーに一時的に許可することと同じです。コンシューマによって行われた作業は削除されるため、プロデューサはブロックされませんが、送信されたタスクも一時停止されます。
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a <tt>CallerRunsPolicy</tt>. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
しかし、この戦略には隠れた危険もあります。プロデューサーがタスクを消費している間に、コンシューマーがすべてのタスクを消費し終えて、キューが空の状態になった後のみプロデューサーが続行できる可能性があります。実稼働タスクを完了すると、このプロセスでコンシューマ スレッドが枯渇する可能性があります。
同様のアイデアを参照してください。最も簡単な方法は、RejectedExecutionHandler を直接定義し、キューがいっぱいになったときに BlockingQueue.put を呼び出してプロデューサー ブロッキングを実装することです:
new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } };
この方法では、キューとコンシューマ ロジックでは、プロデューサ スレッドとコンシューマ スレッドの実装ロジックに重点を置き、タスクをスレッド プールに送信するだけです。
元の設計と比較して、このメソッドのコード量は大幅に削減でき、同時環境での多くの問題を回避できます。もちろん、送信時にセマフォを使用してエントリを制限するなど、他の方法を使用することもできますが、単にプロデューサーにブロックさせたい場合は複雑になります。
実稼働ブロッキングをサポートする Java スレッド プールに関連するその他の記事については、PHP 中国語 Web サイトに注目してください。