ホームページ >Java >&#&チュートリアル >Javaスレッドプールの拒否ポリシーとは何ですか
プール設計のアイデア
プール設計は新しい用語ではありません。 Java スレッド プール、jdbc 接続プール、redis 接続プールなどの一般的なものは、このタイプの設計の代表的な実装です。
この設計では、最初にリソースが事前設定されます。解決される問題は、スレッド作成のコストやリモート接続の取得のコストなど、各リソース取得の消費を相殺することです。食堂にご飯を取りに行くのと同じように、ご飯を作ってくれたおばちゃんが先にご飯を何人分か置いてくれるので、来たらお弁当を持って野菜を入れるだけでOKです。ご飯と野菜の準備を同時に行うので効率的です。
プーリング設計には、リソースの初期化に加えて、プールの初期値、プールのアクティブな値、プールの最大値などの機能も含まれています。これらの機能は、次の機能に直接マッピングできます。プロパティ内の Java スレッド プールとデータベース接続プールのメンバー。
スレッド プールが拒否ポリシーをトリガーするタイミング
データ ソース接続プールとは異なり、初期サイズとプールの最大値に加えて、スレッドプールには追加のブロッキング キューもあります。
データ ソース接続プールは通常、要求された接続の数が接続プールの最大値を超えると拒否ポリシーをトリガーします。このポリシーは通常、ブロックして設定された時間待機するか、例外を直接スローします。
図に示すように、スレッド プールがいつ拒否をトリガーするかを知りたい場合は、上記の 3 つのパラメータの具体的な意味を明確にする必要があります。これら 3 つのパラメータの全体的な調整の結果であり、単に最大スレッド数を超えるとスレッド拒否がトリガーされるわけではありません。送信されたタスクの数が corePoolSize よりも大きい場合、最初にキュー バッファーに配置されます。バッファがいっぱいになった場合、現在実行中のタスクが maxPoolSize より大きいかどうかを判断し、値より小さい場合は新しいスレッドを作成して処理します。この値より大きい場合、拒否ポリシーがトリガーされます。
要約: 現在送信されたタスクの数が (maxPoolSize queueCapacity) より大きい場合、スレッド プールの拒否ポリシーがトリガーされます。
JDK には 4 つのスレッド プール拒否戦略が組み込まれています
拒否戦略インターフェイス定義
スレッド プールの分析では、 JDK に付属しています ポリシーを拒否する前に、次のように、JDK によって定義された拒否ポリシー インターフェイスを確認してください:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
インターフェイスは非常に明確に定義されています。拒否ポリシーがトリガーされると、スレッド プールは現在送信されているタスクを転送するために設定した特定のポリシー。また、スレッド プール インスタンス自体が処理のために渡されます。シナリオが異なれば、その処理方法について考慮すべき点も異なります。JDK がどのような実装を組み込んでいるかを見てみましょう:
CallerRunsPolicy (呼び出し元実行ポリシー)
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
関数: 拒否ポリシーがトリガーされると、スレッド プールが閉じられていない限り、拒否ポリシーがトリガーされます。タスクを送信した現在のスレッドによって処理されます。
使用シナリオ: 通常、スレッド プールは通常の状況では閉じられないため、失敗が許されず、パフォーマンス要件が高くなく、同時実行性が小さいシナリオで使用されます。 , サブミット タスクは必ず実行されますが、呼び出し元のスレッド自体が実行するため、タスクが複数回サブミットされると、後続のタスクの実行がブロックされ、当然パフォーマンスと効率が低下します。
AbortPolicy (中止ポリシー)
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
関数: 拒否ポリシーがトリガーされると、実行を拒否する例外が直接スローされます。ポリシーを中止することを意味します。つまり、現在の実行プロセスを中断します。
使用シナリオ:これには特別なシナリオはありませんが、スローされた例外を正しく処理することが 1 つあります。
ThreadPoolExecutor のデフォルト ポリシーは AbortPolicy です。ExecutorService インターフェイスの一連の ThreadPoolExecutor には明示的な拒否ポリシーがないため、デフォルト ポリシーはこれです。
ただし、ExecutorService のスレッド プール インスタンス キューは無制限であることに注意してください。つまり、メモリが使い果たされても拒否ポリシーはトリガーされません。スレッド プール インスタンスをカスタマイズする場合、このストラテジの使用時にストラテジがトリガーされたときにスローされた例外を処理する必要があります。これは、現在の実行プロセスが中断されるためです。
DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
関数: アクションをトリガーせずにこのタスクを静かに破棄します
使用シナリオ: 提出したタスクが重要でない場合に使用できます。これは空の実装であり、タスクを黙って飲み込んでしまうためです。したがって、このポリシーは基本的に必要なくなりました。
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
Function:スレッド プールが閉じられていない場合は、キュー ヘッド要素をポップアップします。
使用シナリオ: この戦略でもタスクは破棄され、破棄時にサイレントになりますが、古い未実行のタスクを破棄するという特徴があります。実行する優先度が高いタスクです。
この機能に基づいて、私が考えることができるシナリオは、メッセージを公開してメッセージを変更することです。メッセージは公開された後、実行されていません。このとき、更新されたメッセージが再び来ます。今回は、未実行のメッセージです。バージョンが現在送信されているメッセージのバージョンよりも低い場合は、破棄できます。実行のためにキューに入れられるキューには、メッセージ バージョンが低いメッセージが存在する可能性があるため、実際にメッセージを処理するときにメッセージ バージョンを比較する必要があります。
サードパーティによる拒否戦略の実施
dubbo中的线程拒绝策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; private static volatile long lastPrintTime = 0; private static Semaphore guard = new Semaphore(1); public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); } private void dumpJStack() { //省略实现 } }
可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因。
1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在
2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。
3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性
Netty中的线程池拒绝策略
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { NewThreadRunsPolicy() { super(); } public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { final Thread t = new Thread(r, "Temporary task executor"); t.start(); } catch (Throwable e) { throw new RejectedExecutionException( "Failed to start a new thread", e); } } }
Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。
所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常。
activeMq中的线程池拒绝策略
new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { try { executor.getQueue().offer(r, 60, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); } throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); } });
activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常
pinpoint中的线程池拒绝策略
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { private final RejectedExecutionHandler[] handlerChain; public static RejectedExecutionHandler build(List<rejectedexecutionhandler> chain) { Objects.requireNonNull(chain, "handlerChain must not be null"); RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); return new RejectedExecutionHandlerChain(handlerChain); } private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { rejectedExecutionHandler.rejectedExecution(r, executor); } } }</rejectedexecutionhandler>
pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。
以上がJavaスレッドプールの拒否ポリシーとは何ですかの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。