Rumah  >  Artikel  >  Java  >  Cara menggunakan kaedah Java thread pool execute().

Cara menggunakan kaedah Java thread pool execute().

WBOY
WBOYke hadapan
2023-05-10 23:46:122020semak imbas

Mula-mula fahami apakah peranan kumpulan benang

* Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.

Kolam benang mengendalikan dua masalah berbeza Kolam benang memberikan prestasi yang lebih baik kepada sejumlah besar tugas tak segerak dengan mengurangkan overhed sebelum benang secara rasmi. dipanggil. Pada masa yang sama, satu siri kaedah untuk mengikat utas tugas pengurusan diberikan. Setiap kumpulan benang mengandungi beberapa maklumat asas, seperti bilangan tugasan yang diselesaikan secara dalaman.

Pertama lihat siri

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl yang mewakili negeri bagi kelas ThreadPoolExecutor Sebagai kelas AtomicInteger, ia menyimpan dua jenis maklumat dalam kelas, di mana 3 bit tinggi disimpan. keadaan kumpulan benang. 29 bit digunakan untuk menyimpan bilangan benang kelas Woker dalam kumpulan benang pada masa ini (boleh dilihat bahawa bilangan benang maksimum yang boleh diterima dalam kumpulan benang ialah kira-kira 500 juta). Dapat dilihat bahawa kaedah runStateOf() dan workerCountOf() yang disediakan masing-masing menyediakan kaedah untuk melihat status thread dan bilangan thread.

Kelas ini memberikan jumlah lima keadaan

Mari kita lihat komen yang diberikan oleh pengarang

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don&#39;t accept new tasks, but process queued tasks
*   STOP:     Don&#39;t accept new tasks, don&#39;t process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
  • RUNNINGStatus boleh menerima yang baharu incomings Tugas dalam baris gilir juga akan dilaksanakan pada masa yang sama. Status

  • SHUTDOWN tidak lagi akan menerima tugasan baharu, tetapi tugasan dalam baris gilir masih akan diproses. Status

  • STOP adalah berdasarkan status sebelumnya, dan aksara dalam baris gilir tidak akan diproses dan tugasan yang dilaksanakan akan terganggu secara langsung.

  • TIDYINGStatus adalah berdasarkan yang sebelumnya Semua tugas telah ditamatkan dan rangkaian Pekerja dalam kumpulan semuanya telah 0. Iaitu, status hentian telah dikosongkan. selepas semua benang pekerja telah dibersihkan Ia akan memasuki keadaan ini Pada masa yang sama, dalam keadaan penutupan, ia akan memasuki peringkat ini secara langsung selepas barisan kosong dan benang pekerja telah dibersihkan kaedah terminated() akan dilaksanakan secara kitaran. Keadaan

  • TERMINATED ialah keadaan terakhir Berdasarkan yang sebelumnya, kaedah terminated() juga telah dilaksanakan, dan kemudian ia akan memasuki keadaan ini dari yang sebelumnya. nyatakan, yang bermaksud bahawa kumpulan benang telah dihentikan sepenuhnya.

Memandangkan status kumpulan benang disimpan melalui AtomicInteger, status rangkaian semasa boleh diperolehi hanya dengan perbandingan.

private final BlockingQueue<Runnable> workQueue; 
private final ReentrantLock mainLock = new ReentrantLock(); 
private final HashSet<Worker> workers = new HashSet<Worker>(); 
private final Condition termination = mainLock.newCondition(); 
private int largestPoolSize; 
private long completedTaskCount; 
private volatile ThreadFactory threadFactory; 
private volatile RejectedExecutionHandler handler; 
private volatile long keepAliveTime; 
private volatile boolean allowCoreThreadTimeOut; 
private volatile int corePoolSize; 
private volatile int maximumPoolSize;

