>  기사  >  Java  >  Java 스레드 풀 ThreadPoolExecutor 클래스를 사용하는 방법

Java 스레드 풀 ThreadPoolExecutor 클래스를 사용하는 방법

王林
王林앞으로
2023-04-26 13:31:151508검색

"Alibaba Java 개발 매뉴얼"에는 스레드 리소스가 스레드 풀을 통해 제공되어야 하며 스레드 생성이 애플리케이션에 표시되는 것이 허용되지 않는다고 지적되어 있습니다. 열린 스레드 수는 합리적으로 제어할 수 있습니다. 한편으로는 스레드의 세부적인 관리는 스레드 풀에서 처리되어 리소스 오버헤드가 최적화됩니다. 스레드 풀은 Executor를 사용하여 생성할 수 없지만 ThreadPoolExecutor를 통해 생성할 수 있습니다. 이는 jdk의 Executor 프레임워크가 스레드 풀을 생성하기 위한 newFixedThreadPool(), newSingleThreadExecutor(), newCachedThreadPool() 등과 같은 메서드를 제공하지만 모두 가능하기 때문입니다. 또한 이전 메소드도 ThreadPoolExecutor를 통해 내부적으로 구현되므로 ThreadPoolExecutor를 사용하면 스레드 풀의 운영 규칙을 명확하게 하고 비즈니스 시나리오의 요구 사항을 충족하는 스레드 풀을 생성하는 데 도움이 될 수 있습니다. 그리고 자원 고갈의 위험을 피하십시오.

아래에서는 ThreadPoolExecutor 사용 방법에 대한 자세한 개요를 제공합니다.

ThreadPoolExecutor의 생성자를 먼저 살펴보세요

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

생성자의 매개변수 의미는 다음과 같습니다.

corePoolSize: 스레드 풀의 스레드 수를 지정합니다. 해당 숫자는 추가된 작업이 새 스레드를 열지 여부를 결정합니다. 실행하거나 따로 보관하세요. workQueue 작업 대기열로 이동하세요.

maximumPoolSize: 스레드 풀의 최대 스레드 수를 지정합니다. 이 매개변수는 workQueue 유형에 따라 스레드 풀이 열리는 최대 스레드 수를 결정합니다.

keepAliveTime: 스레드 풀의 유휴 스레드 수가 corePoolSize를 초과하면 초과 스레드가 삭제되는 데 걸리는 시간

unit: keepAliveTime 단위

workQueue: 작업 대기열 , 스레드 풀에 추가되었지만 아직 실행되지 않은 작업; 일반적으로 직접 제출 대기열, 제한된 작업 대기열, 무제한 작업 대기열 및 우선 순위 작업 대기열로 구분됩니다.

threadFactory: 일반적으로 스레드를 생성하는 데 사용됩니다. 기본값을 사용할 수 있습니다.

처리자: 작업이 너무 큰 경우 작업을 처리하는 데 시간이 너무 많이 걸릴 때 작업을 거부하는 방법

다음으로 더 중요한 매개변수에 대해 더 자세히 살펴보겠습니다.

1. workQueue 작업 대기열

위에서 이미 소개했습니다. 일반적으로 직접 제출 대기열과 제한되지 않은 작업 대기열, 우선 순위 작업 대기열로 구분됩니다.

1. 직접 제출 대기열: 동기 대기열로 설정합니다. 특별한 BlockingQueue입니다. 삽입 작업을 수행하지 않고 차단되며 삭제 작업을 수행해야 하며, 반대로 각 삭제 작업도 해당 삽입 작업을 기다려야 합니다.

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    public void run() {
        System.out.println(Thread.currentThread().getName());

출력 결과는

pool-1-thread-1

pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302입니다. java.util.concurrent.ThreadPoolExecutor@3d4eac69[실행 중, 풀 크기 = 2, 활성 스레드 = 0, 대기 중인 작업 = 0, 완료된 작업 = 2]
에서 거부됨 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(알 수 없는 소스 )
java.util.concurrent.ThreadPoolExecutor.reject(알 수 없는 소스)
java.util.concurrent.ThreadPoolExecutor.execute(알 수 없는 소스)
com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

작업 큐가 동기 큐이고 생성된 스레드 수가 maximumPoolSize보다 큰 경우 거부 정책이 직접 실행되어 예외가 발생하는 것을 볼 수 있습니다.

SynchronousQueue를 사용하면 제출된 작업은 저장되지 않으며 항상 즉시 실행되도록 제출됩니다. 작업 수행에 사용되는 스레드 수가 maximumPoolSize보다 적은 경우 새 프로세스 생성을 시도합니다. maximumPoolSize에 설정된 최대값에 도달하면 설정한 핸들러에 따라 거부 정책이 실행됩니다. 따라서 이러한 방식으로 제출한 작업은 캐시되지 않고 즉시 실행됩니다. 이 경우 적절한 maximumPoolSize 양을 설정하려면 프로그램의 동시성을 정확하게 평가해야 합니다. 거부 정책을 구현하는 것은 매우 쉽습니다.

2. 제한된 작업 대기열은 아래와 같이 ArrayBlockingQueue를 사용하여 구현할 수 있습니다.

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

필요한 새 작업이 있는 경우 ArrayBlockingQueue를 사용하세요. 스레드가 실행되면 풀은 생성된 스레드 수가 corePoolSize에 도달할 때까지 새 스레드를 생성한 다음 대기 대기열에 새 작업이 추가됩니다. 대기 큐가 가득 찬 경우, 즉 ArrayBlockingQueue의 초기 용량을 초과하는 경우, 스레드 수가 maximumPoolSize에 설정된 최대 스레드 수에 도달할 때까지 계속해서 스레드를 생성합니다. 이 값이 maximumPoolSize보다 크면 거부 정책이 실행됩니다. . 이 경우 스레드 수의 상한은 제한된 작업 큐의 상태와 직접적으로 관련됩니다. 제한된 큐의 초기 용량이 크거나 과부하 상태에 도달하지 않은 경우 스레드 수는 항상 유지됩니다. 그렇지 않으면 작업 대기열이 가득 차면 최대 스레드 수의 상한선으로 maximumPoolSize가 사용됩니다.

3. 제한되지 않은 작업 대기열: 제한된 작업 대기열은 아래와 같이 LinkedBlockingQueue를 사용하여 구현할 수 있습니다

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //优先任务队列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    public int getPriority() {
        return priority;
    public void setPriority(int priority) {
        this.priority = priority;
    public ThreadTask() {
        
    public ThreadTask(int priority) {
    //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

我们来看下执行的结果情况

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

二、拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:

1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;

2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;

3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;

4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;

以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义拒绝策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"执行了拒绝策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

输出结果:

com.hhxx.test.ThreadTask@33909752执行了拒绝策略
com.hhxx.test.ThreadTask@55f96302执行了拒绝策略
com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;

三、ThreadFactory自定义线程创建

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义线程工厂
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //输出执行线程的名称
        System.out.println("ThreadName:"+Thread.currentThread().getName());

我们看下输出结果

线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。

四、ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //实现自定义接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //输出执行线程的名称
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下输出结果

线程118352462创建
线程1550089733创建
准备执行:Task0
准备执行:Task1
TaskNameTask0---ThreadName:threadPool118352462
线程865113938创建
执行完毕:Task0
TaskNameTask1---ThreadName:threadPool1550089733
执行完毕:Task1
准备执行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
执行完毕:Task3
准备执行:Task2
准备执行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
执行完毕:Task4
准备执行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
执行完毕:Task5
准备执行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
执行完毕:Task6
准备执行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
执行完毕:Task8
准备执行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
准备执行:Task7
执行完毕:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
执行完毕:Task7
执行完毕:Task2
线程池退出

可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

五、线程池线程数量

线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可

/**
             * Nthreads=CPU数量
             * Ucpu=目标CPU的使用率,0<=Ucpu<=1
             * W/C=任务等待时间与任务计算时间的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

위 내용은 Java 스레드 풀 ThreadPoolExecutor 클래스를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제