首頁 >Java >java教程 >Java執行緒池的拒絕策略是什麼

Java執行緒池的拒絕策略是什麼

WBOY
WBOY轉載
2023-05-14 08:13:05892瀏覽

池化設計想法

池話設計應該不是一個新名詞。我們常見的如java執行緒池、jdbc連線池、redis連線池等就是這類設計的代表實作。

這種設計會初始預設資源,解決的問題就是抵銷每次取得資源的消耗,如建立執行緒的開銷,取得遠端連線的開銷等。就好比你去食堂打飯,打飯的大媽會先把飯盛好幾份放那裡,你來了就直接拿著飯盒加菜即可,不用再臨時又盛飯又打菜,效率就高了。

除了初始化資源,池化設計還包含以下這些特徵:池子的初始值、池子的活躍值、池子的最大值等,這些特徵可以直接對應到java執行緒池和資料庫連接池的成員屬性中。

執行緒池觸發拒絕策略的時機

和資料來源連線池不一樣,執行緒池除了初始大小和池子最大值,還多了一個阻塞佇列來緩衝。

資料來源連線池一般請求的連線數超過連線池的最大值的時候就會觸發拒絕策略,策略一般是阻塞等待設定的時間或直接拋異常。

Java執行緒池的拒絕策略是什麼

如圖,想要了解線程池什麼時候觸發拒絕粗略,需要明確上面三個參數的具體含義,是這三個參數總體協調的結果,而不是簡單的超過最大執行緒數就會觸發執行緒拒絕粗略,當提交的任務數大於corePoolSize時,會優先放到佇列緩衝區,只有填滿了緩衝區後,才會判斷目前執行的任務是否大於maxPoolSize,小於時會新執行緒處理。大於時就觸發了拒絕策略。

總結就是:目前提交任務數大於(maxPoolSize queueCapacity)時就會觸發執行緒池的拒絕策略了。

JDK內建4種執行緒池拒絕策略

拒絕策略介面定義

在分析JDK自帶的執行緒池拒絕策略前,先看下JDK定義的拒絕策略接口,如下:

public interface RejectedExecutionHandler {      void rejectedExecution(Runnable r, ThreadPoolExecutor executor);  }

接口定義很明確,當觸發拒絕策略時,線程池會調用你設置的具體的策略,將當前提交的任務以及執行緒池實例本身傳遞給你處理,具體作何處理,不同場景會有不同的考慮,下面看JDK為我們內建了哪些實作:

CallerRunsPolicy(呼叫者運行策略)

public static class CallerRunsPolicy implements RejectedExecutionHandler {          public CallerRunsPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!e.isShutdown()) {                  r.run();              }          }      }

功能:當觸發拒絕策略時,只要執行緒池沒有關閉,就由提交任務的目前執行緒處理。

使用場景:一般在不允許失敗的、對效能要求不高、並發量較小的場景下使用,因為執行緒池一般情況下不會關閉,也就是提交的任務一定會被運行,但是由於是呼叫者執行緒自己執行的,當多次提交任務時,就會阻塞後續任務執行,效能和效率自然就慢了。

AbortPolicy(中止策略)

public static class AbortPolicy implements RejectedExecutionHandler {          public AbortPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              throw new RejectedExecutionException("Task " + r.toString() +                                                   " rejected from " +                                                   e.toString());          }      }

功能:當觸發拒絕策略時,直接拋出拒絕執行的例外狀況,中止策略的意思也就是打斷目前執行流程

使用場景:這個就沒有特殊的場景了,但是一點要正確處理拋出的例外。

ThreadPoolExecutor中預設的策略就是AbortPolicy,ExecutorService介面的系列ThreadPoolExecutor因為都沒有顯示的設定拒絕策略,所以預設的都是這個。

但請注意,ExecutorService中的執行緒池實例佇列都是無界的,也就是說把記憶體撐爆了都不會觸發拒絕策略。當自己自訂執行緒池實例時,使用這個策略一定要處理好觸發策略時拋的例外狀況,因為他會打斷目前的執行流程。

DiscardPolicy(丟棄策略)

public static class DiscardPolicy implements RejectedExecutionHandler {          public DiscardPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {          }      }

功能:直接靜悄悄的丟棄這個任務,不觸發任何動作

#使用場景:如果你提交的任務無關緊要,你就可以使用它。因為它就是個空實現,會悄無聲息的吞噬你的任務。所以這個策略基本上不用了

DiscardOldestPolicy(棄老策略)

public static class DiscardOldestPolicy implements RejectedExecutionHandler {          public DiscardOldestPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!e.isShutdown()) {                  e.getQueue().poll();                  e.execute(r);              }          }      }

功能:如果執行緒池未關閉,就彈出佇列頭部的元素,然後嘗試執行

使用場景:這個策略還是會丟棄任務,丟棄時也是毫無聲息,但是特點是丟棄的是老的未執行的任務,而且是待執行優先順序較高的任務。

基於這個特性,我能想到的場景就是,發布消息,和修改消息,當消息發佈出去後,還未執行,此時更新的消息又來了,這個時候未執行的消息的版本比現在提交的訊息版本要低就可以被丟棄了。因為佇列中還有可能存在訊息版本更低的訊息會排隊執行,所以在真正處理訊息的時候一定要做好訊息的版本比較。

第三方實作的拒絕策略

dubbo中的线程拒绝策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {      protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);      private final String threadName;      private final URL url;      private static volatile long lastPrintTime = 0;      private static Semaphore guard = new Semaphore(1);      public AbortPolicyWithReport(String threadName, URL url) {          this.threadName = threadName;          this.url = url;      }      @Override      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {          String msg = String.format("Thread pool is EXHAUSTED!" +                          " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +                          " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",                  threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),                  e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),                  url.getProtocol(), url.getIp(), url.getPort());          logger.warn(msg);          dumpJStack();          throw new RejectedExecutionException(msg);      }      private void dumpJStack() {         //省略实现      }  }

可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因。

1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。

3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性

Netty中的线程池拒绝策略

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {          NewThreadRunsPolicy() {              super();          }          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {              try {                  final Thread t = new Thread(r, "Temporary task executor");                  t.start();              } catch (Throwable e) {                  throw new RejectedExecutionException(                          "Failed to start a new thread", e);              }          }      }

Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。

所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常。

activeMq中的线程池拒绝策略

new RejectedExecutionHandler() {                  @Override                  public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {                      try {                          executor.getQueue().offer(r, 60, TimeUnit.SECONDS);                      } catch (InterruptedException e) {                          throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");                      }                      throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");                  }              });

activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常

pinpoint中的线程池拒绝策略

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {      private final RejectedExecutionHandler[] handlerChain;      public static RejectedExecutionHandler build(List<rejectedexecutionhandler> chain) {          Objects.requireNonNull(chain, "handlerChain must not be null");          RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);          return new RejectedExecutionHandlerChain(handlerChain);      }      private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {          this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");      }      @Override      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {          for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {              rejectedExecutionHandler.rejectedExecution(r, executor);          }      }  }</rejectedexecutionhandler>

pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。

以上是Java執行緒池的拒絕策略是什麼的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除