Maison  >  Article  >  Java  >  Explication détaillée du code graphique du pool de threads en Java

Explication détaillée du code graphique du pool de threads en Java

黄舟
黄舟original
2017-09-25 10:56:191979parcourir

Le pool de threads est un point de connaissance important en Java. J'ai lu de nombreux articles ici, je vais prendre le pool de threads fourni avec Java comme exemple pour l'enregistrer et l'analyser. Cet article fait référence à la programmation simultanée Java : utilisation de pools de threads, pool de threads Java --- analyse de la méthode addWorker, pools de threads, sélection de stratégies dans ThreadPoolExecutor et sélection de files d'attente de travail (pool de threads Java) et analyse du pool de threads ThreadPoolExecutor et trois types de BlockingQueue accomplissent. Cet article implémente des cas d'utilisation et une analyse du code source basés sur JDK1.8, et se concentre principalement sur le processus.

1. Utiliser le pool de threads

Pour connaître le principe de quelque chose, il faut d'abord savoir l'utiliser. Commençons donc par un exemple d’utilisation d’un pool de threads.

1. Classe de tâches

Pour utiliser le pool de threads fourni avec Java, vous avez d'abord besoin d'une classe de tâches. Cette classe de tâches doit implémenter l'interface Runnable et remplacer la méthode run (ce qui nécessite. exécution multithread) logique de tâche).

package org.my.threadPoolDemo;
/**
 * 任务类,实现Runnable接口 重写run方法
 */
public class MyTask implements Runnable{
    
    private int taskNum;
    
    public MyTask(int taskNum) {
        super();
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        System.out.println("正在执行task"+taskNum);
        try {
            Thread.currentThread().sleep(4000);//sleep 4秒模拟执行代码过程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task"+taskNum+"执行完毕");
    }
}

2. Créez un pool de threads et exécutez plusieurs tâches

Avec la classe de tâches, créez ensuite un pool de threads et exécutez plusieurs tâches. Nous utilisons ThreadPoolExecutor pour créer un pool de threads.

package org.my.threadPoolDemo;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUseDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数量:"+executor.getPoolSize()+",线程池中等待执行的任务数量:"+executor.getQueue().size()+",已执行完的任务数量:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }

}

La logique est très simple, créez un gestionnaire de pool de threads, puis utilisez-le pour exécuter 15 tâches. Ici, nous devons expliquer la signification des paramètres transmis lors de la construction de ThreadPoolExecutor :

5 (corePoolSize) fait référence à la taille du pool principal. C'est-à-dire le nombre de threads créés. Si le nombre de threads dans le pool de threads est égal à ce nombre, alors la prochaine tâche à venir sera placée dans la file d'attente des tâches (illustré en détail plus loin).

10 (maximumPoolSize) fait référence au nombre maximum de threads que le pool de threads peut créer. Si la file d'attente des tâches de l'étape précédente est pleine, le pool de threads continuera à créer des threads jusqu'à ce que le nombre de threads = 10, et l'exécution de la tâche suivante sera refusée.

200 (keepAliveTime) fait référence au temps maximum que le thread peut durer avant de se terminer lorsqu'il n'y a aucune tâche à exécuter. Par défaut, keepAliveTime ne fonctionnera que lorsque le nombre de threads dans le pool de threads est supérieur à corePoolSize et jusqu'à ce que le nombre de threads dans le pool de threads ne soit pas supérieur à corePoolSize, c'est-à-dire lorsque le nombre de threads dans le pool de threads est supérieur à corePoolSize, si un thread est inactif. Lorsque le temps atteint keepAliveTime, il se terminera jusqu'à ce que le nombre de threads dans le pool de threads ne dépasse pas corePoolSize. Mais si la méthode allowCoreThreadTimeOut(boolean) est appelée, lorsque le nombre de threads dans le pool de threads n'est pas supérieur à corePoolSize, le paramètre keepAliveTime fonctionnera également jusqu'à ce que le nombre de threads dans le pool de threads soit égal à 0.

TimeUnit.MILLISECONDS est l'unité de temps de keepAliveTime.

TimeUnit.DAYS; ; t.MILLISECONDS; //Milliseconds
TimeUnit.MICROSECONS; //Microsecondes
TimeUnit.NANOSECONDS; //Nanoseconds


ArrayBlockingQueue est file d'attente de blocage entrante utilisée pour stocker la file d'attente des tâches workQueue. C'est-à-dire la file d'attente des tâches mentionnée ci-dessus. En plus d'ArrayBlockingQueue, vous avez également le choix entre LinkedBlockingQueue et SynchronousQueue.

ThreadPoolExecutor a quatre constructeurs En plus des paramètres passés ci-dessus, il existe d'autres constructeurs qui peuvent être passés dans :

threadFactory : Usine de threads, principalement utilisée pour créer des threads

handler : indique la politique de refus d'exécution des tâches. Il existe quatre valeurs :

ThreadPoolExecutor.AbortPolicy : supprime la tâche et lève RejectedExecutionException.

ThreadPoolExecutor.DiscardPolicy : supprime également les tâches, mais ne lève pas d'exceptions.

ThreadPoolExecutor.DiscardOldestPolicy : supprimez la tâche la plus en avant de la file d'attente, puis essayez à nouveau d'exécuter la tâche (répétez ce processus)

ThreadPoolExecutor.CallerRunsPolicy : la tâche est traitée par le thread appelant


Execute le programme ci-dessus, les résultats sont les suivants :

Vous pouvez voir que puisque corePoolSize est 5, lorsque le nombre de tâches est supérieur à 5, la tâche suivante est placé dans la file d'attente des tâches pour attendre, mais comme la capacité maximale de la file d'attente des tâches est de 5 et maximumPoolSize = 10, une fois la file d'attente des tâches pleine, le gestionnaire de pool de threads continue de créer 5 threads, et enfin le nombre de threads dans le le pool de threads atteint 10. À l'heure actuelle, 15 tâches peuvent être traitées par ces threads. Si d'autres tâches sont ajoutées, par exemple en augmentant le nombre de boucles for à 20, une exception java.util.concurrent.RejectedExecutionException se produira.

2. Analyse des principes
正在执行task0
线程池中线程数量:1,线程池中等待执行的任务数量:0,已执行完的任务数量:0
线程池中线程数量:2,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task1
线程池中线程数量:3,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task2
线程池中线程数量:4,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task3
线程池中线程数量:5,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task4
线程池中线程数量:5,线程池中等待执行的任务数量:1,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:2,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:3,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:4,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:6,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task10
线程池中线程数量:7,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task11
线程池中线程数量:8,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task12
正在执行task13
线程池中线程数量:9,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:10,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task14
task1执行完毕
task10执行完毕
正在执行task5
task11执行完毕
task13执行完毕
task4执行完毕
task3执行完毕
task2执行完毕
task0执行完毕
正在执行task9
正在执行task8
正在执行task7
task14执行完毕
task12执行完毕
正在执行task6
task5执行完毕
task9执行完毕
task7执行完毕
task8执行完毕
task6执行完毕

À en juger par l'exemple ci-dessus d'utilisation du pool de threads, les deux étapes les plus importantes consistent à construire l'objet ThreadPoolExecutor, puis à appeler la méthode d'exécution de l'objet ThreadPoolExecutor chaque le moment où une tâche arrive.

1. Structure de ThreadPoolExecutor

La structure principale et la relation d'héritage de ThreadPoolExecutor sont présentées dans la figure ci-dessous :

ThreadPoolExecutor structure et relation d'héritageExplication détaillée du code graphique du pool de threads en Java

主要成员变量:任务队列——存放那些暂时无法执行的任务;工作线程池——存放当前启用的所有线程;线程工厂——创建线程;还有一些用来调度线程与任务并保证线程安全的成员。

了解了ThreadPoolExecutor的主要结构,再简单梳理一下“一个传入线程池的任务能够被最终正常执行需要经过的主要流程”,方法名称前面没有“XXX.”这种标注的都是ThreadPoolExecutor的方法:

Explication détaillée du code graphique du pool de threads en Java

Explication détaillée du code graphique du pool de threads en Java

2、ThreadPoolExecutor构造器及重要常量

简单了解下构造器,ThreadPoolExecutor的四个构造器的源码如下:


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

从源码中可以看出,这四个构造器都是调用最后一个构造器,只是根据开发者传入的参数的不同而填充一些默认的参数。比如如果开发者没有传入线程工厂threadFactory参数,那么构造器就使用默认的Executors.defaultThreadFactor。

在这里还要理解ThreadPoolExecutor的几个常量的含义和几个简单方法:


//Integer.SIZE是一个静态常量,值为32,也就是说COUNT_BITS是29
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY是最大容量536870911,因为1左移29位之后-1,导致最高三位为0,而其余29位全部为1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//ctl用于表示线程池状态和有效线程数量,最高三位表示线程池的状态,其余低位表示有效线程数量
//初始化之后ctl等于RUNNING的值,即默认状态是运行状态,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//1110 0000 0000 0000 0000 0000 0000 0000最高三位为111
private static final int RUNNING    = -1 << COUNT_BITS;
//最高三位为000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000最高三位为001
private static final int STOP       =  1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000最高三位为010
private static final int TIDYING    =  2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000最高三位为011
private static final int TERMINATED =  3 << COUNT_BITS;
/**
* 获取运行状态,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当取反的时候,就只有最高三位为1,再经过与运算,只会取到ctl的最高三位
* 而这最高三位如上文所述,表示线程池的状态
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/**
* 获取工作线程数量,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当进行与运算的时候,只会取到低29位,这29位正好表示有效线程数量
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列,用于存放待执行任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
/** 判断线程池是否是运行状态,传入的参数是ctl的值
* 只有RUNNING的符号位是1,也就是只有RUNNING为负数
* 所以如果目前的ctl值<0,就是RUNNING状态
*/
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//从任务队列中移除任务
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

