Heim  >  Artikel  >  System-Tutorial  >  So funktioniert der Java-Thread-Pool

So funktioniert der Java-Thread-Pool

WBOY
WBOYnach vorne
2024-01-12 08:18:05977Durchsuche
Einführung Das Konzept des „Pools“ ist in unserer Entwicklung keine Seltenheit. Es gibt Datenbankverbindungspools, Thread-Pools, Objektpools, Konstantenpools usw. Im Folgenden konzentrieren wir uns hauptsächlich auf den Thread-Pool, um Schritt für Schritt den Schleier des Thread-Pools aufzudecken.
Vorteile der Verwendung eines Thread-Pools

1. Reduzieren Sie den Ressourcenverbrauch

Sie können erstellte Threads wiederverwenden, um den Verbrauch durch Thread-Erstellung und -Zerstörung zu reduzieren.

2. Reaktionsgeschwindigkeit verbessern

Wenn eine Aufgabe eintrifft, kann die Aufgabe sofort ausgeführt werden, ohne auf die Erstellung des Threads warten zu müssen.

3. Verbessern Sie die Thread-Verwaltbarkeit

Threads sind knappe Ressourcen. Wenn sie ohne Einschränkungen erstellt werden, verbrauchen sie nicht nur Systemressourcen, sondern verringern auch die Stabilität des Systems. Verwenden Sie den Thread-Pool für eine einheitliche Zuordnung, Optimierung und Überwachung

So funktioniert der Thread-Pool Werfen wir zunächst einen Blick darauf, wie der Thread-Pool eine neue Aufgabe verarbeitet, nachdem sie an den Thread-Pool übermittelt wurde

1. Der Thread-Pool bestimmt, ob alle Threads im Kern-Thread-Pool Aufgaben ausführen. Wenn nicht, wird ein neuer Arbeitsthread erstellt, um die Aufgabe auszuführen. Wenn alle Threads im Kern-Thread-Pool Aufgaben ausführen, führen Sie den zweiten Schritt aus.

2. Der Thread-Pool bestimmt, ob die Arbeitswarteschlange voll ist. Wenn die Arbeitswarteschlange nicht voll ist, werden neu eingereichte Aufgaben in dieser Arbeitswarteschlange gespeichert und gewartet. Wenn die Arbeitswarteschlange voll ist, fahren Sie mit Schritt 3 fort

3. Der Thread-Pool bestimmt, ob sich alle Threads im Thread-Pool im Arbeitsstatus befinden. Wenn nicht, wird ein neuer Arbeitsthread erstellt, um die Aufgabe auszuführen. Wenn es voll ist, übergeben Sie es der Sättigungsstrategie, um diese Aufgabe zu erledigen

Thread-Pool-Sättigungsstrategie Die Sättigungsstrategie des Thread-Pools wird hier erwähnt. Lassen Sie uns die Sättigungsstrategien kurz vorstellen:

AbortPolicy

Dies ist die Standardblockierungsstrategie des Java-Thread-Pools. Sie führt diese Aufgabe nicht aus und löst direkt eine Laufzeitausnahme aus. Denken Sie daran, dass ThreadPoolExecutor.execute einen Try-Catch erfordert, da das Programm sonst direkt beendet wird.

DiscardPolicy

Wenn Sie es direkt abbrechen, wird die Aufgabe nicht ausgeführt und die Methode ist leer

Älteste Richtlinie verwerfen

Verwerfen Sie eine Aufgabe von head aus der Warteschlange und führen Sie diese Aufgabe erneut aus.

CallerRunsPolicy

Das Ausführen dieses Befehls in dem Thread, der Execute aufruft, blockiert den Eingang

Benutzerdefinierte Ablehnungsrichtlinie (am häufigsten verwendet)

Implementieren Sie RejectedExecutionHandler und definieren Sie das Strategiemuster selbst

Nehmen wir ThreadPoolExecutor als Beispiel, um das Workflow-Diagramm des Thread-Pools zu zeigen

Java 线程池是如何工作的

Java 线程池是如何工作的

1. Wenn die Anzahl der aktuell ausgeführten Threads geringer ist als corePoolSize, erstellen Sie einen neuen Thread, um die Aufgabe auszuführen (beachten Sie, dass Sie eine globale Sperre erhalten müssen, um diesen Schritt auszuführen).

2. Wenn die laufenden Threads gleich oder größer als corePoolSize sind, fügen Sie die Aufgabe zu BlockingQueue hinzu.

