Heim  >  Artikel  >  Java  >  So verwenden Sie die ThreadPoolExecutor-Klasse des Java-Thread-Pools

So verwenden Sie die ThreadPoolExecutor-Klasse des Java-Thread-Pools

王林
王林nach vorne
2023-04-26 13:31:151436Durchsuche

Im „Alibaba Java Development Manual“ wird darauf hingewiesen, dass Thread-Ressourcen über den Thread-Pool bereitgestellt werden müssen und die Thread-Erstellung nicht in der Anwendung angezeigt werden darf Die Anzahl der geöffneten Threads kann andererseits angemessen gesteuert werden. Einerseits wird die detaillierte Verwaltung der Threads vom Thread-Pool übernommen, wodurch der Ressourcenaufwand optimiert wird. Der Thread-Pool darf nicht mit Executors, sondern mit ThreadPoolExecutor erstellt werden. Dies liegt daran, dass das Executor-Framework in JDK zwar Methoden wie newFixedThreadPool(), newSingleThreadExecutor(), newCachedThreadPool() usw. bereitstellt, um Thread-Pools zu erstellen Die Einschränkungen sind nicht flexibel genug. Da die vorherigen Methoden auch intern über ThreadPoolExecutor implementiert werden, kann die Verwendung von ThreadPoolExecutor Ihnen dabei helfen, die Betriebsregeln des Thread-Pools zu klären und einen Thread-Pool zu erstellen, der den Anforderungen Ihrer eigenen Geschäftsszenarien entspricht. und vermeiden Sie das Risiko einer Ressourcenerschöpfung.

Nachfolgend geben wir einen detaillierten Überblick über die Verwendung von ThreadPoolExecutor.

Schauen Sie sich zunächst den Konstruktor von ThreadPoolExecutor an

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Die Bedeutung der Parameter des Konstruktors ist wie folgt:

corePoolSize: Gibt die Anzahl der Threads im Thread-Pool an. Seine Anzahl bestimmt, ob die hinzugefügte Aufgabe einen neuen Thread öffnet Zur Ausführung oder zur Seite legen.

maximumPoolSize: Gibt die maximale Anzahl von Threads im Thread-Pool an, die basierend auf dem Typ der WorkQueue geöffnet werden Von Ihnen verwendete Aufgabenwarteschlange;

keepAliveTime: Wenn die Anzahl der inaktiven Threads im Pool corePoolSize überschreitet, wie lange dauert es, bis die überschüssigen Threads zerstört sind;

unit: Einheit von keepAliveTime

workQueue: Aufgabenwarteschlange , Aufgaben, die dem Thread-Pool hinzugefügt, aber noch nicht ausgeführt wurden. Im Allgemeinen unterteilt in direkte Übermittlungswarteschlange, begrenzte Aufgabenwarteschlange, unbegrenzte Aufgabenwarteschlange und Prioritätsaufgabenwarteschlange: Thread-Factory, die im Allgemeinen zum Erstellen von Threads verwendet wird Die Standardeinstellung kann verwendet werden.

Handler: Ablehnungsstrategie; wenn die Aufgabe zu groß ist

1. WorkQueue-Aufgabenwarteschlange

Sie ist im Allgemeinen in direkte Übermittlungswarteschlange, unbegrenzte Aufgabenwarteschlange und Prioritätsaufgabenwarteschlange unterteilt.

1. Direkte Übermittlungswarteschlange ist eine spezielle BlockingQueue. Sie wird blockiert, ohne einen Einfügevorgang auszuführen, und umgekehrt muss jeder Löschvorgang auch auf den entsprechenden Einfügevorgang warten.

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    public void run() {
        System.out.println(Thread.currentThread().getName());

Das Ausgabeergebnis ist

pool-1-thread-1

pool-1-thread-2

Ausnahme im Thread „main“ java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 abgelehnt von java.util.concurrent.ThreadPoolExecutor@3d4eac69[Wird ausgeführt, Poolgröße = 2, aktive Threads = 0, Aufgaben in der Warteschlange = 0, abgeschlossene Aufgaben = 2]

bei java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unbekannte Quelle )
bei java.util.concurrent.ThreadPoolExecutor.reject(Unbekannte Quelle)

bei java.util.concurrent.ThreadPoolExecutor.execute(Unbekannte Quelle)
bei com.hhxx.test.ThreadPool.main(ThreadPool.java:17)


Sie können sehen, dass die Ablehnungsrichtlinie direkt ausgeführt und eine Ausnahme ausgelöst wird, wenn die Aufgabenwarteschlange SynchronousQueue ist und die Anzahl der erstellten Threads größer als MaximumPoolSize ist.

Mit SynchronousQueue werden übermittelte Aufgaben nicht gespeichert und immer sofort zur Ausführung weitergeleitet. Wenn die Anzahl der zum Ausführen von Aufgaben verwendeten Threads geringer ist als MaximumPoolSize, versuchen Sie, einen neuen Prozess zu erstellen. Wenn der durch MaximumPoolSize festgelegte Maximalwert erreicht ist, wird die Ablehnungsrichtlinie gemäß dem von Ihnen festgelegten Handler ausgeführt. Daher werden die auf diese Weise übermittelten Aufgaben nicht zwischengespeichert, sondern sofort ausgeführt. In diesem Fall müssen Sie die Parallelität Ihres Programms genau einschätzen, um die entsprechende maximale PoolSize-Menge festzulegen sehr schwierig. Es ist einfach, die Ablehnungsrichtlinie zu implementieren.

2. Die begrenzte Aufgabenwarteschlange kann wie unten gezeigt implementiert werden.

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

Verwenden Sie die begrenzte Aufgabenwarteschlange von ArrayBlockingQueue Wenn der Thread ausgeführt wird, erstellt der Pool neue Threads, bis die Anzahl der erstellten Threads corePoolSize erreicht. Anschließend werden neue Aufgaben zur Warteschlange hinzugefügt. Wenn die Warteschlange voll ist, dh die anfängliche Kapazität von ArrayBlockingQueue überschreitet, werden weiterhin Threads erstellt, bis die Anzahl der Threads die durch „maximumPoolSize“ festgelegte maximale Anzahl von Threads erreicht. Wenn sie größer als „maximumPoolSize“ ist, wird die Ablehnungsrichtlinie ausgeführt . In diesem Fall hängt die Obergrenze der Thread-Anzahl direkt vom Status der begrenzten Aufgabenwarteschlange ab. Wenn die anfängliche Kapazität der begrenzten Aufgabenwarteschlange groß ist oder keinen Überlastungszustand erreicht hat, wird die Anzahl der Threads immer beibehalten Wenn die Aufgabenwarteschlange voll ist, wird die maximale Poolgröße als Obergrenze für die maximale Anzahl von Threads verwendet.

3. Unbegrenzte Aufgabenwarteschlange: Eine begrenzte Aufgabenwarteschlange kann mit LinkedBlockingQueue implementiert werden, wie unten gezeigt

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //优先任务队列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    public int getPriority() {
        return priority;
    public void setPriority(int priority) {
        this.priority = priority;
    public ThreadTask() {
        
    public ThreadTask(int priority) {
    //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

我们来看下执行的结果情况

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

二、拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:

1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;

2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;

3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;

4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;

以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义拒绝策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"执行了拒绝策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

输出结果:

com.hhxx.test.ThreadTask@33909752执行了拒绝策略
com.hhxx.test.ThreadTask@55f96302执行了拒绝策略
com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;

三、ThreadFactory自定义线程创建

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义线程工厂
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //输出执行线程的名称
        System.out.println("ThreadName:"+Thread.currentThread().getName());

我们看下输出结果

线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。

四、ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //实现自定义接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //输出执行线程的名称
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下输出结果

线程118352462创建
线程1550089733创建
准备执行:Task0
准备执行:Task1
TaskNameTask0---ThreadName:threadPool118352462
线程865113938创建
执行完毕:Task0
TaskNameTask1---ThreadName:threadPool1550089733
执行完毕:Task1
准备执行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
执行完毕:Task3
准备执行:Task2
准备执行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
执行完毕:Task4
准备执行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
执行完毕:Task5
准备执行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
执行完毕:Task6
准备执行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
执行完毕:Task8
准备执行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
准备执行:Task7
执行完毕:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
执行完毕:Task7
执行完毕:Task2
线程池退出

可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

五、线程池线程数量

线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可

/**
             * Nthreads=CPU数量
             * Ucpu=目标CPU的使用率,0<=Ucpu<=1
             * W/C=任务等待时间与任务计算时间的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

Das obige ist der detaillierte Inhalt vonSo verwenden Sie die ThreadPoolExecutor-Klasse des Java-Thread-Pools. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen