ホームページ  >  記事  >  Java  >  Javaスレッドプールのexecute()メソッドの使用方法

Javaスレッドプールのexecute()メソッドの使用方法

WBOY
WBOY転載
2023-05-10 23:46:122020ブラウズ

まずスレッド プールの動作を理解してください

* Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.

スレッド プールは 2 つの異なる問題を処理します。スレッド プールは、スレッドが正式に呼び出される前のオーバーヘッドを削減することで、多数の非同期タスクのパフォーマンスを向上させます。同時に、管理タスク スレッドをバインドするための一連の方法が提供されます。各スレッド プールには、内部で完了したタスクの数など、いくつかの基本情報が含まれています。

まず、ThreadPoolExecutor クラスの一連のステータス表現を見てみましょう。AtomicInteger クラスの

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl は、クラスに 2 種類の情報を格納し、上位 3 ビットがステータスを保存します。このとき、スレッド プール内の Wker クラス スレッドの数を保存するために 29 ビットが使用されます (スレッド プール内の最大許容スレッド数は約 5 億であることがわかります)。提供されている runStateOf() メソッドと workerCountOf() メソッドはそれぞれ、スレッドのステータスとスレッドの数を表示するメソッドを提供していることがわかります。

このクラスには合計 5 つの状態があります

作成者によるコメントを見てみましょう

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don&#39;t accept new tasks, but process queued tasks
*   STOP:     Don&#39;t accept new tasks, don&#39;t process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
  • RUNNING状態 新しい受信タスクを受け入れ、キュー内のタスクを実行することもできます。

  • SHUTDOWN ステータスは新しいタスクを受け入れなくなりますが、キュー内のタスクは引き続き処理されます。

  • STOP 前のステータスに基づいて、キュー内の文字は処理されず、実行中のタスクは直接中断されます。

  • TIDYINGステータスは前のステータスに基づいており、すべてのタスクが終了し、プール内のワーカー スレッドはすべて 0 になっています。停止ステータスがすべてクリアされた後、ワーカー スレッドはこの状態に入ります。同時に、シャットダウン状態では、キューが空になり、ワーカー スレッドがクリーンアップされた後、直接このステージに入ります。このステージでは、 terminated() メソッドは周期的に実行されます。

  • TERMINATED この状態は最後の状態です。前の状態に基づいて、terminated() メソッドも実行されており、次の状態からこの状態に入ります。以前の状態。スレッド プールが完全に停止したことを意味します。

スレッドプールの状態はAtomicIntegerで保存されるため、比較するだけで現在のスレッド状態を取得できます。

private final BlockingQueue<Runnable> workQueue; 
private final ReentrantLock mainLock = new ReentrantLock(); 
private final HashSet<Worker> workers = new HashSet<Worker>(); 
private final Condition termination = mainLock.newCondition(); 
private int largestPoolSize; 
private long completedTaskCount; 
private volatile ThreadFactory threadFactory; 
private volatile RejectedExecutionHandler handler; 
private volatile long keepAliveTime; 
private volatile boolean allowCoreThreadTimeOut; 
private volatile int corePoolSize; 
private volatile int maximumPoolSize;

次に、スレッド プール内のワーカー スレッドに関連するいくつかの変数を示します。

  • corePoolSizeコア内で存続できるワーカー スレッドの最小数を示します。ただし、allowCoreThreadTimeOut が true (デフォルトは false) に設定されている場合、各スレッドの生存時間は keepAliveTime のみであることに注意してください。つまり、allowCoreThreadTimeOut が true の場合、スレッド プール内の作業スレッドの最小数は決まります。は 0、maximumPoolSize はスレッド プールのワーカー スレッドの最大数を表します。

  • keepAliveTime は、スレッド プール内のワーカー スレッドの数が corePoolSize よりも大きいときに、作業を待機するときの各ワーカー スレッドの最長待機時間です。

  • workQueueスレッドプールのタスク待ちキューとして、次回のexecute()で詳しく説明します。

  • #Workersスレッド プールにワーカー スレッドを格納するためのコンテナとして。

  • largestPoolSize は、スレッド プール内にこれまでに存在したワーカー スレッドの最大数を記録するために使用されます。

  • completedTaskCount は、スレッド プールによって完了されたタスクの総数を記録するために使用されます。

  • Handlerタスクを受け付けられない場合のスレッドプールでの拒否戦略として、RejectedExecutionHandler インターフェースの実装を前提とした独自の拒否戦略を実装できます。以下は、スレッド プールのデフォルトの拒否ポリシーです。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

threadFactory は、スレッド プールの実稼働スレッドのファクトリ クラスとして使用されます

次は、スレッド プールのデフォルトです。スレッド ファクトリのプロダクション スレッド メソッド

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

最初に、最も頻繁に呼び出すexecute()メソッドを見てみましょう

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute()の内部呼び出しロジックは非常に明確です。

現在のスレッド プール内のワーカー スレッドの数が corePoolSize 未満の場合は、addWker() を直接呼び出してワーカー スレッドを追加します。

以下は、addWorker() の具体的なメソッドです。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
           throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

このメソッドは比較的長いですが、全体的なロジックはまだ明確です。

まず、現在のスレッド プールのステータスを確認します。ステータスがシャットダウンまたは実行中でない場合、またはシャットダウンであってもワーク キューが空の場合は、この時点で作業の追加が失敗した状態に直接戻ります。 。次にスレッドプールのスレッド数の判定ですが、呼び出し時のコア値をもとにcorePoolSizeかmaximumPoolSizeで判定します。

スレッド プールのステータスとスレッド プール内のワーカー スレッドの数を確認したら、実際にワーカー スレッドの追加を開始できます。

新しいワーカー クラス (スレッド プールの内部クラス、特定の作業スレッド) を作成し、実行する特定のスレッドを構築メソッドのパラメーターとして渡し、スレッドの作業スレッド コンテナーに追加します。ワーカーをプールし、ワーカー スレッドの最大数を更新し、最後にワーカー スレッドの start() メソッドを呼び出して、ワーカー スレッドの作成と起動を完了します。

execute() メソッドに戻りましょう。最初に持っているスレッドの数が corePoolSize より大きい場合、または addworker() メソッドの呼び出し時に問題が発生し、ワーカー スレッドの数が変化する場合追加に失敗した場合は、次のロジックが引き続き実行されます。

在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。

但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。

如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。

如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。

在之前的过程中我们建立了工作线程Worker()类,那么我们现在看看worker类的内部实现,也可以说是线程池的核心部分。

Worker类作为线程池的内部类

接下来是Worker()类的成员

final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
  • thread作为worker的工作线程空间,由线程池中所设置的线程工厂生成。

  • firstTask则是worker在构造方法中所接受到的所要执行的任务。

  • completedTasks作为该worker类所执行完毕的任务总数。

接下来我们可以看最重要的,也就是我们之前建立完Worker类之后立马调用的run()方法了

public void run() {
    runWorker(this);
}

run()方法实现的很简单

我们可以继续追踪下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。

但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。

接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。

在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。

下面是getTask()的实现

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOut和corePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。

在runWorker()方法的最后

调用了processWorkerExist()方法来执行工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。

在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。

以上がJavaスレッドプールのexecute()メソッドの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。