Maison  >  Article  >  Java  >  Comment utiliser la méthode exécutée () du pool de threads Java

Comment utiliser la méthode exécutée () du pool de threads Java

WBOY
WBOYavant
2023-05-10 23:46:122068parcourir

Comprenez d'abord le rôle du pool de threads

* Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.

Le pool de threads gère deux problèmes différents. Le pool de threads donne de meilleures performances à un grand nombre de tâches asynchrones en réduisant la surcharge avant que le thread ne soit officiellement appelé. fournit une série de moyens de lier les threads de tâches de gestion. Chaque pool de threads contient des informations de base, telles que le nombre de tâches terminées en interne.

Premier coup d'oeil à une série de

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl de la classe ThreadPoolExecutor qui représente l'état. Comme la classe AtomicInteger, elle stocke deux types d'informations dans la classe Parmi eux, les 3 bits supérieurs enregistrent l'état du pool de threads. , et les 29 derniers bits enregistrent le pool de threads à ce moment-là. Le nombre de threads de classe Woker dans le pool de threads (on peut voir que le nombre maximum acceptable de threads dans le pool de threads est d'environ 500 millions). On peut voir que les méthodes runStateOf() et workerCountOf() fournies fournissent respectivement des méthodes pour afficher l'état et le nombre de threads.

Cette classe fournit un total de cinq états

Regardons les commentaires donnés par l'auteur

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don&#39;t accept new tasks, but process queued tasks
*   STOP:     Don&#39;t accept new tasks, don&#39;t process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
  • L'état RUNNING peut accepter de nouvelles tâches entrantes et également exécuter des tâches dans la file d'attente. RUNNING状态可以接受新进来的任务,同时也会执行队列里的任务。

  • SHUTDOWN 状态已经不会再接受新任务,但仍旧会处理队列中的任务。

  • STOP状态在之前的基础上,不会处理队列中的人物,在执行的任务也会直接被打断。

  • TIDYING状态在之前的基础上,所有任务都已经终止,池中的Worker线程都已经为0,也就是stop状态在清理完所有工作线程之后就会进入该状态,同时在shutdown状态在队列空以及工作线程清理完毕之后也会直接进入这个阶段,这一阶段会循环执行terminated()方法。

  • TERMINATED 状态作为最后的状态,在之前的基础上terminated()方法也业已执行完毕,才会从上个状态进入这个状态,代表线程池已经完全停止。

由于线程池的状态都是通过AtomicInteger来保存的,可以通过比较的方式简单的得到当前线程状态。

private final BlockingQueue<Runnable> workQueue; 
private final ReentrantLock mainLock = new ReentrantLock(); 
private final HashSet<Worker> workers = new HashSet<Worker>(); 
private final Condition termination = mainLock.newCondition(); 
private int largestPoolSize; 
private long completedTaskCount; 
private volatile ThreadFactory threadFactory; 
private volatile RejectedExecutionHandler handler; 
private volatile long keepAliveTime; 
private volatile boolean allowCoreThreadTimeOut; 
private volatile int corePoolSize; 
private volatile int maximumPoolSize;

接下来是线程池的几个有关工作线程的变量

  • corePoolSize表示线程池中允许存活最少的工作线程数量,但值得注意的是如果allowCoreThreadTimeOut一旦设置true(默认false),每个线程的存活时间只有keepAliveTime也就是说在allowCoreThreadTimeOut为true的时候,该线程池最小的工作线程数量为0;maximumPoolSize代表线程池中最大的工作线程数量。

  • keepAliveTime为线程池中工作线程数量大于corePoolSize时,每个工作线程的在等待工作时最长的等待时间。

  • workQueue作为线程池的任务等待队列,这个将在接下来的execute()里详细解释。

  • Workers作为存放线程池中存放工作线程的容器。

  • largestPoolSize用来记录线程池中存在过的最大的工作线程数量。

  • completedTaskCount用来记录线程池完成的任务的总数。

  • Handler

Le statut SHUTDOWN n'acceptera plus de nouvelles tâches, mais les tâches en file d'attente seront toujours traitées.

Le statut STOP ne traitera pas les caractères dans la file d'attente et les tâches en cours d'exécution seront directement interrompues.

L'état TIDYING est basé sur le précédent. Toutes les tâches ont été terminées et les threads de travail du pool ont tous été 0. C'est-à-dire que l'état d'arrêt entrera dans cet état après. nettoyant tous les threads de travail, et en même temps, dans l'état d'arrêt, il entrera directement dans cette étape une fois que la file d'attente sera vide et que le thread de travail aura été nettoyé. Dans cette étape, la méthode terminated() sera. exécuté de manière cyclique.

L'état TERMINATED est le dernier état sur la base du précédent, la méthode terminated() a également été exécutée, puis elle entrera dans cet état à partir de l'état précédent, ce qui signifie. que le pool de threads s'est complètement arrêté.

Étant donné que l'état du pool de threads est enregistré via AtomicInteger, l'état actuel du thread peut être facilement obtenu par comparaison.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

Vient ensuite plusieurs variables liées aux threads de travail dans le pool de threads

corePoolSize représente le nombre minimum de threads de travail autorisés à survivre dans le pool de threads, mais il convient de noter que si allowCoreThreadTimeOut est défini sur true une fois (par défaut false), le temps de survie de chaque thread est uniquement keepAliveTime. C'est-à-dire que lorsque allowCoreThreadTimeOut est vrai, le nombre minimum de threads de travail dans le pool de threads est de 0 maximumPoolSize représente le nombre maximum de threads de travail ; threads dans le pool de threads. 🎜🎜🎜🎜keepAliveTime est le temps d'attente le plus long pour chaque thread de travail en attente de travail lorsque le nombre de threads de travail dans le pool de threads est supérieur à corePoolSize. 🎜🎜🎜🎜workQueue sert de file d'attente de tâches du pool de threads. Cela sera expliqué en détail dans execute() ensuite. 🎜🎜🎜🎜Workers sert de conteneur pour stocker les threads de travail dans le pool de threads. 🎜🎜🎜🎜largestPoolSize est utilisé pour enregistrer le plus grand nombre de threads de travail ayant jamais existé dans le pool de threads. 🎜🎜🎜🎜completedTaskCount est utilisé pour enregistrer le nombre total de tâches terminées par le pool de threads. 🎜🎜🎜🎜Handler est une stratégie de rejet lorsque le pool de threads ne peut pas accepter de tâches. Nous pouvons implémenter notre propre stratégie de rejet sur la base de l'implémentation de l'interface RejectedExecutionHandler. Ce qui suit est la politique de rejet par défaut du pool de threads, 🎜🎜🎜
public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}
🎜threadFactory est utilisée comme classe d'usine pour les threads de production du pool de threads🎜🎜Ce qui suit est la méthode de thread de production d'usine de threads par défaut du pool de threads🎜
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
🎜Nous pouvons d'abord regarder dans notre méthode d'exécution () la plus communément appelée🎜
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
           throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
🎜execute(), la logique d'appel interne est très claire. 🎜🎜Si le nombre de threads de travail dans le pool de threads actuel est inférieur à corePoolSize, appelez directement addWoker() pour ajouter des threads de travail. 🎜🎜Ce qui suit est la méthode spécifique de addWorker()🎜
final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
🎜Cette méthode est relativement longue, mais la logique globale est toujours claire. 🎜🎜Déterminez d'abord l'état du pool de threads actuel. Si l'état n'est pas arrêté ou en cours d'exécution, ou s'il est arrêté mais que la file d'attente de travail est vide, alors il reviendra directement à l'échec d'ajout de travail à ce moment. L'étape suivante consiste à juger le nombre de threads dans le pool de threads. En fonction de la valeur principale au moment de l'appel, il est déterminé si elle est basée sur corePoolSize ou maximumPoolSize. 🎜🎜Après avoir confirmé l'état du pool de threads et le nombre de threads de travail dans le pool de threads, vous pouvez réellement commencer à ajouter des threads de travail. 🎜🎜Créez une nouvelle classe de travail (une classe interne du pool de threads, un thread de travail spécifique), transmettez le thread spécifique à exécuter en paramètre dans la méthode de construction, puis ajoutez-le au conteneur de threads de travail travailleurs du thread pool et mettez-le à jour Le nombre maximum de threads de travail, et enfin l'appel de la méthode start() du thread de travail, termine la création et le démarrage du thread de travail. 🎜🎜 Revenons à la méthode execute(). Si le nombre de threads que nous avons au début est supérieur à corePoolSize, ou s'il y a un problème lorsque nous appelons la méthode addworker() et que le nombre de threads de travail ne parvient pas à être ajouté. , nous continuerons ensuite à exécuter la logique descendante suivante. 🎜

在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。

但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。

如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。

如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。

在之前的过程中我们建立了工作线程Worker()类,那么我们现在看看worker类的内部实现,也可以说是线程池的核心部分。

Worker类作为线程池的内部类

接下来是Worker()类的成员

final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
  • thread作为worker的工作线程空间,由线程池中所设置的线程工厂生成。

  • firstTask则是worker在构造方法中所接受到的所要执行的任务。

  • completedTasks作为该worker类所执行完毕的任务总数。

接下来我们可以看最重要的,也就是我们之前建立完Worker类之后立马调用的run()方法了

public void run() {
    runWorker(this);
}

run()方法实现的很简单

我们可以继续追踪下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。

但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。

接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。

在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。

下面是getTask()的实现

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOut和corePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。

在runWorker()方法的最后

调用了processWorkerExist()方法来执行工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。

在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer