>Java >java지도 시간 >Java 스레드 풀 실행() 메소드를 사용하는 방법

Java 스레드 풀 실행() 메소드를 사용하는 방법

WBOY
WBOY앞으로
2023-05-10 23:46:122118검색

먼저 스레드 풀의 역할이 무엇인지 이해하세요

* Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.

스레드 풀은 두 가지 다른 문제를 처리합니다. 스레드 풀은 동시에 스레드가 공식적으로 호출되기 전에 오버헤드를 줄여 많은 수의 비동기 작업에 더 나은 성능을 제공합니다. , 관리 작업 스레드를 바인딩하는 일련의 수단을 제공합니다. 각 스레드 풀에는 내부적으로 완료된 작업 수와 같은 몇 가지 기본 정보가 포함되어 있습니다.

상태를 나타내는 ThreadPoolExecutor 클래스의

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl 시리즈를 먼저 살펴보세요. AtomicInteger 클래스로 클래스에 두 가지 정보를 저장하는데, 그 중 상위 3비트는 스레드 풀의 상태를 저장합니다. , 마지막 29비트는 이때 스레드 풀을 저장한다. 스레드 풀에 있는 Woker 클래스 스레드 수(스레드 풀에서 허용되는 최대 스레드 수는 약 5억 개임을 알 수 있다). 각각 제공되는 runStateOf(), WorkerCountOf() 메소드는 스레드 상태와 스레드 개수를 조회할 수 있는 메소드를 제공하고 있음을 알 수 있다.

이 클래스는 총 5가지 상태를 제공합니다.

저자의 설명을 살펴보겠습니다

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don&#39;t accept new tasks, but process queued tasks
*   STOP:     Don&#39;t accept new tasks, don&#39;t process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
  • RUNNING 상태는 새로 들어오는 작업을 수락하고 대기열에 있는 작업을 실행할 수도 있습니다. RUNNING状态可以接受新进来的任务,同时也会执行队列里的任务。

  • SHUTDOWN 状态已经不会再接受新任务,但仍旧会处理队列中的任务。

  • STOP状态在之前的基础上,不会处理队列中的人物,在执行的任务也会直接被打断。

  • TIDYING状态在之前的基础上,所有任务都已经终止,池中的Worker线程都已经为0,也就是stop状态在清理完所有工作线程之后就会进入该状态,同时在shutdown状态在队列空以及工作线程清理完毕之后也会直接进入这个阶段,这一阶段会循环执行terminated()方法。

  • TERMINATED 状态作为最后的状态,在之前的基础上terminated()方法也业已执行完毕,才会从上个状态进入这个状态,代表线程池已经完全停止。

由于线程池的状态都是通过AtomicInteger来保存的,可以通过比较的方式简单的得到当前线程状态。

private final BlockingQueue<Runnable> workQueue; 
private final ReentrantLock mainLock = new ReentrantLock(); 
private final HashSet<Worker> workers = new HashSet<Worker>(); 
private final Condition termination = mainLock.newCondition(); 
private int largestPoolSize; 
private long completedTaskCount; 
private volatile ThreadFactory threadFactory; 
private volatile RejectedExecutionHandler handler; 
private volatile long keepAliveTime; 
private volatile boolean allowCoreThreadTimeOut; 
private volatile int corePoolSize; 
private volatile int maximumPoolSize;

接下来是线程池的几个有关工作线程的变量

  • corePoolSize表示线程池中允许存活最少的工作线程数量,但值得注意的是如果allowCoreThreadTimeOut一旦设置true(默认false),每个线程的存活时间只有keepAliveTime也就是说在allowCoreThreadTimeOut为true的时候,该线程池最小的工作线程数量为0;maximumPoolSize代表线程池中最大的工作线程数量。

  • keepAliveTime为线程池中工作线程数量大于corePoolSize时,每个工作线程的在等待工作时最长的等待时间。

  • workQueue作为线程池的任务等待队列,这个将在接下来的execute()里详细解释。

  • Workers作为存放线程池中存放工作线程的容器。

  • largestPoolSize用来记录线程池中存在过的最大的工作线程数量。

  • completedTaskCount用来记录线程池完成的任务的总数。

  • Handler

SHUTDOWN 상태에서는 더 이상 새 작업을 허용하지 않지만 대기열에 있는 작업은 계속 처리됩니다.

STOP 상태는 대기열의 문자를 처리하지 않으며 실행 중인 작업이 직접 중단됩니다.

TIDYING 상태는 이전 상태를 기준으로 모든 작업이 종료되었으며 풀의 Worker 스레드가 모두 0이었습니다. 즉, 중지 상태는 이후에 이 상태로 들어갑니다. 모든 작업자 스레드는 정리된 상태이며, 동시에 종료 상태에서는 대기열이 비어 있고 작업자 스레드가 정리된 후 이 단계에 직접 들어갑니다. 주기적으로 실행됩니다.

TERMINATED 상태는 이전 상태를 기준으로 하여 종료() 메소드도 실행한 후 이전 상태에서 이 상태로 진입한다는 의미입니다. 스레드 풀이 완전히 중지되었습니다.

스레드 풀의 상태는 AtomicInteger를 통해 저장되므로 현재 스레드 상태를 비교하여 쉽게 알 수 있습니다.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

다음은 스레드 풀의 작업자 스레드와 관련된 여러 변수입니다.

corePoolSize는 스레드 풀에서 생존할 수 있는 최소 작업자 스레드 수를 나타내지만, AllowCoreThreadTimeOut이 허용되는 경우 주목할 가치가 있습니다. 한 번 true로 설정되면(기본값 false) 각 스레드의 생존 시간은 keepAliveTime입니다. 즉, allowedCoreThreadTimeOut이 true인 경우 스레드 풀의 최소 작업 스레드 수는 maximumPoolSize가 0입니다. 스레드 풀의 스레드. 🎜🎜🎜🎜keepAliveTime은 스레드 풀의 작업자 스레드 수가 corePoolSize보다 클 때 작업을 대기할 때 각 작업자 스레드의 가장 긴 대기 시간입니다. 🎜🎜🎜🎜workQueue는 스레드 풀의 작업 대기 대기열 역할을 합니다. 이에 대해서는 다음 Execute()에서 자세히 설명합니다. 🎜🎜🎜🎜Workers는 스레드 풀에 작업자 스레드를 저장하기 위한 컨테이너 역할을 합니다. 🎜🎜🎜🎜largestPoolSize는 스레드 풀에 존재했던 최대 작업자 스레드 수를 기록하는 데 사용됩니다. 🎜🎜🎜🎜completedTaskCount는 스레드 풀에서 완료된 총 작업 수를 기록하는 데 사용됩니다. 🎜🎜🎜🎜Handler는 스레드 풀이 작업을 수락할 수 없을 때의 거부 전략입니다. RejectedExecutionHandler 인터페이스 구현을 전제로 자체 거부 전략을 구현할 수 있습니다. 다음은 스레드 풀의 기본 거부 정책입니다. 🎜🎜🎜
public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}
🎜threadFactory는 스레드 풀 생성 스레드의 팩토리 클래스로 사용됩니다🎜🎜다음은 스레드 풀의 기본 스레드 팩토리의 스레드 생성 방법입니다🎜
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
🎜먼저 살펴 볼 수 있습니다 가장 일반적으로 호출되는 Execute() 메서드🎜
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
           throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
🎜execute()의 내부 호출 논리는 매우 명확합니다. 🎜🎜현재 스레드 풀의 작업자 스레드 수가 corePoolSize보다 적으면 addWoker()를 직접 호출하여 작업자 스레드를 추가하세요. 🎜🎜다음은 addWorker()의 구체적인 메소드입니다🎜
final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
🎜이 메소드는 비교적 길지만 전체적인 로직은 여전히 ​​명확합니다. 🎜🎜먼저 현재 스레드 풀의 상태를 확인합니다. 상태가 종료 또는 실행 중이 아니거나 종료이지만 작업 대기열이 비어 있으면 이 시점에서 작업 추가 실패로 바로 돌아갑니다. 다음 단계는 스레드 풀 스레드 수를 호출 당시의 코어 값을 기준으로 corePoolSize 또는 maximumPoolSize로 판단하는 것입니다. 🎜🎜스레드 풀 상태와 스레드 풀에 포함된 작업자 스레드 수를 확인한 후 실제로 작업자 스레드 추가를 시작할 수 있습니다. 🎜🎜새 작업자 클래스(스레드 풀의 내부 클래스, 특정 작업 스레드)를 생성하고 실행할 특정 스레드를 생성 메서드의 매개 변수로 전달한 후 스레드의 작업자 스레드 컨테이너 작업자에 추가합니다. 최대 작업자 스레드 수를 풀고 업데이트하고 마지막으로 작업자 스레드의 start() 메서드를 호출하면 작업자 스레드의 생성 및 시작이 완료됩니다. 🎜🎜처음에 있는 스레드 수가 corePoolSize보다 크거나 addworker() 메서드를 호출할 때 작업자 스레드 수가 추가되지 않는 경우 문제가 발생하는 경우 Execute() 메서드로 돌아가 보겠습니다. , 그런 다음 다음 다운 로직을 계속 실행합니다. 🎜

在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。

但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。

如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。

如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。

在之前的过程中我们建立了工作线程Worker()类,那么我们现在看看worker类的内部实现,也可以说是线程池的核心部分。

Worker类作为线程池的内部类

接下来是Worker()类的成员

final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
  • thread作为worker的工作线程空间,由线程池中所设置的线程工厂生成。

  • firstTask则是worker在构造方法中所接受到的所要执行的任务。

  • completedTasks作为该worker类所执行完毕的任务总数。

接下来我们可以看最重要的,也就是我们之前建立完Worker类之后立马调用的run()方法了

public void run() {
    runWorker(this);
}

run()方法实现的很简单

我们可以继续追踪下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。

但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。

接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。

在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。

下面是getTask()的实现

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOut和corePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。

在runWorker()方法的最后

调用了processWorkerExist()方法来执行工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。

在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。

위 내용은 Java 스레드 풀 실행() 메소드를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제