3. Wenn die Aufgabe nicht zur BlockingQueue hinzugefügt werden kann (die Warteschlange ist voll), erstellen Sie einen neuen Thread in einem Nicht-CorePool, um die Aufgabe zu verarbeiten (beachten Sie, dass Sie eine globale Sperre erhalten müssen, um diesen Schritt auszuführen).

4. Wenn das Erstellen eines neuen Threads dazu führt, dass der aktuell laufende Thread die maximale Poolgröße überschreitet, wird die Aufgabe abgelehnt und die Methode RejectedExecutionHandler.rejectedExecution() aufgerufen.

Die allgemeine Entwurfsidee von ThreadPoolExecutor, die die oben genannten Schritte ausführt, besteht darin, den Erwerb globaler Sperren so weit wie möglich zu vermeiden, wenn die Methode „execute()“ ausgeführt wird (dies würde einen ernsthaften Engpass bei der Skalierbarkeit darstellen). Nachdem ThreadPoolExecutor die Aufwärmphase abgeschlossen hat (die Anzahl der aktuell ausgeführten Threads ist größer oder gleich corePoolSize), führen fast alle Aufrufe der Methode „execute()“ Schritt 2 aus, und für Schritt 2 ist keine globale Sperre erforderlich.

Quellcode-Analyse der Schlüsselmethode Werfen wir einen Blick auf den Quellcode der Kernmethode, die der Thread-Pool-Methode hinzugefügt wurde, und führen Sie sie wie folgt aus:

     //
     //Executes the given task sometime in the future.  The task
     //may execute in a new thread or in an existing pooled thread.
     //
     // If the task cannot be submitted for execution, either because this
     // executor has been shutdown or because its capacity has been reached,
     // the task is handled by the current {@code RejectedExecutionHandler}.
     //
     // @param command the task to execute
     // @throws RejectedExecutionException at discretion of
     //         {@code RejectedExecutionHandler}, if the task
     //         cannot be accepted for execution
     // @throws NullPointerException if {@code command} is null
     //
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //
         // Proceed in 3 steps:
         //
         // 1. If fewer than corePoolSize threads are running, try to
         // start a new thread with the given command as its first
         // task.  The call to addWorker atomically checks runState and
         // workerCount, and so prevents false alarms that would add
         // threads when it shouldn't, by returning false.
         // 翻译如下:
         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,
         // 如果能完成新线程创建exexute方法结束,成功提交任务
         // 2. If a task can be successfully queued, then we still need
         // to double-check whether we should have added a thread
         // (because existing ones died since last checking) or that
         // the pool shut down since entry into this method. So we
         // recheck state and if necessary roll back the enqueuing if
         // stopped, or start a new thread if there are none.
         // 翻译如下:
         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态
         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要
         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
         // 3. If we cannot queue task, then we try to add a new
         // thread.  If it fails, we know we are shut down or saturated
         // and so reject the task.
         // 翻译如下:
         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池
         // 已经达到饱和状态,所以reject这个他任务
         //
        int c = ctl.get();
        // 工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果工作线程数大于等于核心线程数
        // 线程的的状态未RUNNING并且队列notfull
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 移除成功,拒绝该非运行的任务
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
                addWorker(null, false);
        }
        // 如果队列满了或者是非运行的任务都拒绝执行
        else if (!addWorker(command, false))
            reject(command);
    }

