Home  >  Article  >  Java  >  What is the rejection policy of Java thread pool

What is the rejection policy of Java thread pool

WBOY
WBOYforward
2023-05-14 08:13:05804browse

Pool Design Ideas

Pool design should not be a new term. Our common ones such as java thread pool, jdbc connection pool, redis connection pool, etc. are representative implementations of this type of design.

This design will initially preset resources, and the problem solved is to offset the consumption of each resource acquisition, such as the cost of creating threads, the cost of obtaining remote connections, etc. Just like when you go to the canteen to get food, the aunt who prepares the food will first put several portions of the rice there. When you come, you can just take the lunch box and add vegetables. You don't have to fill the rice and prepare vegetables at the same time, which is more efficient.

In addition to initializing resources, pooling design also includes the following features: the initial value of the pool, the active value of the pool, the maximum value of the pool, etc. These features can be directly mapped to members of the Java thread pool and database connection pool in properties.

The timing when the thread pool triggers the rejection policy

Different from the data source connection pool, in addition to the initial size and the maximum pool value, the thread pool also has an additional blocking queue. buffer.

The data source connection pool generally triggers a rejection policy when the number of requested connections exceeds the maximum value of the connection pool. The policy is generally to block and wait for the set time or directly throw an exception.

What is the rejection policy of Java thread pool

As shown in the figure, if you want to know when the thread pool triggers a rejection, you need to clarify the specific meaning of the above three parameters, which is the result of the overall coordination of these three parameters, and It is not simply that exceeding the maximum number of threads will trigger thread rejection. When the number of submitted tasks is greater than corePoolSize, it will be placed in the queue buffer first. Only after the buffer is filled, will it be judged whether the currently running task is greater than maxPoolSize. If it is less than the value, a new thread will be created for processing. When it is greater than the value, the rejection policy is triggered.

The summary is: when the current number of submitted tasks is greater than (maxPoolSize queueCapacity), the rejection policy of the thread pool will be triggered.

JDK has 4 built-in thread pool rejection strategies

Rejection strategy interface definition

In analyzing the thread pool that comes with JDK Before rejecting the policy, first take a look at the rejection policy interface defined by JDK, as follows:

public interface RejectedExecutionHandler {      void rejectedExecution(Runnable r, ThreadPoolExecutor executor);  }

The interface is very clearly defined. When the rejection policy is triggered, the thread pool will call the specific policy you set to transfer the currently submitted task. And the thread pool instance itself is passed to you for processing. Different scenarios will have different considerations for how to handle it. Let’s see what implementations the JDK has built in for us:

CallerRunsPolicy (caller running policy)

public static class CallerRunsPolicy implements RejectedExecutionHandler {          public CallerRunsPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!e.isShutdown()) {                  r.run();              }          }      }

Function: When the rejection policy is triggered, as long as the thread pool is not closed, it will be processed by the current thread that submitted the task.

Usage scenarios: Generally used in scenarios where failure is not allowed, performance requirements are not high, and concurrency is small, because the thread pool will not be closed under normal circumstances, that is, submission The task will definitely be run, but since it is executed by the caller thread itself, when the task is submitted multiple times, it will block the execution of subsequent tasks, and the performance and efficiency will naturally be slow.

AbortPolicy (abort policy)

public static class AbortPolicy implements RejectedExecutionHandler {          public AbortPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              throw new RejectedExecutionException("Task " + r.toString() +                                                   " rejected from " +                                                   e.toString());          }      }

Function: When the rejection policy is triggered, an exception that refuses to execute is thrown directly, which means to abort the policy That is to interrupt the current execution process

Usage scenarios:There is no special scenario for this, but one thing is to handle the thrown exception correctly.

The default policy in ThreadPoolExecutor is AbortPolicy. The series of ThreadPoolExecutors of the ExecutorService interface have no explicit rejection policy, so the default policy is this.

But please note that the thread pool instance queue in ExecutorService is unbounded, which means that the rejection policy will not be triggered even if the memory is exhausted. When you customize a thread pool instance, you must handle the exception thrown when the strategy is triggered when using this strategy, because it will interrupt the current execution process.

DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {          public DiscardPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {          }      }

Function: Discard this task quietly without triggering any action

Usage scenarios: You can use it if the task you submitted is not important. Because it is an empty implementation and will swallow up your tasks silently. So this policy is basically no longer needed

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {          public DiscardOldestPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!e.isShutdown()) {                  e.getQueue().poll();                  e.execute(r);              }          }      }

Function:If the thread pool is not closed, pop up the queue head element inside, and then try to execute

Usage scenario: This strategy will still discard the task, and it will be silent when discarding, but the characteristic is that it discards old unexecuted tasks, and It is a task with a higher priority to be executed.

Based on this feature, the scenario I can think of is to publish a message and modify a message. After the message is published, it has not been executed. At this time, the updated message comes again. At this time, the unexecuted message If the version is lower than the currently submitted message version, it can be discarded. Because there may be messages with lower message versions in the queue that will be queued for execution, you must compare the message versions when actually processing the message.

Rejection strategy implemented by third party

dubbo中的线程拒绝策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {      protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);      private final String threadName;      private final URL url;      private static volatile long lastPrintTime = 0;      private static Semaphore guard = new Semaphore(1);      public AbortPolicyWithReport(String threadName, URL url) {          this.threadName = threadName;          this.url = url;      }      @Override      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {          String msg = String.format("Thread pool is EXHAUSTED!" +                          " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +                          " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",                  threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),                  e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),                  url.getProtocol(), url.getIp(), url.getPort());          logger.warn(msg);          dumpJStack();          throw new RejectedExecutionException(msg);      }      private void dumpJStack() {         //省略实现      }  }

可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因。

1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。

3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性

Netty中的线程池拒绝策略

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {          NewThreadRunsPolicy() {              super();          }          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {              try {                  final Thread t = new Thread(r, "Temporary task executor");                  t.start();              } catch (Throwable e) {                  throw new RejectedExecutionException(                          "Failed to start a new thread", e);              }          }      }

Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。

所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常。

activeMq中的线程池拒绝策略

new RejectedExecutionHandler() {                  @Override                  public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {                      try {                          executor.getQueue().offer(r, 60, TimeUnit.SECONDS);                      } catch (InterruptedException e) {                          throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");                      }                      throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");                  }              });

activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常

pinpoint中的线程池拒绝策略

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {      private final RejectedExecutionHandler[] handlerChain;      public static RejectedExecutionHandler build(List<rejectedexecutionhandler> chain) {          Objects.requireNonNull(chain, "handlerChain must not be null");          RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);          return new RejectedExecutionHandlerChain(handlerChain);      }      private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {          this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");      }      @Override      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {          for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {              rejectedExecutionHandler.rejectedExecution(r, executor);          }      }  }</rejectedexecutionhandler>

pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。

The above is the detailed content of What is the rejection policy of Java thread pool. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete