>  기사  >  Java  >  Java 스레드 풀 원리 학습 및 사용(그림)

Java 스레드 풀 원리 학습 및 사용(그림)

黄舟
黄舟원래의
2017-03-23 10:40:591694검색

스레드 풀의 기술적 배경

지향객체프로그래밍에서는 객체를 생성하고 소멸하는 데 시간이 많이 걸립니다. 왜냐하면생성 메모리 자원이나 기타 추가 자원을 얻기 위한 객체 이는 특히 Java에서 가상 머신이 모든 객체를 추적하여 객체가 삭제된 후 가비지 수집될 수 있도록 시도하는 경우에 해당됩니다.

따라서 서비스 프로그램의 효율성을 높이는 한 가지 방법은 생성 및 파기되는 개체 수를 최대한 줄이는 것입니다. 특히 리소스를 많이 소모하는 일부 개체의 생성 및 파기 작업은 더욱 그렇습니다. 기존 객체를 활용하여 서비스를 제공하는 방법은 해결해야 할 핵심 문제입니다. 실제로 이것이 일부 "자원 공유" 기술이 등장한 이유입니다.

예를 들어, Android에서 흔히 볼 수 있는 다양한 이미지 로딩 라이브러리, 네트워크 요청 라이브러리, 심지어 Android 메시징 메커니즘의 Meaasge도 Meaasge를 사용해야 하는 등 일반적으로 "풀" 개념과 분리할 수 없는 많은 공통 구성 요소가 있습니다. )는 Meaasge pool에서 사용되는 객체이므로 이 개념은 매우 중요합니다. 본 글에서 소개하는 스레드 풀 기술 역시 이러한 생각에 부합한다.

스레드 풀의 장점:

  • 스레드 풀의 스레드를 재사용하여 객체 생성 및 삭제로 인한 성능 오버헤드를 줄입니다.

  • 은 최대 동시 스레드 수를 효과적으로 제어하고, 시스템 리소스 활용도를 향상시키며, 과도한 리소스 경쟁과 혼잡을 방지할 수 있습니다.

  • 간단한 멀티 관리가 가능합니다. -스레드, 스레드 사용을 간단하고 효율적으로 만듭니다.

스레드 풀 프레임워크Executor

Java의 스레드 풀은 Executor 프레임워크를 통해 구현됩니다. Executor, Executors, ExecutorService, ThreadPoolExecutor, C모든가능 및 Future 및 FutureTask 사용 등

Executor: 모든 스레드 풀 인터페이스에는 하나의 메서드만 있습니다.

public interface Executor {        
  void execute(Runnable command);      
}

ExecutorService: Executor 동작을 추가하며 Executor 구현 클래스의 가장 직접적인 인터페이스입니다.

Executors: 스레드 풀을 생성하기 위한 일련의 팩토리 메서드를 제공하고 반환된 스레드 풀은 모두 ExecutorService 인터페이스를 구현합니다.

