Maison  >  Article  >  Java  >  Quelle est la politique de rejet du pool de threads Java

Quelle est la politique de rejet du pool de threads Java

WBOY
WBOYavant
2023-05-14 08:13:05805parcourir

Idée de conception de piscine

La conception de piscine ne devrait pas être un nouveau terme. Nos plus courants tels que le pool de threads Java, le pool de connexions jdbc, le pool de connexions Redis, etc. sont des implémentations représentatives de ce type de conception.

Cette conception prédéfinira initialement les ressources, et le problème résolu est de compenser la consommation de chaque acquisition de ressources, comme le coût de création de threads, le coût d'obtention de connexions à distance, etc. Tout comme lorsque vous allez à la cantine chercher à manger, la tante qui prépare la nourriture y mettra d'abord plusieurs portions de riz. Lorsque vous viendrez, vous pourrez simplement prendre la boîte à lunch et y ajouter des légumes. le riz et préparer les légumes en même temps, ce qui est plus efficace.

En plus d'initialiser les ressources, la conception de pooling comprend également les fonctionnalités suivantes : la valeur initiale du pool, la valeur active du pool, la valeur maximale du pool, etc. Ces fonctionnalités peuvent être directement mappées aux attributs des membres de le pool de threads Java et le pool de connexions à la base de données.

Le moment auquel le pool de threads déclenche la politique de rejet

est différent du pool de connexions à la source de données. En plus de la taille initiale et de la valeur maximale du pool, le pool de threads dispose également d'une file d'attente de blocage supplémentaire pour la mise en mémoire tampon. .

Le pool de connexions à la source de données déclenche généralement une stratégie de rejet lorsque le nombre de connexions demandées dépasse la valeur maximale du pool de connexions. La stratégie consiste généralement à bloquer et à attendre le temps défini ou à lancer directement une exception.

Quelle est la politique de rejet du pool de threads Java

Comme le montre la figure, si vous souhaitez savoir quand le pool de threads déclenche le rejet, vous devez clarifier la signification spécifique des trois paramètres ci-dessus. C'est plutôt le résultat de la coordination globale de ces trois paramètres. que de simplement déclencher lorsque le nombre maximum de threads est dépassé. Le rejet du thread est approximatif. Lorsque le nombre de tâches soumises est supérieur à corePoolSize, il sera d'abord placé dans le tampon de file d'attente. Ce n'est qu'une fois le tampon rempli qu'il sera jugé. la tâche en cours d'exécution est supérieure à maxPoolSize Si elle est inférieure à maxPoolSize, un nouveau thread sera créé pour le traitement. Lorsqu'elle est supérieure à la valeur, la politique de rejet est déclenchée.

Le résumé est le suivant : lorsque le nombre actuel de tâches soumises est supérieur à (maxPoolSize + queueCapacity), la politique de rejet du pool de threads sera déclenchée.

JDK dispose de 4 stratégies de rejet de pool de threads intégrées

Définition de l'interface de la stratégie de rejet

Avant d'analyser la stratégie de rejet du pool de threads fournie avec JDK, jetez d'abord un œil à l'interface de stratégie de rejet définie par JDK, comme suit :

public interface RejectedExecutionHandler {      void rejectedExecution(Runnable r, ThreadPoolExecutor executor);  }

Interface La définition est très claire. Lorsque la politique de rejet est déclenchée, le pool de threads appellera la politique spécifique que vous avez définie et vous transmettra la tâche actuellement soumise et l'instance du pool de threads elle-même pour traitement. différentes considérations sur la façon de le gérer. Voici les implémentations intégrées au JDK pour nous :

CallerRunsPolicy (politique d'exécution de l'appelant)

public static class CallerRunsPolicy implements RejectedExecutionHandler {          public CallerRunsPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!e.isShutdown()) {                  r.run();              }          }      }

Fonction : Lorsque la politique de rejet est déclenchée, tant que la politique de rejet est déclenchée. le pool de threads n'est pas fermé, il sera traité par le thread actuel qui a soumis la tâche.

Scénarios d'utilisation : Généralement utilisé dans des scénarios où l'échec n'est pas autorisé, les exigences de performances ne sont pas élevées et la concurrence est faible, car le pool de threads ne sera pas fermé dans des circonstances normales, c'est-à-dire que les tâches soumises seront définitivement exécutées , mais comme il est exécuté par le thread appelant lui-même, lorsqu'une tâche est soumise plusieurs fois, elle bloquera l'exécution des tâches suivantes, et les performances et l'efficacité seront naturellement lentes.

AbortPolicy (politique d'abandon)

public static class AbortPolicy implements RejectedExecutionHandler {          public AbortPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              throw new RejectedExecutionException("Task " + r.toString() +                                                   " rejected from " +                                                   e.toString());          }      }

Fonction : Lorsque la politique de rejet est déclenchée, une exception qui refuse de s'exécuter est levée directement. La signification de la politique d'abandon est d'interrompre le processus d'exécution en cours

Scénario d'utilisation. : Ça y est. Il n'y a pas de scénarios particuliers, mais une chose est de gérer correctement les exceptions levées.

La politique par défaut dans ThreadPoolExecutor est AbortPolicy. La série de ThreadPoolExecutors de l'interface ExecutorService n'a pas de politique de rejet explicite, la politique par défaut est donc la suivante.

Mais veuillez noter que la file d'attente des instances du pool de threads dans ExecutorService est illimitée, ce qui signifie que la politique de rejet ne sera pas déclenchée même si la mémoire est pleine. Lorsque vous personnalisez une instance de pool de threads, vous devez gérer l'exception levée lorsque la stratégie est déclenchée lors de l'utilisation de cette stratégie, car elle interrompra le processus d'exécution en cours.

DiscardPolicy (politique de suppression)

public static class DiscardPolicy implements RejectedExecutionHandler {          public DiscardPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {          }      }

Fonction : Abandonnez cette tâche directement et silencieusement, sans déclencher aucune action

Scénario d'utilisation : Si la tâche que vous avez soumise n'est pas importante, vous pouvez l'utiliser. Parce qu'il s'agit d'une implémentation vide et qu'elle engloutira vos tâches en silence. Cette stratégie n'est donc fondamentalement plus nécessaire

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {          public DiscardOldestPolicy() { }          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!e.isShutdown()) {                  e.getQueue().poll();                  e.execute(r);              }          }      }

Fonction :Si le pool de threads n'est pas fermé, affichez l'élément en tête de la file d'attente, puis essayez d'exécuter

Scénario d'utilisation : Cette politique Les tâches seront toujours supprimées et il n'y aura aucun son lors de leur suppression, mais la caractéristique est que les anciennes tâches non exécutées sont supprimées et ce sont des tâches avec une priorité plus élevée à exécuter.

Sur la base de cette fonctionnalité, le scénario auquel je peux penser est de publier un message et de modifier un message. Une fois le message publié, il n'a pas encore été exécuté. À ce moment, le message mis à jour revient. la version du message non exécuté est plus ancienne que celle soumise actuellement. Si la version du message est inférieure, elle peut être ignorée. Étant donné qu'il peut y avoir des messages avec des versions inférieures dans la file d'attente qui seront mis en file d'attente pour exécution, vous devez comparer les versions de message lors du traitement réel du message.

Stratégie de rejet mise en œuvre par un tiers

dubbo中的线程拒绝策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {      protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);      private final String threadName;      private final URL url;      private static volatile long lastPrintTime = 0;      private static Semaphore guard = new Semaphore(1);      public AbortPolicyWithReport(String threadName, URL url) {          this.threadName = threadName;          this.url = url;      }      @Override      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {          String msg = String.format("Thread pool is EXHAUSTED!" +                          " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +                          " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",                  threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),                  e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),                  url.getProtocol(), url.getIp(), url.getPort());          logger.warn(msg);          dumpJStack();          throw new RejectedExecutionException(msg);      }      private void dumpJStack() {         //省略实现      }  }

可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因。

1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。

3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性

Netty中的线程池拒绝策略

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {          NewThreadRunsPolicy() {              super();          }          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {              try {                  final Thread t = new Thread(r, "Temporary task executor");                  t.start();              } catch (Throwable e) {                  throw new RejectedExecutionException(                          "Failed to start a new thread", e);              }          }      }

Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。

所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常。

activeMq中的线程池拒绝策略

new RejectedExecutionHandler() {                  @Override                  public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {                      try {                          executor.getQueue().offer(r, 60, TimeUnit.SECONDS);                      } catch (InterruptedException e) {                          throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");                      }                      throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");                  }              });

activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常

pinpoint中的线程池拒绝策略

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {      private final RejectedExecutionHandler[] handlerChain;      public static RejectedExecutionHandler build(List<rejectedexecutionhandler> chain) {          Objects.requireNonNull(chain, "handlerChain must not be null");          RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);          return new RejectedExecutionHandlerChain(handlerChain);      }      private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {          this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");      }      @Override      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {          for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {              rejectedExecutionHandler.rejectedExecution(r, executor);          }      }  }</rejectedexecutionhandler>

pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer