본 글은 주로 Java의 ThreadPoolExecutor 원리 분석 관련 정보를 소개하고 있으며, 필요하신 분들은
Java의 ThreadPoolExecutor 원리 분석
을 참고하시기 바랍니다. 🎜> Thread Pool 소개
연결 처리 요청을 받기 위해 스레드 풀도 사용해야 합니다.
스레드 풀은
java.util.concurrent.ThreadPoolExecutor에 있습니다. 사용 시 일반적으로 submit, InvokeAll, shutdown 등의 공통 메소드를 제공하는 ExecutorService인터페이스를 사용한다.
newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor정적 메서드를 제공합니다. 🎜> 등, 이러한 메서드는 결국 ThreadPoolExecutor의 생성자를 호출합니다. 모든 매개변수를 포함하는 ThreadPoolExecutor의 생성자는
/** * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
스레드를 추가합니다. 스레드 수 = corePoolSize 및 corePoolSize인 경우 workQueue가 새 작업을 저장할 수 없는 경우에만 새 스레드가 생성됩니다. 초과 스레드는 유휴 keepAliveTime 후에 삭제됩니다.
ThreadPoolExecutor에 저장된 상태는
입니다.
RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED를 포함한 현재 스레드 풀 상태입니다.
현재 실행 중인 스레드의 유효 개수입니다.
이 두 상태를 int 변수에 넣으면 처음 3자리는 스레드 풀 상태, 마지막 29자리는 스레드 개수입니다.
예를 들어 0b11100000000000000000000000000001은 스레드인 RUNNING을 나타냅니다.
HashSet을 통해 작업자 세트를 저장합니다. HashSet에 액세스하기 전에 먼저 보호된 mainLock:ReentrantLock
제출, 실행
execute 실행 방법은 먼저 현재 워커 개수를 확인하는 것인데, corePoolSize보다 작다면 코어 워커를 추가해 보세요. 스레드 풀은 스레드 수 유지 및 상태 확인에 대해 많은 테스트를 수행합니다.
public void execute(Runnable command) { int c = ctl.get(); // 如果当期数量小于corePoolSize if (workerCountOf(c) < corePoolSize) { // 尝试增加worker if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程池正在运行并且成功添加到工作队列中 if (isRunning(c) && workQueue.offer(command)) { // 再次检查状态,如果已经关闭则执行拒绝处理 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 如果工作线程都down了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }addWorker 메소드 구현
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker 클래스는 동기 대기 기능을 얻기 위해 AbstractQueuedSynchronizer를 상속합니다.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
runWorker(Worker)는 Work Queue에서 지속적으로 작업을 가져와서 실행하는 Worker의 폴링 실행 로직입니다. 작업을 실행하는 동안 작업자가 중단되는 것을 방지하려면 각 작업을 실행하기 전에 작업자를 잠가야 합니다.
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
ThreadPoolExecutor의 submit 메소드에서 Callable은 FutureTask로 패키징된 후 Execute 메소드로 넘겨집니다.
FutureTask
FutureTask는 Runnable과 Future에서 상속합니다.
COMPLETING,
NORMAL을 실행하면 정상 실행이 완료되고 결과를 얻습니다
EXCEPTIONAL, 실행
예외가 발생합니다
CANCELLED, 실행이 취소됩니다INTERRUPTING, 실행이 중단됩니다
INTERRUPTED, 중단되었습니다.
키 get 메소드
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
가 먼저 현재 상태를 가져옵니다. 실행이 완료되지 않고 정상이면 대기 결과 프로세스로 들어갑니다. 현재 상태를 얻기 위해 계속해서
루프를 돌립니다. 결과가 없으면 CAS를 통해 대기 목록의 선두에 자신을 추가합니다. 시간 초과가 설정된 경우 LockSupport.parkNanos는 지정된 시간에 추가됩니다.static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }FutureTask의 실행 메소드는 작업을 실행하고 결과의 위치를 설정하는 것입니다. 먼저 현재 상태가 NEW인지 확인하고 현재 스레드를 실행 스레드로 설정한 다음 Callable의 호출을 호출하여 가져옵니다. 결과를 설정하고 결과를 설정하여 FutureTask 상태를 수정합니다. 아아아아
위 내용은 Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!