>Java >java지도 시간 >Java Thread Pool 생성 과정 분석

Java Thread Pool 생성 과정 분석

黄舟
黄舟원래의
2017-02-23 10:44:401392검색

최근 프로젝트의 동시성 기능을 개선하고 있지만 개발이 순조롭지 않습니다. 많은 정보를 읽은 후 마침내 이해가 깊어졌습니다. 그래서 소스코드를 함께 확인하고 동시성 프로그래밍의 원리를 정리해보기로 했습니다.

가장 많이 사용되는 스레드 풀부터 시작하여 생성, 실행 및 종료를 중심으로 스레드 풀의 전체 수명 주기에 대한 구현 원리를 이해할 수 있도록 준비하세요. 나중에 원자 변수, 동시 컨테이너, 차단 대기열, 동기화 도구, 잠금 등과 같은 주제를 연구합니다. java.util.concurrent의 동시성 도구는 사용하기 어렵지 않지만 그냥 사용할 수는 없습니다. 빌어먹을 소스 코드를 읽어야 합니다. 하하. 참고로 제가 사용하는 JDK는 1.8입니다.

Executor 프레임워크

Executor는 스레드 풀 관리 프레임워크로, 인터페이스에는 Runnable 작업을 실행하는 메서드가 하나만 있습니다. ExecutorService 인터페이스는 Executor를 확장하고 스레드 수명주기 관리를 추가하며 작업 종료 및 작업 결과 반환과 같은 메서드를 제공합니다. AbstractExecutorService는 ExecutorService를 구현하고 submit 메소드와 같은 기본 구현 로직을 제공합니다.

그럼 오늘의 주제인 ThreadPoolExecutor는 AbstractExecutorService를 상속받아 스레드 풀의 구체적인 구현을 제공합니다.

생성자 메서드

다음은 최대 7개의 매개변수를 갖는 ThreadPoolExecutor의 가장 일반적인 생성자입니다. 구체적인 코드는 게시하지 않고 매개변수 확인 및 설정에 대한 몇 가지 설명만 게시하겠습니다.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }

corePoolSize는 스레드 풀의 대상 크기로, 스레드 풀이 방금 생성되어 실행할 작업이 없을 때의 크기입니다. maximumPoolSize는 스레드 풀의 최대 상한입니다. keepAliveTime은 스레드의 생존 시간입니다. 스레드 풀의 스레드 수가 corePoolSize보다 크면 생존 시간을 초과하는 유휴 스레드가 재활용됩니다. 단위는 말할 필요도 없이 나머지 3개 매개변수는 나중에 분석하겠습니다.

기본 맞춤형 스레드 풀

ThreadPoolExecutor는 Executors의 팩토리 메서드로 생성된 일부 맞춤형 스레드 풀을 미리 설정합니다. newSingleThreadExecutor, newFixedThreadPool, newCachedThreadPool의 생성 매개변수를 분석해 보겠습니다.

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool의 corePoolSize 및 maximumPoolSize는 모두 수신 고정 숫자로 설정되고 keepAliveTim은 0으로 설정됩니다. 스레드 풀이 생성된 후에는 스레드 수가 고정되므로 스레드 안정성이 필요한 상황에 적합합니다.

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor는 스레드 수가 1로 고정된 newFixedThreadPool 버전으로, 풀에서 작업의 직렬화를 보장합니다. FinalizingDelegatedExecutorService가 반환된다는 점에 주목하세요.

static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

FinalizedDelegatedExecutorService는 DelegatedExecutorService를 상속하고 gc 중에 스레드 풀을 닫는 작업만 추가합니다. DelegatedExecutorService의 소스 코드:

 static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        //...
    }

DelegatedExecutorService는 ExecutorService의 메서드만 노출하도록 ExecutorService를 래핑하므로 더 이상 스레드 풀의 매개 변수를 구성할 수 없습니다. 원래 스레드 풀에 의해 생성된 매개변수는 조정이 가능하며, ThreadPoolExecutor는 set 메소드를 제공합니다. newSingleThreadExecutor를 사용하는 목적은 단일 스레드 직렬 스레드 풀을 생성하는 것입니다. 스레드 풀 크기도 구성할 수 있다면 지루할 것입니다.

Executors는 일반 스레드 풀을 구성할 수 없는 스레드 풀로 래핑하는 unconfigurableExecutorService 메서드도 제공합니다. 알 수 없는 미래 세대가 스레드 풀을 수정하는 것을 원하지 않는 경우 이 메서드를 호출할 수 있습니다.

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool은 캐시된 스레드 풀을 생성합니다. 스레드 수는 0에서 Integer.MAX_VALUE까지 가능하며 시간 초과는 다음과 같습니다. 1분. 스레드 풀 사용의 효과는 다음과 같습니다. 유휴 스레드가 있으면 스레드가 재사용됩니다. 유휴 스레드가 없으면 스레드가 1분 이상 유휴 상태이면 새 스레드가 생성됩니다. 재활용.

newScheduledThreadPool

newScheduledThreadPool은 정기적으로 작업을 실행할 수 있는 스레드 풀을 생성합니다. 이에 대해서는 이 글에서는 다루지 않고, 나중에 별도의 글에서 자세히 다루겠습니다.

等待队列

newCachedThreadPool的线程上限几乎等同于无限,但系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度。因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue。

JDK为BlockingQueue提供了几种实现方式,常用的有:

ArrayBlockingQueue:数组结构的阻塞队列

LinkedBlockingQueue:链表结构的阻塞队列

PriorityBlockingQueue:有优先级的阻塞队列

SynchronousQueue:不会存储元素的阻塞队列

newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue。要注意的是,如果任务一直提交,但线程池又不能及时处理,等待队列将会无限制地加长,系统资源总会有消耗殆尽的一刻。所以,推荐使用有界的等待队列,避免资源耗尽。但解决一个问题,又会带来新问题:队列填满之后,再来新任务,这个时候怎么办?后文会介绍如何处理队列饱和。

newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。

饱和策略

当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
       public AbortPolicy() { }
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
       }
   }

AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:

 

public static class DiscardPolicy implements RejectedExecutionHandler {
       public DiscardPolicy() { }
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       }
   }

DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。

 

 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
       public DiscardOldestPolicy() { }
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               e.getQueue().poll();
               e.execute(r);
           }
       }
   }

DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。

ThreadFactory

每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
       private static final AtomicInteger poolNumber = new AtomicInteger(1);
       private final ThreadGroup group;
       private final AtomicInteger threadNumber = new AtomicInteger(1);
       private final String namePrefix;
       DefaultThreadFactory() {
           SecurityManager s = System.getSecurityManager();
           group = (s != null) ? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
           namePrefix = "pool-" +
                         poolNumber.getAndIncrement() +
                        "-thread-";
       }
       public Thread newThread(Runnable r) {
           Thread t = new Thread(group, r,
                                 namePrefix + threadNumber.getAndIncrement(),
                                 0);
           if (t.isDaemon())
               t.setDaemon(false);
           if (t.getPriority() != Thread.NORM_PRIORITY)
               t.setPriority(Thread.NORM_PRIORITY);
           return t;
       }
   }

平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。

不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。

欢迎留言和转发,下一篇打算分析线程池如何执行任务。

 以上就是Java 线程池的创建过程分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.