Seterusnya ialah beberapa pembolehubah yang berkaitan dengan benang pekerja dalam kumpulan benang

  • corePoolSizeMenunjukkan bilangan minimum benang pekerja yang dibenarkan untuk bertahan dalam kumpulan benang, tetapi perlu diperhatikan Perkaranya ialah apabila allowCoreThreadTimeOut ditetapkan kepada benar (default false), masa kemandirian setiap utas hanyalah keepAliveTime Maksudnya, apabila allowCoreThreadTimeOut adalah benar, bilangan minimum utas pekerja dalam kumpulan benang ialah. 0; maximumPoolSize mewakili benang pekerja terbesar dalam kumpulan benang.

  • keepAliveTime ialah masa menunggu paling lama bagi setiap utas pekerja semasa menunggu kerja apabila bilangan utas pekerja dalam kumpulan benang lebih besar daripada corePoolSize.

  • workQueue berfungsi sebagai baris gilir menunggu tugasan bagi kumpulan benang Ini akan diterangkan secara terperinci dalam execute() seterusnya.

  • Workers berfungsi sebagai bekas untuk menyimpan benang pekerja dalam kolam benang.

  • largestPoolSize digunakan untuk merekodkan bilangan maksimum benang pekerja yang pernah wujud dalam kumpulan benang.

  • completedTaskCount digunakan untuk merekodkan jumlah tugasan yang diselesaikan oleh kumpulan benang.

  • HandlerSebagai strategi penolakan dalam kumpulan benang apabila tugasan tidak boleh diterima, kami boleh melaksanakan strategi penolakan kami sendiri atas premis melaksanakan antara muka RejectedExecutionHandler. Berikut ialah dasar penolakan lalai kumpulan benang,

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

threadFactory ialah kelas kilang untuk benang pengeluaran kumpulan benang

Berikut ialah kilang benang lalai kumpulan benang kaedah benang pengeluaran

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

Kita boleh lihat dahulu kaedah execute() kami yang paling biasa dipanggil

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Logik panggilan dalaman execute() adalah sangat jelas.

Jika bilangan utas pekerja dalam kumpulan utas semasa adalah kurang daripada corePoolSize, hubungi addWoker() terus untuk menambah utas pekerja.

Berikut ialah kaedah khusus addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
           throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Kaedah ini agak panjang, tetapi logik keseluruhannya masih jelas.

Mula-mula tentukan status kumpulan benang semasa Jika status tidak ditutup atau berjalan, atau ia ditutup tetapi baris gilir kerja kosong, maka ia akan kembali kepada kegagalan untuk menambah kerja pada masa ini. . Langkah seterusnya ialah menilai bilangan utas kumpulan benang Berdasarkan nilai teras pada masa panggilan, sama ada ia dinilai oleh corePoolSize atau maximumPoolSize.

Selepas mengesahkan status kumpulan benang dan bilangan benang pekerja dalam kumpulan benang, anda sebenarnya boleh mula menambah benang pekerja.

Buat kelas pekerja baharu (kelas dalaman kumpulan benang, benang kerja tertentu), luluskan benang khusus untuk dilaksanakan sebagai parameter dalam kaedah pembinaan, dan kemudian tambahkannya pada bekas benang kerja benang kumpulan pekerja, dan mengemas kini bilangan maksimum rangkaian pekerja, dan akhirnya memanggil kaedah mula() urutan pekerja untuk menyelesaikan penciptaan dan permulaan urutan pekerja.

Mari kita kembali ke kaedah execute() Jika bilangan utas yang kita ada pada permulaan adalah lebih besar daripada corePoolSize, atau terdapat masalah apabila kita memanggil kaedah addworker() dan bilangan utas pekerja. gagal ditambah, maka kita Logik seterusnya akan terus dilaksanakan.

在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。

但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。

如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。

如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。

在之前的过程中我们建立了工作线程Worker()类,那么我们现在看看worker类的内部实现,也可以说是线程池的核心部分。

Worker类作为线程池的内部类

接下来是Worker()类的成员

final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
  • thread作为worker的工作线程空间,由线程池中所设置的线程工厂生成。

  • firstTask则是worker在构造方法中所接受到的所要执行的任务。

  • completedTasks作为该worker类所执行完毕的任务总数。

接下来我们可以看最重要的,也就是我们之前建立完Worker类之后立马调用的run()方法了

public void run() {
    runWorker(this);
}

run()方法实现的很简单

我们可以继续追踪下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。

但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。

接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。

在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。

下面是getTask()的实现

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOut和corePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。

在runWorker()方法的最后

调用了processWorkerExist()方法来执行工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。

在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。

Atas ialah kandungan terperinci Cara menggunakan kaedah Java thread pool execute().. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam