Home  >  Article  >  Java  >  Introduction to the usage of ThreadPoolExecutor thread pool in Java

Introduction to the usage of ThreadPoolExecutor thread pool in Java

不言
不言forward
2019-04-04 09:55:363684browse

This article brings you an introduction to the usage of ThreadPoolExecutor thread pool in Java. It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you.

Executors

Executors is a tool class in Java. Provides factory methods to create different types of thread pools.

Introduction to the usage of ThreadPoolExecutor thread pool in Java

It can also be seen from the above figure that the Executors method of creating a thread pool, the created thread pool implements the ExecutorService interface. Commonly used methods are as follows:

newFixedThreadPool(int Threads) : Create a thread pool with a fixed number of threads. Excess threads will wait in the queue.

newCachedThreadPool(): Create a cacheable thread pool. If the length of the thread pool exceeds processing needs, idle threads can be flexibly recycled (60 seconds), if there is no recycling, create a new thread.

newSingleThreadExecutor(): Create a single-threaded thread pool, which will only use the only working thread to execute tasks, ensuring that all tasks are in the specified order ( FIFO, LIFO, priority) execution. If a task execution error occurs, another thread will continue execution.

newScheduledThreadPool(int corePoolSize): Create a thread pool that supports scheduled and periodic task execution , can be used to replace the Timer class in most cases.

Executors Example

newCachedThreadPool

The maximum number of threads is Integer.MAX_VALUE, when we add n to the thread pool Tasks, these n tasks are all executed together.

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newFixedThreadPool

        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newScheduledThreadPool

will be executed once every three seconds. It will only be executed after this execution. .

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(2000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

Execute each task in sequence. The next task will not be executed until the first task is executed.

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

What are the problems with Executors

Introduction to the usage of ThreadPoolExecutor thread pool in Java

It is mentioned in the Alibaba Java Development Manual that using Executors to create a thread pool may cause OOM (OutOfMemory, memory overflow), but it does not explain why, so let’s take a look at the end. Why are Executors not allowed?

Let’s start with a simple example to simulate the situation where OOM is caused by using Executors.

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i <p> By specifying JVM parameters: -Xmx8m -Xms8m, running the above code will throw OOM: </p><pre class="brush:php;toolbar:false">Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

The above code points out that line 16 of ExecutorsDemo.java is executor.execute(new SubThread()) in the code;

There are two main implementations of BlockingQueue in Java, namely ArrayBlockingQueue and LinkedBlockingQueue.

ArrayBlockingQueue is a bounded blocking queue implemented with an array, the capacity must be set.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <p>LinkedBlockingQueue is a bounded blocking queue implemented with a linked list, the capacity can be set optionally, If not set, it will be an unbounded blocking queue with a maximum length of Integer.MAX_VALUE.</p><pre class="brush:php;toolbar:false">public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

The problem here is that if we do not set the capacity of LinkedBlockingQueue, its default capacity will be Integer. MAX_VALUE.

When creating LinkedBlockingQueue in newFixedThreadPool, no capacity is specified. At this time, LinkedBlockingQueue is an unbounded queue. For an unbounded queue, tasks can be continuously added to the queue. This In this case, there may be a memory overflow problem due to too many tasks.

The maximum number of threads created by newCachedThreadPool and newScheduledThreadPool may be Integer.MAX_VALUE, and creating so many threads is bound to be possible Causes OOM.

ThreadPoolExecutor creates a thread pool

Avoid using Executors to create a thread pool, mainly to avoid using the default implementation, then we can directly call the constructor of ThreadPoolExecutor to create the thread pool ourselves . When creating, just specify the capacity for BlockQueue.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));

In this case, once the number of submitted threads exceeds the current number of available threads, java.util.concurrent.RejectedExecutionException will be thrown, This is because the queue currently used by the thread pool is a bounded queue. If the queue is full, it cannot continue to process new requests.

In addition to defining ThreadPoolExecutor yourself, there are other methods. Such as apache and guava.

Four constructors

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              RejectedExecutionHandler handler)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)</runnable></runnable></runnable></runnable>

int corePoolSize => The maximum number of core threads in the thread pool
When the thread pool creates new threads, if the current total number of threads is less than corePoolSize, then the new thread pool will create new threads. It is a core thread. If it exceeds corePoolSize, the newly created non-core thread

The core thread will always survive in the thread pool by default, even if the core thread does nothing (idle state).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

如果设置 allowCoreThreadTimeOut = true, 则会作用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现的错误, 使用这个类型队列的时候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 Integer.MAX_VALUE , 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效, 因为总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corePoolSize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来创建新线程.

默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

【相关推荐:Java视频教程

The above is the detailed content of Introduction to the usage of ThreadPoolExecutor thread pool in Java. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:segmentfault.com. If there is any infringement, please contact admin@php.cn delete