Maison >Java >javaDidacticiel >Comment créer un pool de threads Java
permettent de réaliser la réutilisation des threads et d'éviter de recréer et de détruire des threads. Créer et détruire des threads coûte très cher au processeur.
Vous pouvez limiter le nombre maximum de threads pouvant être créés et ajuster dynamiquement les paramètres du pool de threads en fonction des performances de votre propre machine pour améliorer les performances des applications.
Fournit des fonctions telles que l'exécution planifiée et le contrôle de la concurrence.
Gestion unifiée des fils de discussion.
1 : Pool de threads de cache (non recommandé)
2 : Pool de threads à capacité fixe (non recommandé)
3 : Pool de threads uniques (non recommandé)
4 : Timing Pool de threads de tâches (non recommandé)
5 : Créer un pool de threads via la méthode de construction ThreadPoolExecutor (fortement recommandé par le manuel de développement Alibaba)
Les quatre premières façons de créer un pool de threads sont toutes créées via les méthodes statiques des exécuteurs.
ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int finalI = i; executorService.execute(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+"<thread->run>"+ finalI); } }); }
Pourquoi le pool de threads en cache n'est-il pas recommandé ?
Analyse du code source
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
Grâce aux deux extraits de code ci-dessus, nous pouvons voir que la taille maximale de CachedThreadPool est la valeur maximale de l'entier 2147483647, ce qui équivaut à une création illimitée de threads, et la création de threads nécessite de la mémoire, ce qui entraînera un débordement de mémoire. . et les machines ordinaires n'utilisent pas une mémoire aussi grande pour créer un si grand nombre de threads.
newFixedThreadPool(int num), num est le nombre fixe de threads que nous voulons spécifier
ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { final int finalI = i; executorService.execute(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+"<thread->run>"+ finalI); } }); }
Sortie :
pool-1-thread-575b3c38ff368dcafe01d2cfbd3d86ea2run>4
pool- 1-thread-475b3c38ff368dcafe01d2cfbd3d86ea2run>3
pool-1-thread-575b3c38ff368dcafe01d2cfbd3d86ea2run>5
pool-1-thread-375b3c38ff368dcafe01d2cfbd3d86ea2run>2
pool- 1- thread-375b3c38ff368dcafe01d2cfbd3d86ea2run>8
pool-1-thread-375b3c38ff368dcafe01d2cfbd3d86ea2run>9
pool-1-thread-275b3c38ff368dcafe01d2cfbd3d86ea2run>1
pool-1- thread- 175b3c38ff368dcafe01d2cfbd3d86ea2run>0
pool-1-thread-575b3c38ff368dcafe01d2cfbd3d86ea2run>7
pool-1-thread-475b3c38ff368dcafe01d2cfbd3d86ea2run>6
On peut le voir que les fils sont utilisés ou réutilisés.
Pourquoi FixedThreadPool est-il un pool de threads fixe ?
Analyse du code source
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
On peut voir à partir de ce code source que le nombre de threads principaux (corePoolSize) et le nombre maximum de threads (maximumPoolSize) sont tous deux des nThreads, car ce n'est qu'ainsi que le pool de threads ne sera pas élargi et le nombre de threads sera corrigé.
ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int finalI = i; executorService.execute(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+"<thread->run>"+ finalI); } }); }
Pourquoi SingleThreadExecutor ne contient-il qu'un seul thread ?
Analyse du code source
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
On peut voir à partir de ce code source que le nombre de threads principaux (corePoolSize) et le nombre maximum de threads (maximumPoolSize) sont tous deux égaux à 1, il ne contient donc qu'un seul thread.
int initDelay=10; //初始化延时 int period=1;//初始化延迟过了之后,每秒的延时 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"<thread->run>"); } },initDelay,period, TimeUnit.SECONDS);
L'effet de ce code est le suivant : attendez 10 secondes après l'exécution du programme, puis affichez le premier résultat, puis affichez le résultat toutes les 1 seconde.
Pourquoi ScheduledThreadPool n'est-il pas recommandé ?
Analyse du code source
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, 2147483647, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()); }
On peut voir que le nombre maximum de threads (maximumPoolSize) de ScheduledThreadPool est la valeur maximale de l'entier 2147483647, ce qui équivaut à une création illimitée de threads, et la création de threads nécessite de la mémoire, ce qui entraînera un débordement de mémoire. , et généralement La machine n'utilise pas une mémoire aussi grande pour créer un si grand nombre de threads.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 2L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 12; i++) { final int finalI = i; threadPoolExecutor.execute(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+"<thread->run>"+ finalI); } }); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize : le nombre de threads principaux. Une fois ces threads créés, ils ne seront pas détruits et existeront toujours. Le pool de threads n'a pas de threads par défaut. Lorsqu'une tâche arrive, un thread sera créé via ThreadFactory et existera toujours.
maximumPoolSize : nombre maximum de threads. Le nombre de threads non principaux = maximumPoolSize-corePoolSize Le nombre de threads non principaux est en fait le nombre de threads évolutifs et peut être détruit.
keepAliveTime : Le temps de survie inactif des threads non essentiels. Lorsque le nombre de threads non principaux générés par l'expansion est toujours inactif après le keepAliveTime, ces threads non principaux seront détruits.
unit : unité de temps de keepAliveTime, par exemple : secondes
workQueue : zone d'attente. Lorsqu'une tâche > corePoolSize arrive, la tâche sera stockée dans la file d'attente de blocage de workQueue, en attendant que d'autres threads soient traités.
threadFactory : Usine de fils. Une façon de créer des fils de discussion.
handler : stratégie de rejet. Lorsqu'il s'agit > du nombre maximum de threads + de la capacité de workQueue, la politique de rejet sera exécutée
ArrayBlockingQueue : file d'attente de blocage délimitée. La file d'attente a une limite de taille. Lorsque la capacité est dépassée, la politique d'expansion ou de rejet sera déclenchée.
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
LinkedBlockingQueue : file d'attente de blocage illimitée, la file d'attente n'a pas de limite de taille et peut provoquer un débordement de mémoire.
public LinkedBlockingQueue() { this(2147483647); }
AbortPolicy : lancer une exception directement
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy : aucune opération n'est effectuée. Supprimez silencieusement la tâche
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy : supprimez la tâche existante la plus longue
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
CallerRunsPolicy : laissez le fil de discussion qui a soumis la tâche traiter la tâche
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
threadFactory
ThreadFactory threadFactory = Executors.defaultThreadFactory(); threadFactory.newThread(new Runnable() { @Override public void run() { System.out.println("threadFactory"); } }).start();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 2L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 26; i++) { //并发数26 final int finalI = i; threadPoolExecutor.execute(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+"<thread->run>"+ finalI); } }); } /** * 核心线程数=10,最大线程数=20,故可扩容线程数=20-10 * BlockingQueue的大小为5,故等待区的大小为5,也就是当并发数<=核心线程数+5不会扩容,并发数大于16才会扩容 * * 触发扩容:并发数>核心线程数+阻塞队列的大小 * 对于这段代码,如果来了26个并发,10个并发会被核心线程处理,5个会在等待区,剩下11个会因为等待区满了而触发扩容 * 因为这里最多能够扩容10个,这里却是11个,所以会触发拒绝策略 */
为什么这段代码会触发拒绝策略
对于这段代码,如果来了26个并发,10个并发会被核心线程处理,5个会在等待区,剩下11个会因为等待区满了而触发扩容,但是又因为因为这里最多能够扩容10个,这里却是11个,所以会触发拒绝策略。
怎么触发扩容
触发扩容:并发数>核心线程数(corePoolSize)+阻塞队列(workQueue)的大小
使用Java纯手写一个线程池
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!