Sehen wir uns weiterhin an, wie addWorker implementiert wird:

  private boolean addWorker(Runnable firstTask, boolean core) {
        // java标签
        retry:
        // 死循环
        for (;;) {
            int c = ctl.get();
            // 获取当前线程状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 这个逻辑判断有点绕可以改成 
            // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
            // 逻辑判断成立可以分为以下几种情况均不接受新任务
            // 1、rs > shutdown:--不接受新任务
            // 2、rs >= shutdown && firstTask != null:--不接受新任务
            // 3、rs >= shutdown && workQueue.isEmppty:--不接受新任务
            // 逻辑判断不成立
            // 1、rs==shutdown&&firstTask != null:此时不接受新任务,但是仍会执行队列中的任务
            // 2、rs==shotdown&&firstTask == null:会执行addWork(null,false)
            //  防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
            //  添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
                return false;
            // 死循环
            // 如果线程池状态为RUNNING并且队列中还有需要执行的任务
            for (;;) {
                // 获取线程池中线程数量
                int wc = workerCountOf(c);
                // 如果超出容量或者最大线程池容量不在接受新任务
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程安全增加工作线程数
                if (compareAndIncrementWorkerCount(c))
                    // 跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态发生变化,重新循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 走到这里说明工作线程数增加成功
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 检查线程状态
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将新启动的线程添加到线程池中
                        workers.add(w);
                        // 更新线程池线程数且不超过最大值
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
                if (workerAdded) {
                    //执行ThreadPoolExecutor的runWoker方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 线程启动失败,则从wokers中移除w并递减wokerCount
            if (! workerStarted)
                // 递减wokerCount会触发tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }
AddWorker wird von runWorker gefolgt. Wenn es zum ersten Mal gestartet wird, führt es die während der Initialisierung übergebene Aufgabe aus. Wenn die Warteschlange leer ist, wird darauf gewartet solange keepAliveTime

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允许中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
Sehen wir uns an, wie getTask ausgeführt wird

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 死循环
        retry: for (;;) {
            // 获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
            // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
            // 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了,所以必须要decrement
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 递减workerCount值
                decrementWorkerCount();
                return null;
            }
            // 标记从队列中取任务时是否设置超时时间
            boolean timed; // Are workers subject to culling?
            // 1.RUNING状态
            // 2.SHUTDOWN状态,但队列中还有任务需要执行
            for (;;) {
                int wc = workerCountOf(c);
                // 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只要超过的线程才会
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
                // 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
                // poll出异常了重试
                // 2.timeOut == true && timed ==
                // false:看后面的代码workerQueue.poll超时时timeOut才为true,
                // 并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true)
                // 所以超时不会继续执行而是return null结束线程。
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;
                // workerCount递减,结束当前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
            try {
                // 1.以指定的超时时间从队列中取任务
                // 2.core thread没有超时
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超时
            } catch (InterruptedException retry) {
                timedOut = false;// 线程被中断重试
            }
        }
    }

Sehen wir uns an, wie ProcessWorkerExit funktioniert

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的话再runWorker的getTask方法workerCount已经被减一了
        if (completedAbruptly)
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加线程的completedTasks
            completedTaskCount += w.completedTasks;
            // 从线程池中移除超时或者出现异常的线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试停止线程池
        tryTerminate();
        int c = ctl.get();
        // runState为RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 线程不是异常结束
            if (!completedAbruptly) {
                // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 线程池还不为空那就不用担心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.线程异常退出
            // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
            addWorker(null, false);
        }
    }
tryTerminate

Die Methode „processWorkerExit“ versucht, tryTerminate aufzurufen, um den Thread-Pool zu beenden. Diese Methode wird nach jeder Aktion ausgeführt, die zur Beendigung des Thread-Pools führen kann: z. B. dem Reduzieren von wokerCount oder dem Entfernen von Aufgaben aus der Warteschlange im Zustand SHUTDOWN.

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下状态直接返回:
            // 1.线程池还处于RUNNING状态
            // 2.SHUTDOWN状态但是任务队列非空
            // 3.runState >= TIDYING 线程池已经停止了或在停止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 只能是以下情形会继续下面的逻辑:结束线程池。
            // 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
            // 2.STOP状态,当调用了shutdownNow方法
            // workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
            // 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
                // ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 进入TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子类重载:一些资源清理工作
                        terminated();
                    } finally {
                        // TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 继续awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
Die Shutdown-Methode setzt runState auf SHUTDOWN und beendet alle inaktiven Threads. Die Methode „shutdownNow“ setzt runState auf STOP. Der Unterschied zur Shutdown-Methode besteht darin, dass diese Methode alle Threads beendet. Der Hauptunterschied besteht darin, dass Shutdown die Methode interruptIdleWorkers aufruft, während ShutdownNow tatsächlich die Methode interruptIfStarted der Worker-Klasse aufruft:

Ihre Umsetzung ist wie folgt:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
            // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
            // tryTerminate方法中会保证队列中剩余的任务得到执行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP状态:不再接受新任务且不再执行队列中的任务。
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 返回队列中还没有被执行的任务。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
            // 因此保证了中断的肯定是空闲的线程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化时state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
Nutzung des Thread-Pools Erstellung eines Thread-Pools Wir können über ThreadPoolExecutor einen Thread-Pool erstellen

    /**
     * @param corePoolSize 线程池基本大小,核心线程池大小,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,
     * 队列满了才创建新的线程。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,
     * 等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,
     * 线程池会提前创建并启动所有基本线程。
     * @param maximumPoolSize 最大线程数,超过就reject;线程池允许创建的最大线程数。如果队列满了,
     * 并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
     * @param keepAliveTime
     * 线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
     * @param unit  线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、
     * 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
     * @param workQueue 工作队列,线程池中的工作线程都是从这个工作队列源源不断的获取任务进行执行
     */
    public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue) {
        // threadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }
向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute()方法输入的任务是一个Runnable类的实例。

threadsPool.execute(new Runnable() {
        @Override
        public void run() {
        }
    });

submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
  try
    {
        Object s = future.get();
    }catch(
    InterruptedException e)
    {
        // 处理中断异常
    }catch(
    ExecutionException e)
    {
        // 处理无法执行任务异常
    }finally
    {
        // 关闭线程池
        executor.shutdown();
    }
关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

合理的配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

1、任务的性质:CPU密集型任务、IO密集型任务和混合型任务。

2、任务的优先级:高、中和低。

3、任务的执行时间:长、中和短。

4、任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行

如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

Es wird empfohlen, eine begrenzte Warteschlange zu verwenden. Begrenzte Warteschlangen können die Stabilität und Frühwarnfähigkeit des Systems erhöhen und können bei Bedarf größer eingestellt werden, beispielsweise auf mehrere Tausend. Manchmal sind die Warteschlange und der Thread-Pool des Hintergrundaufgaben-Thread-Pools in unserem System voll und es werden ständig Ausnahmen von abgebrochenen Aufgaben ausgelöst. Bei der Untersuchung wurde festgestellt, dass ein Problem mit der Datenbank vorliegt, was dazu führt, dass die Ausführung von SQL sehr schwierig wird langsam, da der Thread-Pool der Hintergrundaufgabe alle Aufgaben erfordert, die Daten abfragen und in die Datenbank einfügen, sodass alle Arbeitsthreads im Thread-Pool blockiert sind und Aufgaben im Thread-Pool zurückgeblieben sind. Wenn wir es zu diesem Zeitpunkt auf eine unbegrenzte Warteschlange festlegen, gibt es immer mehr Warteschlangen im Thread-Pool, die möglicherweise den Speicher füllen und dazu führen, dass das gesamte System und nicht nur Hintergrundaufgaben nicht verfügbar sind. Natürlich werden alle Aufgaben in unserem System auf separaten Servern bereitgestellt und wir verwenden Thread-Pools unterschiedlicher Größe, um verschiedene Arten von Aufgaben auszuführen. Wenn jedoch solche Probleme auftreten, sind auch andere Aufgaben betroffen.

Thread-Pool-Überwachung

Wenn der Thread-Pool im System häufig genutzt wird, ist es notwendig, den Thread-Pool zu überwachen, damit bei Auftreten eines Problems das Problem anhand der Nutzung des Thread-Pools schnell lokalisiert werden kann. Es kann über die vom Thread-Pool bereitgestellten Parameter überwacht werden. Bei der Überwachung des Thread-Pools können Sie die folgenden Attribute verwenden

    taskCount: Die Anzahl der Aufgaben, die der Thread-Pool ausführen muss.
  • completedTaskCount: Die Anzahl der Aufgaben, die der Thread-Pool während des Betriebs abgeschlossen hat, kleiner oder gleich taskCount.
  • largestPoolSize: Die größte Anzahl von Threads, die jemals im Thread-Pool erstellt wurden. Anhand dieser Daten können Sie erkennen, ob der Thread-Pool jemals voll war. Wenn dieser Wert der maximalen Größe des Thread-Pools entspricht, bedeutet dies, dass der Thread-Pool voll ist.
  • getPoolSize: Die Anzahl der Threads im Thread-Pool. Wenn der Thread-Pool nicht zerstört wird, werden die Threads im Thread-Pool nicht automatisch zerstört, sodass die Größe nur zunimmt, aber nicht abnimmt.
  • getActiveCount: Ermittelt die Anzahl der aktiven Threads.
Überwachung durch Erweiterung des Thread-Pools. Sie können den Thread-Pool anpassen, indem Sie den Thread-Pool erben, die Methoden „beforeExecute“, „afterExecute“ und „terminated“ des Thread-Pools neu schreiben, oder Sie können Code zur Überwachung vor, nach und vor dem Schließen des Thread-Pools ausführen. Überwachen Sie beispielsweise die durchschnittliche Ausführungszeit, die maximale Ausführungszeit und die minimale Ausführungszeit von Aufgaben.

Das obige ist der detaillierte Inhalt vonSo funktioniert der Java-Thread-Pool. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:linuxprobe.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen