스레드 풀 생성은 이전 기사에서 분석되었습니다. 스레드 풀에는 유연한 사용자 정의를 지원하기 위해 사전 설정된 템플릿과 다양한 매개 변수가 모두 있는 것으로 이해됩니다.
이 글에서는 스레드 풀의 라이프사이클에 초점을 맞춰 스레드 풀이 작업을 실행하는 과정을 분석해 보겠습니다.
스레드 풀 상태
먼저 스레드 풀 코드 전체에서 두 매개변수를 이해하세요.
runState: 스레드 풀 실행 상태
WorkerCount: 작업자 스레드 수
스레드 풀은 32비트 int를 사용하여 runState와 WorkerCount를 동시에 저장하는데, 그 중 상위 3비트가 runState이고 나머지 29비트가 WorkerCount입니다. RunStateOf 및 WorkerCountOf는 runState 및 WorkerCount를 얻기 위해 코드에서 반복적으로 사용됩니다.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
// ctl操作 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING: 새 작업 수신 가능, 대기 대기열에 있는 작업 실행 가능
SHUTDOWN: 새 작업 수신 불가, 대기 대기열에 있는 작업 실행 가능
STOP: 새 작업을 받을 수 없고, 대기 중인 대기열에 있는 작업을 실행할 수 없으며, 실행 중인 모든 작업을 종료하려고 합니다.
TIDYING: 모든 작업이 종료되었습니다. 실행이 종료되었습니다()
TERMINATED : 종료됨() 실행 완료
스레드 풀 상태는 기본적으로 RUNNING부터 시작하여 TERMINATED 상태로 종료됩니다. 중간에 각 상태를 거칠 필요는 없으나, 상태를 롤링할 수는 없습니다. 뒤쪽에. 상태 변경이 가능한 경로와 조건은 다음과 같습니다.
그림 1 스레드 풀 상태 변경 경로
작업자 생성
스레드 풀은 Worker 클래스에 의한 작업 실행을 담당합니다. Worker는 Java 동시성 프레임워크의 핵심인 AQS로 이어지는 AbstractQueuedSynchronizer를 상속합니다.
AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。
AQS에서는 Worker가 Thread를 래핑하고 작업을 수행한다는 점만 알면 됩니다.
execute를 호출하면 스레드 풀의 상황에 따라 Worker가 생성됩니다. 다음 네 가지 상황으로 요약할 수 있습니다.
그림 2 스레드의 Worker pool 네 가지 가능성
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //3 reject(command); else if (workerCountOf(recheck) == 0) //4 addWorker(null, false); } //5 else if (!addWorker(command, false)) //6 reject(command); }
마크 1은 첫 번째 상황에 해당합니다. addWorker가 코어에서 전달되고, core=true는 corePoolSize이고, core=false는 maximumPoolSize인지 확인해야 합니다. 허용된 최대값을 초과했습니다.
마크 2는 스레드 풀이 실행 중인지 확인하고 작업을 대기 대기열에 추가하는 두 번째 상황에 해당합니다. Mark 3은 스레드 풀 상태를 다시 확인합니다. 스레드 풀이 갑자기 실행되지 않는 상태가 되면 방금 대기 큐에 추가한 작업을 삭제하고 처리를 위해 RejectedExecutionHandler에 넘겨줍니다. Mark 4는 작업자가 없음을 확인하고 먼저 빈 작업이 있는 작업자를 추가합니다.
Mark 5는 세 번째 상황에 해당합니다. 대기 대기열에 더 이상 작업을 추가할 수 없습니다.
Mark 6은 네 번째 상황에 해당합니다. addWorker의 핵심이 false로 전달되고 반환 호출이 실패합니다. 즉, WorkerCount가 maximumPoolSize를 초과하여 처리를 위해 RejectedExecutionHandler로 넘겨졌습니다.
private boolean addWorker(Runnable firstTask, boolean core) { //1 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //2 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
1로 표시된 첫 번째 코드 조각은 작업자 개수에 1을 추가하는 간단한 목적을 가지고 있습니다. 코드를 작성하는 데 시간이 오래 걸리는 이유는 스레드 풀의 상태가 지속적으로 변경되고 동시성 환경에서는 변수의 동기화가 보장되어야 하기 때문입니다. 외부 루프는 스레드 풀 상태를 확인하고, 작업이 비어 있지 않으며, 대기열이 비어 있지 않은지 확인합니다. 내부 루프는 CAS 메커니즘을 사용하여 WorkerCount가 올바르게 증가하는지 확인합니다. CAS를 모르는 경우 CAS가 작업자 수의 후속 증가 또는 감소에 사용되는 비차단 동기화 메커니즘에 대해 알아볼 수 있습니다.
2로 표시된 두 번째 코드는 비교적 간단합니다. 새 Worker 개체를 만들고 Worker를 작업자(Set 컬렉션)에 추가합니다. 추가가 성공적으로 완료되면 작업자에서 스레드를 시작합니다. 마지막으로 스레드가 성공적으로 시작되었는지 판단합니다. 성공하지 못한 경우 addWorkerFailed가 직접 호출됩니다.
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
addWorkerFailed는 이미 증가한 WorkerCount를 줄이고 tryTerminate를 호출하여 스레드 풀을 종료합니다.
Worker 실행
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
Worker는 ThreadFactory를 이용해 생성자에서 Thread를 생성하고, run 메소드에서 runWorker를 호출하는데, 그 곳이 작업이 이루어지는 곳인 것 같습니다. 실제로 실행됨.
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //1 while (task != null || (task = getTask()) != null) { w.lock(); //2 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //3 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; //4 w.completedTasks++; w.unlock(); } } completedAbruptly = false; //5 } finally { //6 processWorkerExit(w, completedAbruptly); } }
Mark 1은 루프에 들어가 null이 반환될 때까지 getTask에서 실행할 작업을 가져옵니다. 여기에서 스레드 재사용 효과가 달성되어 스레드가 여러 작업을 처리할 수 있습니다.
마크 2는 스레드 풀이 STOP 상태에서 중단되고 비 STOP 상태에서 중단되지 않음을 보장하는 비교적 복잡한 판단입니다. Java의 인터럽트 메커니즘을 이해하지 못한다면 Java 스레드를 올바르게 종료하는 방법에 대한 이 기사를 읽어보세요.
Mark 3에서는 run 메소드를 호출하여 실제로 작업을 실행합니다. beforeExecute와 afterExecute라는 두 가지 메서드가 실행 전후에 제공되며, 이는 하위 클래스에 의해 구현됩니다.
마크 4의 CompleteTasks는 작업자가 실행한 작업 수를 계산하고 최종적으로 CompletedTaskCount 변수에 누적되어 일부 통계 정보를 반환할 수 있습니다.
CompletedAbruptly 5로 표시된 변수는 작업자가 비정상적으로 종료되었는지 여부를 나타냅니다. 여기서 실행은 후속 메서드에 이 변수가 필요하다는 의미입니다.
Mark 6은 processWorkerExit를 종료하도록 호출하며 이는 나중에 분석됩니다.
다음으로 대기 대기열에서 작업을 가져오는 작업자의 getTask 메서드를 살펴보겠습니다.
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //1 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //2 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } //3 try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
标记1检查线程池的状态,这里就体现出SHUTDOWN和STOP的区别。如果线程池是SHUTDOWN状态,还会先处理完等待队列的任务;如果是STOP状态,就不再处理等待队列里的任务了。
标记2先看allowCoreThreadTimeOut这个变量,false时worker空闲,也不会结束;true时,如果worker空闲超过keepAliveTime,就会结束。接着是一个很复杂的判断,好难转成文字描述,自己看吧。注意一下wc>maximumPoolSize,出现这种可能是在运行中调用setMaximumPoolSize,还有wc>1,在等待队列非空时,至少保留一个worker。
标记3是从等待队列取任务的逻辑,根据timed分为等待keepAliveTime或者阻塞直到有任务。
最后来看结束worker需要执行的操作:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //2 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //3 tryTerminate(); int c = ctl.get(); //4 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
正常情况下,在getTask里就会将workerCount减一。标记1处用变量completedAbruptly判断worker是否异常退出,如果是,需要补充对workerCount的减一。
标记2将worker处理任务的数量累加到总数,并且在集合workers中去除。
标记3尝试终止线程池,后续会研究。
标记4处理线程池还是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker。如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
总结一下worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。
线程池的关闭
线程池的关闭不是一关了事,worker在池里处于不同状态,必须安排好worker的”后事”,才能真正释放线程池。ThreadPoolExecutor提供两种方法关闭线程池:
shutdown:不能再提交任务,已经提交的任务可继续运行;
shutdownNow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //1 安全策略机制 advanceRunState(SHUTDOWN); //2 interruptIdleWorkers(); //3 onShutdown(); //4 空方法,子类实现 } finally { mainLock.unlock(); } tryTerminate(); //5 }
shutdown将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); //1 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow和shutdown类似,将线程池切换为STOP状态,中断目标是所有worker。drainQueue会将等待队列里未执行的任务返回。
interruptIdleWorkers和interruptWorkers实现原理都是遍历workers集合,中断条件符合的worker。
上面的代码多次出现调用tryTerminate,这是一个尝试将线程池切换到TERMINATED状态的方法。
final void tryTerminate() { for (;;) { int c = ctl.get(); //1 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //2 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //3 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
标记1检查线程池状态,下面几种情况,后续操作都没有必要,直接return。
RUNNING(还在运行,不能停)
TIDYING或TERMINATED(已经没有在运行的worker)
SHUTDOWN并且等待队列非空(执行完才能停)
标记2在worker非空的情况下又调用了interruptIdleWorkers,你可能疑惑在shutdown时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?你需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。
标记3是最终的状态切换。线程池会先进入TIDYING状态,再进入TERMINATED状态,中间提供了terminated这个空方法供子类实现。
调用关闭线程池方法后,需要等待线程池切换到TERMINATED状态。awaitTermination检查限定时间内线程池是否进入TERMINATED状态,代码如下:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
后言
以上就是Java 线程池执行原理分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!

클래스 로더는 통합 클래스 파일 형식, 동적로드, 부모 위임 모델 및 플랫폼 독립적 인 바이트 코드를 통해 다른 플랫폼에서 Java 프로그램의 일관성과 호환성을 보장하고 플랫폼 독립성을 달성합니다.

Java 컴파일러가 생성 한 코드는 플랫폼 독립적이지만 궁극적으로 실행되는 코드는 플랫폼 별입니다. 1. Java 소스 코드는 플랫폼 독립적 인 바이트 코드로 컴파일됩니다. 2. JVM은 바이트 코드를 특정 플랫폼의 기계 코드로 변환하여 크로스 플랫폼 작동을 보장하지만 성능이 다를 수 있습니다.

멀티 스레딩은 프로그램 대응 성과 리소스 활용을 향상시키고 복잡한 동시 작업을 처리 할 수 있기 때문에 현대 프로그래밍에서 중요합니다. JVM은 스레드 매핑, 스케줄링 메커니즘 및 동기화 잠금 메커니즘을 통해 다양한 운영 체제에서 멀티 스레드의 일관성과 효율성을 보장합니다.

Java의 플랫폼 독립성은 작성된 코드가 수정없이 JVM이 설치된 모든 플랫폼에서 실행될 수 있음을 의미합니다. 1) Java 소스 코드는 바이트 코드로 컴파일됩니다. 2) 바이트 코드는 JVM에 의해 해석되고 실행됩니다.

javaapplicationscanindeedencounterplatform-specificissuesdespitetejvm'sabstraction.ressistinclude : 1) nativecodeandlibraries, 2) OperatingSystemDifferences, 3) jvmimplementationvariations, 및 4) 어려운 의존성, 개발자, 1)

클라우드 컴퓨팅은 Java의 플랫폼 독립성을 크게 향상시킵니다. 1) Java Code는 바이트 코드로 컴파일되어 다른 운영 체제에서 JVM에 의해 실행되어 크로스 플랫폼 작동을 보장합니다. 2) Docker 및 Kubernetes를 사용하여 Java 응용 프로그램을 배포하여 휴대 성 및 확장 성을 향상시킵니다.

Java'SplatformIndencealLowsDeveloperstowStowRiteCodeOntOnitOniNanyDeviceOroswithajvm. ThisIsachieAdthroughCompilingTobyTecode, thejvMIngretSorcompileStruntime.thistureatureDificallyNatlyBoostedjava'SADOPTIONDUOCROSS-PLAT-PLAT-PLAT-PLAT-PLAT-PLAT-PLAT-PLAT-PPLATION

Docker와 같은 컨테이너화 기술은 Java의 플랫폼 독립성을 대체하기보다는 향상됩니다. 1) 환경 간 일관성을 보장, 2) 특정 JVM 버전을 포함한 종속성 관리, 3) 배포 프로세스를 단순화하여 Java 응용 프로그램을보다 적응 가능하고 관리 할 수 있도록합니다.


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

