Java Thread Pool
최근 프로젝트의 동시성 기능을 개선하고 있는데 개발이 순조롭지 않습니다. 많은 정보를 읽은 후 마침내 이해가 깊어졌습니다. 그래서 소스코드를 함께 확인하고 동시성 프로그래밍의 원리를 정리해보기로 했습니다.
가장 많이 사용되는 스레드 풀부터 시작하여 생성, 실행 및 종료를 중심으로 스레드 풀의 전체 수명 주기에 대한 구현 원리를 이해할 수 있도록 준비하세요. 나중에 원자 변수, 동시 컨테이너, 차단 대기열, 동기화 도구, 잠금 등과 같은 주제를 연구합니다. java.util.concurrent의 동시성 도구는 사용하기 어렵지 않지만 그냥 사용할 수는 없습니다. 빌어먹을 소스 코드를 읽어야 합니다. 하하. 참고로 제가 사용하는 JDK는 1.8입니다.
Executor 프레임워크
Executor는 스레드 풀 관리 프레임워크입니다. 인터페이스에는 Runnable 작업을 실행하는 메서드가 하나만 있습니다. ExecutorService 인터페이스는 Executor를 확장하고 스레드 수명주기 관리를 추가하며 작업 종료 및 작업 결과 반환과 같은 메서드를 제공합니다. AbstractExecutorService는 ExecutorService를 구현하고 submit 메소드와 같은 기본 구현 로직을 제공합니다.
그럼 오늘의 주제인 ThreadPoolExecutor는 AbstractExecutorService를 상속받아 스레드 풀의 구체적인 구현을 제공합니다.
생성자 메서드
다음은 최대 7개의 매개변수를 갖는 ThreadPoolExecutor의 가장 일반적인 생성자입니다. 구체적인 코드는 게시하지 않고 매개변수 확인 및 설정에 대한 몇 가지 설명만 게시하겠습니다.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize는 스레드 풀의 대상 크기로, 스레드 풀이 방금 생성되고 실행할 작업이 없을 때의 크기입니다. maximumPoolSize는 스레드 풀의 최대 상한입니다. keepAliveTime은 스레드의 생존 시간입니다. 스레드 풀의 스레드 수가 corePoolSize보다 크면 생존 시간을 초과하는 유휴 스레드가 재활용됩니다. 단위는 말할 필요도 없이 나머지 3개 매개변수는 나중에 분석하겠습니다.
기본 맞춤형 스레드 풀
ThreadPoolExecutor는 Executors의 팩토리 메서드로 생성된 일부 맞춤형 스레드 풀을 미리 설정합니다. newSingleThreadExecutor, newFixedThreadPool, newCachedThreadPool의 생성 매개변수를 분석해 보겠습니다.
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool의 corePoolSize 및 maximumPoolSize는 모두 수신 고정 숫자로 설정되고 keepAliveTim은 0으로 설정됩니다. 스레드 풀이 생성된 후에는 스레드 수가 고정되므로 스레드 안정성이 필요한 상황에 적합합니다.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor는 스레드 수가 1로 고정된 newFixedThreadPool 버전으로, 풀에서 작업의 직렬화를 보장합니다. FinalizingDelegatedExecutorService가 반환된다는 점에 주목하세요.
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizedDelegatedExecutorService는 DelegatedExecutorService를 상속하고 gc 중에 스레드 풀을 닫는 작업만 추가합니다. DelegatedExecutorService의 소스 코드를 살펴보겠습니다. 🎜>
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }코드 매우 간단하게 DelegatedExecutorService는 ExecutorService의 메서드만 노출하도록 ExecutorService를 래핑하므로 스레드 풀의 매개 변수를 더 이상 구성할 수 없습니다. 원래 스레드 풀에 의해 생성된 매개변수는 조정이 가능하며, ThreadPoolExecutor는 set 메소드를 제공합니다. newSingleThreadExecutor를 사용하는 목적은 단일 스레드 직렬 스레드 풀을 생성하는 것입니다. 스레드 풀 크기도 구성할 수 있다면 지루할 것입니다. Executors는 일반 스레드 풀을 구성할 수 없는 스레드 풀로 래핑하는 unconfigurableExecutorService 메서드도 제공합니다. 알 수 없는 미래 세대에 의해 스레드 풀이 수정되는 것을 원하지 않는 경우 이 메서드를 호출할 수 있습니다. newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }newCachedThreadPool은 캐시된 스레드 풀을 생성합니다. 스레드 수는 0에서 Integer.MAX_VALUE까지 가능하며 제한 시간은 1분입니다. 스레드 풀 사용의 효과는 다음과 같습니다. 유휴 스레드가 있으면 스레드가 재사용됩니다. 유휴 스레드가 없으면 스레드가 1분 이상 유휴 상태이면 새 스레드가 생성됩니다. 재활용. newScheduledThreadPoolnewScheduledThreadPool은 정기적으로 작업을 실행할 수 있는 스레드 풀을 생성합니다. 이에 대해서는 본 글에서 다룰 예정이 아니며, 추후 별도의 글에서 자세히 다루도록 하겠다. 대기 대기열newCachedThreadPool의 스레드 제한은 거의 무제한이지만 시스템 리소스는 제한되어 있으며 작업 처리 속도는 작업 제출 속도만큼 빠르지 않을 수 있습니다. 따라서 스레드 부족으로 인해 대기 중인 Runnable 작업을 저장하기 위해 ThreadPoolExecutor에 차단 대기열을 제공할 수 있습니다. JDK는 BlockingQueue에 대한 여러 구현 방법을 제공합니다. 일반적으로 사용되는 방법은 다음과 같습니다. ArrayBlockingQueue: 배열 구조의 차단 대기열LinkedBlockingQueue: 연결된 목록 구조의 차단 대기열PriorityBlockingQueue: 우선 순위가 지정된 차단 대기열 SynchronousQueue: 요소를 저장하지 않는 차단 대기열
newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。
饱和策略
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。
ThreadFactory
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。
不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
更多Java 线程池详解及创建简单实例相关文章请关注PHP中文网!