ThreadPoolExecutor: 스레드 풀의 특정 구현 클래스입니다. 일반적으로 사용되는 다양한 스레드 풀이 이 클래스를 기반으로 구현됩니다.
구성 방법은 다음과 같습니다.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);

}
  • corePoolSize: 스레드 풀의 코어 스레드 개수와 스레드 풀에서 실행되는 스레드는 항상 corePoolSize를 초과하지 않으며 기본적으로 항상 유지될 수 있습니다. AllowCoreThreadTimeOut을 True로 설정할 수 있습니다. 이때 코어 스레드 수는 0입니다. 이때 keepAliveTime은 모든 스레드의 시간 초과를 제어합니다.

  • maximumPoolSize: 스레드 풀에서 허용하는 최대 스레드 수입니다.

  • keepAliveTime :

  • unit을 나타냅니다.

  • workQueue

    : 작업을 저장하는 BlockingQueue

  • BlockingQueue

    : BlockingQueue(BlockingQueue)는 java.util.concurrent에서 스레드 동기화를 제어하는 ​​데 주로 사용되는 도구입니다. BlockQueue가 비어 있으면 BlockingQueue에서 물건을 가져오는 작업이 차단되고 대기 상태로 들어가며, BlockingQueue에 들어갈 때까지 깨어나지 않습니다. 마찬가지로, BlockingQueue가 가득 차면 여기에 무언가를 저장하려는 모든 작업도 차단되고 대기 상태로 들어가며 BlockingQueue에 공간이 생길 때까지 작업을 계속할 수 없습니다. 차단 대기열은 생산자 및 소비자 시나리오에서 자주 사용됩니다. 생산자는 대기열에 요소를 추가하는 스레드이고 소비자는 대기열에서 요소를 가져오는 스레드입니다. 차단 대기열은 생산자가 요소를 저장하고 소비자는 컨테이너에서 요소만 가져오는 컨테이너입니다. 특정 구현 클래스에는 LinkedBlockingQueue, ArrayBlockingQueued 등이 포함됩니다. 일반적으로 차단 및 깨우기는 Lock 및 Condition(디스플레이 잠금(Lock) 및 조건 학습 및 사용)을 통해 내부적으로 구현됩니다.

스레드 풀의 작동 과정은 다음과 같습니다.

    처음 스레드 풀이 생성되면, 거기에는 스레드가 없습니다. 작업 대기열이 매개변수로 전달됩니다. 그러나 대기열에 작업이 있더라도 스레드 풀은 해당 작업을 즉시 실행하지 않습니다.
  1. execute() 메서드가 작업을 추가하기 위해 호출되면 스레드 풀은 다음과 같은 판단을 내립니다.
    If 스레드가 실행 중입니다. 숫자가 corePoolSize보다 작으면 즉시 스레드를 생성하여 작업을 실행합니다.
  • 실행 중인 스레드 수가 corePoolSize보다 크거나 같으면 다음을 입력합니다. 작업을 대기열에 넣습니다
  • 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

  • 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。

  • 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  • 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

  • 线程池的创建和使用

    生成线程池采用了工具类Executors的静态方法,以下是几种常见的线程池。

    SingleThreadExecutor:单个后台线程 (其缓冲队列是无界的)

    public static ExecutorService newSingleThreadExecutor() {        
        return new FinalizableDelegatedExecutorService (
            new ThreadPoolExecutor(1, 1,                                    
            0L, TimeUnit.MILLISECONDS,                                    
            new LinkedBlockingQueue<Runnable>()));   
    }

    创建一个单线程的线程池。这个线程池只有一个核心线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    FixedThreadPool:只有核心线程的线程池,大小固定 (其缓冲队列是无界的) 。

    public static ExecutorService newFixedThreadPool(int nThreads) {         
            return new ThreadPoolExecutor(nThreads, nThreads,                                       
                0L, TimeUnit.MILLISECONDS,                                         
                new LinkedBlockingQueue<Runnable>());     
    }

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    CachedThreadPool:无界线程池,可以进行自动线程回收。

    public static ExecutorService newCachedThreadPool() {         
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,                                           
               60L, TimeUnit.SECONDS,                                       
               new SynchronousQueue<Runnable>());     
    }

    如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。SynchronousQueue是一个是缓冲区为1的阻塞队列。

    ScheduledThreadPool:核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    public static ExecutorService newScheduledThreadPool(int corePoolSize) {         
        return new ScheduledThreadPool(corePoolSize, 
                  Integer.MAX_VALUE,                                                  
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,                                                    
                  new DelayedWorkQueue());    
    }

    创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。

    线程池最常用的提交任务的方法有两种:

    execute:

    ExecutorService.execute(Runnable runable);

    submit:

    FutureTask task = ExecutorService.submit(Runnable runnable);
    
    FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result);
    
    FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

    submit(Callable callable)的实现,submit(Runnable runnable)同理。

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        FutureTask<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    可以看出submit开启的是有返回结果的任务,会返回一个FutureTask对象,这样就能通过get()方法得到结果。submit最终调用的也是execute(Runnable runable),submit只是将Callable对象或Runnable封装成一个FutureTask对象,因为FutureTask是个Runnable,所以可以在execute中执行。关于Callable对象和Runnable怎么封装成FutureTask对象,见Callable和Future、FutureTask的使用。

    线程池实现的原理

    如果只讲线程池的使用,那这篇博客没有什么大的价值,充其量也就是熟悉Executor相关API的过程。线程池的实现过程没有用到Synchronized关键字,用的都是Volatile,Lock和同步(阻塞)队列,Atomic相关类,FutureTask等等,因为后者的性能更优。理解的过程可以很好的学习源码中并发控制的思想。

    在开篇提到过线程池的优点是可总结为以下三点:

    1. 线程复用

    2. 控制最大并发数

    3. 管理线程

    1.线程复用过程

    理解线程复用原理首先应了解线程生命周期

    在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。

    Thread通过new来新建一个线程,这个过程是是初始化一些线程信息,如线程名,id,线程所属group等,可以认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其创建方法调用栈和程序计数器,同时将hasBeenStarted为true,之后调用start方法就会有异常。

    处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于JVM里线程调度器的调度。当线程获取cpu后,run()方法会被调用。不要自己去调用Thread的run()方法。之后根据CPU的调度在就绪——运行——阻塞间切换,直到run()方法结束或其他方式停止线程,进入dead状态。

    所以实现线程复用的原理应该就是要保持线程处于存活状态(就绪,运行或阻塞)。接下来来看下ThreadPoolExecutor是怎么实现线程复用的。

    在ThreadPoolExecutor主要Worker类来控制线程的复用。看下Worker类简化后的代码,这样方便理解:

    private final class Worker implements Runnable {
    
    	final Thread thread;
    
    	Runnable firstTask;
    
    	Worker(Runnable firstTask) {
    		this.firstTask = firstTask;
    		this.thread = getThreadFactory().newThread(this);
    	}
    
    	public void run() {
    		runWorker(this);
    	}
    
    	final void runWorker(Worker w) {
    		Runnable task = w.firstTask;
    		w.firstTask = null;
    		while (task != null || (task = getTask()) != null){
    		task.run();
    	}
    }

    Worker是一个Runnable,同时拥有一个thread,这个thread就是要开启的线程,在新建Worker对象时同时新建一个Thread对象,同时将Worker自己作为参数传入TThread,这样当Thread的start()方法调用时,运行的实际上是Worker的run()方法,接着到runWorker()中,有个while循环,一直从getTask()里得到Runnable对象,顺序执行。getTask()又是怎么得到Runnable对象的呢?

    依旧是简化后的代码:

    private Runnable getTask() {
        if(一些特殊情况) {
            return null;
        }
    
        Runnable r = workQueue.take();
    
        return r;
    }

    这个workQueue就是初始化ThreadPoolExecutor时存放任务的BlockingQueue队列,这个队列里的存放的都是将要执行的Runnable任务。因为BlockingQueue是个阻塞队列,BlockingQueue.take()得到如果是空,则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务,这就达到了线程复用的原理了。

    2.控制最大并发数

    那Runnable是什么时候放入workQueue?Worker又是什么时候创建,Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?

    很容易想到是在execute(Runnable runnable)时会做上面的一些任务。看下execute里是怎么做的。

    execute:

    简化后的代码

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
         int c = ctl.get();
        // 当前线程数 < corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // 直接启动新的线程。
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    
        // 活动线程数 >= corePoolSize
        // runState为RUNNING && 队列未满
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检验是否为RUNNING状态
            // 非RUNNING状态 则从workQueue中移除任务并拒绝
            if (!isRunning(recheck) && remove(command))
                reject(command);// 采用线程池指定的策略拒绝任务
            // 两种情况:
            // 1.非RUNNING状态拒绝新的任务
            // 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
        } else if (!addWorker(command, false))
            reject(command);
    }

    addWorker:

    简化后的代码

    private boolean addWorker(Runnable firstTask, boolean core) {
    
        int wc = workerCountOf(c);
        if (wc >= (core ? corePoolSize : maximumPoolSize)) {
            return false;
        }
    
        w = new Worker(firstTask);
        final Thread t = w.thread;
        t.start();
    }

    根据代码再来看上面提到的线程池工作过程中的添加任务的情况:

    * 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;   
    * 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    * 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    * 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。

    这就是Android的AsyncTask在并行执行是在超出最大任务数是抛出RejectExecutionException的原因所在,详见基于最新版本的AsyncTask源码解读及AsyncTask的黑暗面

    通过addWorker如果成功创建新的线程成功,则通过start()开启新线程,同时将firstTask作为这个Worker里的run()中执行的第一个任务。

    虽然每个Worker的任务是串行处理,但如果创建了多个Worker,因为共用一个workQueue,所以就会并行处理了。

    所以根据corePoolSize和maximumPoolSize来控制最大并发数。大致过程可用下图表示。

    上面的讲解和图来可以很好的理解的这个过程。

    如果是做Android开发的,并且对Handler原理比较熟悉,你可能会觉得这个图挺熟悉,其中的一些过程和Handler,Looper,Meaasge使用中,很相似。Handler.send(Message)相当于execute(Runnuble),Looper中维护的Meaasge队列相当于BlockingQueue,只不过需要自己通过同步来维护这个队列,Looper中的loop()函数循环从Meaasge队列取Meaasge和Worker中的runWork()不断从BlockingQueue取Runnable是同样的道理。

    3.管理线程

    通过线程池可以很好的管理线程的复用,控制并发数,以及销毁等过程,线程的复用和控制并发上面已经讲了,而线程的管理过程已经穿插在其中了,也很好理解。

    在ThreadPoolExecutor有个ctl的AtomicInteger变量。通过这一个变量保存了两个内容:

    • 所有线程的数量

    • 每个线程所处的状态

    其中低29位存线程数,高3位存runState,通过位运算来得到不同的值。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    //得到线程的状态
    private static int runStateOf(int c) {
        return c & ~CAPACITY;
    }
    
    //得到Worker的的数量
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    
    // 判断线程是否在运行
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    这里主要通过shutdown和shutdownNow()来分析线程池的关闭过程。首先线程池有五种状态来控制任务添加与执行。主要介绍以下三种:

    • RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;

    • SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;

    • STOP状态:不再接受新任务,不处理队列中的任务

    shutdown 메소드는 runState를 SHUTDOWN으로 설정하고 모든 유휴 스레드를 종료합니다. 아직 작동 중인 스레드는 영향을 받지 않으므로 대기열에 있는 작업이 실행됩니다. shutdownNow 메서드는 runState를 STOP으로 설정합니다. 종료 방법과의 차이점은 이 방법은 모든 스레드를 종료하므로 대기열에 있는 작업이 실행되지 않는다는 것입니다.

    요약

    ThreadPoolExecutor 소스 코드 분석을 통해 스레드 풀 생성, 작업 추가, 작업 실행 등의 과정을 전반적으로 이해하고 있습니다. , 스레드 풀을 사용하는 것이 더 쉬울 것입니다.

    이를 통해 학습한 동시성 제어와 생산자-소비자 모델 작업 처리의 활용은 앞으로 관련된 다른 문제를 이해하거나 해결하는 데 큰 도움이 될 것입니다. 예를 들어, Android의 Handler 메커니즘과 Looper의 Messager 대기열도 BlookQueue에 의해 처리될 수 있습니다. 이는 소스 코드를 읽음으로써 얻을 수 있는 것입니다.

    위 내용은 Java 스레드 풀 원리 학습 및 사용(그림)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.