안전한 시험 브라우저
안전한 시험 브라우저는 온라인 시험을 안전하게 치르기 위한 보안 브라우저 환경입니다. 이 소프트웨어는 모든 컴퓨터를 안전한 워크스테이션으로 바꿔줍니다. 이는 모든 유틸리티에 대한 액세스를 제어하고 학생들이 승인되지 않은 리소스를 사용하는 것을 방지합니다.

Atom Editor Mac 버전 다운로드
가장 인기 있는 오픈 소스 편집기

에디트플러스 중국어 크랙 버전
작은 크기, 구문 강조, 코드 프롬프트 기능을 지원하지 않음

SecList
SecLists는 최고의 보안 테스터의 동반자입니다. 보안 평가 시 자주 사용되는 다양한 유형의 목록을 한 곳에 모아 놓은 것입니다. SecLists는 보안 테스터에게 필요할 수 있는 모든 목록을 편리하게 제공하여 보안 테스트를 더욱 효율적이고 생산적으로 만드는 데 도움이 됩니다. 목록 유형에는 사용자 이름, 비밀번호, URL, 퍼징 페이로드, 민감한 데이터 패턴, 웹 셸 등이 포함됩니다. 테스터는 이 저장소를 새로운 테스트 시스템으로 간단히 가져올 수 있으며 필요한 모든 유형의 목록에 액세스할 수 있습니다.