3、getTask()源代码

通过前面的流程图,我们知道getTask()是由runWorker方法调用的,目的是取出一个任务。

private Runnable getTask() {
        //记录循环体中上个循环在从阻塞队列中取任务时是否超时
        boolean timedOut = false; 
        //无条件循环,主要是在线程池运行正常情况下
        //通过循环体内部的阻塞队列的阻塞时间,来控制当前线程的超时时间
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池状态

            /*先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一
             *这时候不管队列中有没有任务,都不用去执行了;
             *如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了
             *将工作线程数量-1,并且返回null
             **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取工作线程数量
            int wc = workerCountOf(c);

            //是否启用超时参数keepAliveTime
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果这个条件成立,如果工作线程数量-1成功,返回null,否则跳出循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //如果需要采用阻塞形式获取,那么就poll设定阻塞时间,否则take无限期等待。
            //这里通过阻塞时间,间接控制了超时的值,如果取值超时,意味着这个线程在超时时间内处于空闲状态
            //那么下一个循环,将会return null并且线程数量-1
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

4、runWorker(Worker w)源代码

通过上面流程图,可以看出runWorker(Worker w)实际上已经是在线程启动之后执行任务了,所以其主要逻辑就是获取任务,然后执行任务的run方法。


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//获取传入的Worker对象中的任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
                /* 
                 * 如果传入的任务为null,就从任务队列中获取任务,只要这两者有一个不为null,就进入循环体
                 */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果线程池状态已被标为停止,那么则不允许该线程继续执行任务!或者该线程已是中断状态,
                //也不允许执行任务,还需要中断该线程!
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//为子类预留的空方法(钩子)
                    Throwable thrown = null;
                    try {
                        task.run();//终于真正执行传入的任务了
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//为子类预留的空方法(钩子)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
                /*
                 * 从这里可以看出,当一个任务执行完毕之后,循环并没有退出,此时会再次执行条件判断
                 * 也就是说如果执行完第一个任务之后,任务队列中还有任务,那么将会继续在这个线程执行。
                 * 这里也是很巧妙的地方,不需要额外开一个控制线程来看那些线程处于空闲状态,然后给他分配任务。
                 * 直接自己去任务队列拿任务
                 */
            }
            completedAbruptly = false;
        } finally {
                /*
                 * 执行到这里,说明getTask返回了null,要么是超时(任务队列没有任务了),要么是线程池状态有问题了
                 * 当前线程将被回收了
                 */
            processWorkerExit(w, completedAbruptly);
        }
    }

5、addWorker(Runnable firstTask, boolean core)源代码

runWorker是从Worker对象中获取第一个任务,然后从任务队列中一直获取任务执行。流程图中已经说过,这个Worker对象是在addWorker方法中创建的,所以新线程创建、启动的源头是在addWorker方法中。而addWorker是被execute所调用,execute根据addWorker的返回值,进行条件判断。

private boolean addWorker(Runnable firstTask, boolean core) {
        //上来就是retry,后面continue retry;语句执行之后都会从这里重新开始
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池运行状态

            /*
             * 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false
             * 如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//获取工作线程数量
                /*
                 * addWorker传入的第二个Boolean参数用来判别当前线程数量是否大于核心池数量
                 * true,代表当前线程数量小于核心池数量
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                        //如果成功,跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                //判断线程池状态,改变了就重试一次
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
                /*
                 * 前面顺利增加了工作线程数,那么这里就真正创建Worker
                 * 上面流程图中说过,创建Worker时会创建新的线程.如果这里创建失败
                 * finally中会将工作线程数-1
                 */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将Worker放入工作线程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

6、execute方法

execute方法其实最主要是根据线程池的策略来传递不同的参数给addWorker方法。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:

a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

方法execute主要是在控制上面四条策略的实现。


 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取到成员变量ctl的值(线程池状态)
        int c = ctl.get();
        //如果工作线程的数量<核心池的大小
        if (workerCountOf(c) < corePoolSize) {
            //调用addWorker(这里传入的true代表工作线程数量<核心池大小)
            //如果成功,方法结束。
            if (addWorker(command, true))
                return;
            //否则,再重新获取一次ctl的值
            //个人理解是防止前面这段代码执行的时候有其他线程改变了ctl的值。
            c = ctl.get();
        }
        //如果工作线程数量>=核心池的大小或者上一步调用addWorker返回false,继续走到下面
        //如果线程池处于运行状态,并且成功将当前任务放入任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            //为了线程安全,重新获取ctl的值
            int recheck = ctl.get();
            //如果线程池不处于运行状态并且任务从任务队列移除成功
            if (! isRunning(recheck) && remove(command))
                //调用reject拒绝执行,根据handler的实现类抛出异常或者其他操作
                reject(command);
            //否则,如果工作线程数量==0,调用addWorker并传入null和false
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //执行到这里代表当前线程已超越了核心线程且任务提交到任务队列失败。(可以注意这里的addWorker是false)
        //那么这里再次调用addWroker创建新线程(这时创建的线程是maximumPoolSize)。
        //如果还是提交任务失败则调用reject处理失败任务
        else if (!addWorker(command, false))
            reject(command);
    }

三、线程池相关影响因素

1、阻塞队列的选择

本段引用至ThreadPoolExecutor中策略的选择与工作队列的选择(java线程池)

直接提交:
工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不存储它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列:
使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

Files d'attente limitées :
Les files d'attente limitées (comme ArrayBlockingQueue) aident à prévenir l'épuisement des ressources lors de l'utilisation de maximumPoolSizes limitées, mais peuvent être plus difficiles à régler et à contrôler. La taille de la file d'attente et la taille maximale du pool peuvent nécessiter un compromis : l'utilisation de grandes files d'attente et de petits pools peut minimiser l'utilisation du processeur, les ressources du système d'exploitation et la surcharge de changement de contexte, mais peut entraîner une réduction artificielle du débit. Si les tâches se bloquent fréquemment (par exemple, s'il s'agit de limites d'E/S), le système peut planifier plus de threads que vous n'en autorisez. L'utilisation de petites files d'attente nécessite généralement des pools de plus grande taille et une utilisation plus importante du processeur, mais peut entraîner une surcharge de planification inacceptable, ce qui peut également réduire le débit.

2. Pools de threads couramment utilisés

Lorsque vous utilisez ThreadPoolExecutor pour créer un pool de threads, vous pouvez utiliser différents constructeurs pour construire des pools de threads avec des caractéristiques différentes en fonction de différents paramètres. Dans les situations réelles, nous utilisons généralement les méthodes fournies par la classe Executors pour créer des pools de threads. Les exécuteurs appellent finalement le constructeur de ThreadPoolExecutor, mais les paramètres pertinents ont été configurés.

1) Pool de threads de taille fixe : Executors.newFixedThreadPool

coresize est le même que maxsize, le délai d'attente est de 0 et la file d'attente utilise une file d'attente FIFO illimitée de LinkedBlockingQueue, ce qui signifie que ce thread le pool n'a toujours que des threads coresize en cours d'exécution. Selon l'analyse précédente, si la tâche est exécutée, ce thread continuera à récupérer les tâches de la file d'attente des tâches pour exécution. Lorsqu'il n'y a aucune tâche, le thread sera immédiatement fermé.

2) Pool de threads à tâche unique : newSingleThreadExecutor

Il n'y a qu'un seul thread travaillant dans le pool et la file d'attente de blocage est illimitée. Elle peut garantir que les tâches sont exécutées dans l'ordre dans lequel. ils sont soumis.

3) Pool de threads de taille variable : newCachedThreadPool

File d'attente SynchronousQueue, une file d'attente de blocage qui ne stocke pas d'éléments. Chaque opération d'insertion doit attendre qu'un autre thread appelle une opération de suppression. Par conséquent, lorsque nous soumettons la première tâche, nous ne pouvons pas rejoindre la file d'attente. Cela satisfait une condition du pool de threads "Lorsque nous ne pouvons pas rejoindre la file d'attente et que la tâche n'atteint pas la taille maximale, nous démarrerons une nouvelle tâche de thread." Notre taille maximale est donc la valeur maximale des 29 bits inférieurs de ctl. Le délai d'attente est de 60 s. Lorsqu'un thread n'a aucune tâche à exécuter, le délai d'attente de 60 s sera temporairement enregistré. S'il n'y a pas de nouvelles tâches, il sera supprimé du pool de threads.

4) pool de threads planifié, newScheduledThreadPool

crée un pool de threads de taille illimitée. Ce pool de threads prend en charge le besoin de timing et d'exécution périodique des tâches

Enfin, c'est presque terminé, j'ai l'impression qu'il y a encore beaucoup de choses qui n'ont pas été détaillées, notamment le grand nombre de verrous réentrants. Je me rattraperai quand j'en aurai l'occasion. J'ai intégré de nombreuses ressources en ligne et dessiné moi-même quelques images. Je pense personnellement que c'est plus facile à comprendre que de nombreux blogs qui introduisent des pools de threads. S'il y a des erreurs, les critiques et corrections sont les bienvenues.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn