Maison >Java >javaDidacticiel >Pool de threads simultanés Java : explication détaillée de ThreadPoolExecutor
Résumé : La caractéristique du pool de threads est qu'après le nombre de threads = corePoolSize, seulement lorsque la file d'attente des tâches est pleine, une tâche sera retirée de la file d'attente des tâches, puis un nouveau thread sera construit, et le cycle continuera jusqu'à ce que le nombre de threads atteigne maximumPoolSize et que la politique de rejet soit exécutée.
L'idée du pool de threads est d'ouvrir une zone dans le système pour stocker certains threads en attente. Cette zone est appelée un. pool de threads. Si une tâche doit être exécutée, un thread en veille est emprunté au pool de threads pour exécuter la tâche spécifiée, et le thread emprunté peut être renvoyé lorsque la tâche est terminée. Cela évite de créer à plusieurs reprises un grand nombre d’objets thread et de gaspiller les ressources CPU et mémoire.
Si vous observez l'implémentation du code source de divers pools de threads fournis par jdk, vous pouvez constater que, à l'exception du nouveau pool de threads newWorkStealingPool ajouté par jdk8 , ils sont tous basés sur l'implémentation d'encapsulation de ThreadPoolExecutor, alors expliquez d'abord les fonctions spécifiques de ThreadPoolExecutor.
ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize : Spécifiez le nombre de threads dans le pool de threads
maximumPoolSize : Nombre maximum de threads
keepAliveTime : Lorsque le nombre de threads dépasse corePoolSize, le temps de survie du thread inactif dépasse (passé ce délai, le thread inactif sera détruit).
unit : unité de temps de keepAliveTime
workQueue : file d'attente des tâches, tâches soumises mais non exécutées
threadFactory : fabrique de threads pour créer des threads, la valeur par défaut est
handler : politique de rejet, comment rejeter des tâches lorsqu'il y a trop de tâches à traiter, la valeur par défaut est la nouvelle stratégie AbortPolicy().
ExecutorService es = new ThreadPoolExecutor(3, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("discard"); } });
Résumé : les caractéristiques du pool de threads sont celles de le thread Après le nombre = corePoolSize, seulement lorsque la file d'attente des tâches est pleine, une tâche sera retirée de la file d'attente des tâches, puis un nouveau thread sera construit et le cycle continuera jusqu'à ce que le nombre de threads atteigne maximumPoolSize à exécuter la politique de rejet.
Tant que la file d'attente implémente l'interface BlockingQueue, notez que l'interface de file d'attente de niveau supérieur implémentée par ConcurrentLinkedQueue ne peut pas être utilisée ici.
Les plus couramment utilisés sont les suivants :
SynchronousQueue : Soumettre directement la file d'attente La file d'attente n'a pas de capacité. Chaque opération d'insertion doit attendre une suppression correspondante. Au contraire, chaque opération de suppression doit attendre l’opération d’insertion correspondante. Par conséquent, il n'enregistre pas les tâches et soumet toujours les tâches aux threads pour exécution. S'il n'y a pas de threads inactifs, un nouveau thread est créé lorsque le nombre de threads atteint le maximum, une politique de rejet est exécutée.
ArrayBlockingQueue : File d'attente de tâches limitée. Si le nombre de threads dans le pool de threads est inférieur à corePoolSize, de nouveaux threads seront créés. S'il est supérieur à corePoolSize, de nouvelles tâches seront ajoutées. à la file d'attente. Si la file d'attente est pleine, un nouveau thread sera créé pour exécuter la tâche si le thread total n'est pas supérieur à maximumPoolSize. S'il est supérieur à maximumPoolSize, une politique de rejet sera exécutée.
LinkedBlockingQueue : File d'attente illimitée, à moins que les ressources système ne soient épuisées, il n'y aura aucun échec dans la mise en file d'attente de la tâche. Si le nombre de threads dans le pool de threads est inférieur à corePoolSize, un nouveau thread sera créé ; s'il est supérieur à corePoolSize, une nouvelle tâche sera ajoutée à la file d'attente.
PriortyBlockingQueue : La file d'attente des tâches prioritaires, qui peut contrôler l'ordre d'exécution des tâches, est une file d'attente illimitée. ArrayBlockingQueue et LinkedBlockingQueue traitent tous deux les tâches selon l'algorithme premier entré, premier sorti, PriorityBlockingQueue peuvent être exécutées séquentiellement en fonction de la priorité des tâches elles-mêmes.
Les threads du pool de threads sont épuisés et les tâches dans la file d'attente sont pleines. Aucune nouvelle tâche ne peut plus être remplie, une stratégie de rejet est donc nécessaire. : Tâches de traitement Que faire lorsque la quantité dépasse la capacité réelle du système.
JDK a quatre politiques de rejet intégrées :
AbortPolicy : lancer une exception directement (politique par défaut) Même si le pool de threads est libre, les threads suivants ne peuvent pas s'exécuter. Oui, si vous souhaitez que les threads suivants s'exécutent, vous devez capturer les informations sur les exceptions.
CallerRunsPolicy : cette stratégie exécute la tâche actuellement ignorée directement dans le thread appelant. Évidemment, cela ne supprimera pas la tâche, mais les performances du fil de soumission de la tâche chuteront très probablement fortement.
DiscardOldestPolicy : supprimera la requête la plus ancienne, qui est une tâche sur le point d'être exécutée, et tentera de soumettre à nouveau la tâche en cours.
DiscardPolicy : supprimez silencieusement les tâches non traitables sans aucun traitement. C’est probablement la meilleure solution si des tâches risquent d’être perdues. Lorsque le pool de threads n'est pas inactif, les tâches soumises seront ignorées et les tâches soumises seront exécutées lorsqu'il y aura des threads inactifs.
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 直接在调用者线程中运行当前被丢弃的任务 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。
public class MyTask implements Runnable { private int number; public MyTask(int number) { super(); this.number = number; } public void run() { System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) {// ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, // new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,// new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,// new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()); ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()); for(int i=0;i<10000;i++) { try { System.out.println(i); es.submit(new MyTask(i)); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); System.out.println("------------------------"+i); } } }
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 自动调用addWorker检查runState和workerCount, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *如果任务可以成功排队,那么我们仍然需要 仔细检查我们是否应该添加一个线程 (因为现有的自从上次检查后死亡)或者那个 自进入该方法以来,该池关闭。 所以我们 重新检查状态,如果有必要的话回滚队列 停止,或者如果没有的话就开始一个新的线程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);//队列满了,执行拒绝策略 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } final void reject(Runnable command) { handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法 } /** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。
ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
任务队列LinkedBlockingQueue中(长度无限),线程数量和最大线程数量均为1。
ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
任务队列为SynchronousQueue,线程数量为0,最大线程数量为Integer.MAX_VALUE,所以只要有任务没有空闲线程就会创建就新线程。
ExecutorService es=Executors.newCachedThreadPool();//指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
相当于newScheduledThreadPool(int corePoolSize)中corePoolSize设置为1。
ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();
延迟线程池
class MyScheduledTask implements Runnable { private String tname; public MyScheduledTask(String tname) { this.tname=tname; } public void run() { System.out.println(tname+"任务时延2秒执行!!!"); } }public class intsmaze { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool =Executors.newScheduledThreadPool(2); MyScheduledTask mt1=new MyScheduledTask("MT1"); scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS); } }
newWorkStealingPool java8新增连接池-intsmaze
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争 public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。
希望程序执行完所有任务后退出,调用ExecutorService接口中的shutdown(),shutdownNow()方法。
用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownNow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
一般来说确定线程池的大小需要考虑CPU数量,内存大小,JDBC连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:
Ncpu=CPU的数量
Ucpu=目标CPU的使用率,0<=Ucpu<=1
W/C=等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的线程池的大小等于:
Nthreads=Ncpu*Ucpu*(1+W/C)
在java中,可以通过
Runtime.getRuntime().availableProcessors()
取得可以CPU数量。
相关推荐:
ThreadPoolExecutor线程池之submit方法
JAVA中ThreadPoolExecutor线程池的submit方法详解
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!