Heim  >  Artikel  >  Java  >  Java-Concurrent-Thread-Pool: Detaillierte Erläuterung von ThreadPoolExecutor

Java-Concurrent-Thread-Pool: Detaillierte Erläuterung von ThreadPoolExecutor

php是最好的语言
php是最好的语言Original
2018-08-08 11:23:141872Durchsuche

Zusammenfassung: Das Merkmal des Thread-Pools besteht darin, dass nach der Anzahl der Threads = corePoolSize nur dann eine Aufgabe aus der Aufgabenwarteschlange entnommen wird, wenn die Aufgabenwarteschlange voll ist, und dann ein neuer Thread erstellt wird erstellt, und der Zyklus wird fortgesetzt, bis der Thread die maximale PoolSize erreicht und die Ablehnungsrichtlinie ausgeführt wird.

Thread pool-intsmaze

Die Idee des Thread-Pools besteht darin, einen Bereich im System zu öffnen, um einige Standby-Threads zu speichern Thread-Pool. Wenn eine Aufgabe ausgeführt werden muss, wird ein Thread im Standby-Modus aus dem Thread-Pool ausgeliehen, um die angegebene Aufgabe auszuführen, und der ausgeliehene Thread kann zurückgegeben werden, wenn die Aufgabe endet. Dadurch wird vermieden, dass immer wieder eine große Anzahl von Thread-Objekten erstellt wird und CPU- und Speicherressourcen verschwendet werden.

Benutzerdefinierter Thread-Pool-intsmaze

Wenn Sie sich die Quellcode-Implementierung verschiedener von jdk bereitgestellter Thread-Pools ansehen, können Sie dies feststellen, mit Ausnahme des neuen Thread-Pools newWorkStealingPool, der von jdk8 hinzugefügt wurde Sie basieren alle auf der Kapselungsimplementierung von ThreadPoolExecutor. Erklären Sie daher zunächst die spezifischen Funktionen von ThreadPoolExecutor.

Detaillierte Erklärung von ThreadPoolExecutor -intsmaze

ThreadPoolExecutor( corePoolSize,  maximumPoolSize,  keepAliveTime, TimeUnit unit, 
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize: Geben Sie die Anzahl der Threads im Thread-Pool an

maximumPoolSize: Maximale Thread-Nummer

keepAliveTime: Wenn die Anzahl der Threads corePoolSize überschreitet, überschreitet die Überlebenszeit des Leerlauf-Threads (nach dieser Zeit wird der Leerlauf-Thread zerstört).

Einheit: Zeiteinheit von keepAliveTime

workQueue: Aufgabenwarteschlange, Aufgaben übermittelt, aber nicht ausgeführt

threadFactory: Thread-Factory zum Erstellen von Threads, der Standardwert ist

handler: Ablehnungsrichtlinie, wie Aufgaben abgelehnt werden, wenn zu viele Aufgaben verarbeitet werden müssen, der Standardwert ist die neue AbortPolicy()-Strategie.

        ExecutorService es = new ThreadPoolExecutor(3, 8, 60L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),                new RejectedExecutionHandler() {                    public void rejectedExecution(Runnable r,
                            ThreadPoolExecutor executor) {
                        System.out.println("discard");
                    }
                });

Aufgabenwarteschlange – Speichern ausführbarer Objekte-intsmaze

Zusammenfassung: Das Merkmal des Thread-Pools ist, dass die Anzahl der threads = corePoolSize, nur wenn die Aufgabenwarteschlange voll ist, wird eine Aufgabe aus der Aufgabenwarteschlange entfernt, dann wird ein neuer Thread erstellt und der Zyklus wird fortgesetzt, bis die Anzahl der Threads die maximale PoolSize erreicht, um die Ablehnungsrichtlinie auszuführen.

Solange die Warteschlange die BlockingQueue-Schnittstelle implementiert, beachten Sie, dass die von ConcurrentLinkedQueue implementierte Warteschlangenschnittstelle der obersten Ebene hier nicht verwendet werden kann.

Üblicherweise werden folgende verwendet:

SynchronousQueue: Die Warteschlange hat keine Kapazität und muss auf einen entsprechenden Löschvorgang warten Im Gegensatz dazu muss jeder Löschvorgang auf den entsprechenden Einfügevorgang warten. Daher speichert es keine Aufgaben und sendet Aufgaben immer zur Ausführung an Threads. Wenn keine Threads im Leerlauf sind, wird ein neuer Thread erstellt. Wenn die Anzahl der Threads das Maximum erreicht, wird eine Ablehnungsrichtlinie ausgeführt.

ArrayBlockingQueue: Begrenzte Task-Warteschlange. Wenn die Anzahl der Threads im Thread-Pool kleiner als corePoolSize ist, werden neue Threads erstellt in die Warteschlange. Wenn die Warteschlange voll ist, wird ein neuer Thread erstellt, um die Aufgabe auszuführen, wenn der Gesamtthread nicht größer als MaximumPoolSize ist. Wenn er größer als MaximumPoolSize ist, wird eine Ablehnungsrichtlinie ausgeführt.

LinkedBlockingQueue: Unbegrenzte Warteschlange. Sofern die Systemressourcen nicht erschöpft sind, tritt kein Fehler beim Einreihen der Aufgabe auf. Wenn die Anzahl der Threads im Thread-Pool kleiner als corePoolSize ist, wird ein neuer Thread erstellt. Wenn sie größer als corePoolSize ist, wird eine neue Aufgabe zur Warteschlange hinzugefügt.

PriortyBlockingQueue: Prioritätsaufgabenwarteschlange, die die Reihenfolge der Aufgabenausführung steuern kann, ist eine unbegrenzte Warteschlange. Sowohl ArrayBlockingQueue als auch LinkedBlockingQueue verarbeiten Aufgaben nach dem First-In-First-Out-Algorithmus und können nacheinander entsprechend der Priorität der Aufgaben selbst ausgeführt werden.

Ablehnungsstrategie-intsmaze

Die Threads im Thread-Pool sind aufgebraucht und die Aufgaben in der Warteschlange sind voll, daher ist eine Ablehnungsstrategie erforderlich : Verarbeitungsaufgaben Was tun, wenn die Menge die tatsächliche Kapazität des Systems überschreitet?

JDK verfügt über vier integrierte Ablehnungsrichtlinien:

AbortPolicy: Eine Ausnahme direkt auslösen (Standardrichtlinie). Auch wenn der Thread-Pool frei ist, können nachfolgende Threads nicht ausgeführt werden . Ja, wenn Sie möchten, dass nachfolgende Threads ausgeführt werden, müssen Sie Ausnahmeinformationen erfassen.

CallerRunsPolicy: Diese Richtlinie führt die aktuell verworfene Aufgabe direkt im Aufrufer-Thread aus. Dadurch wird die Aufgabe natürlich nicht wirklich verworfen, aber die Leistung des Aufgabenübermittlungsthreads wird höchstwahrscheinlich stark sinken.

DiscardOldestPolicy: Verwirft die älteste Anfrage, bei der es sich um eine Aufgabe handelt, die gerade ausgeführt wird, und versucht erneut, die aktuelle Aufgabe zu senden.

DiscardPolicy: Verwerfen Sie nicht verarbeitbare Aufgaben stillschweigend ohne Verarbeitung. Dies ist wahrscheinlich die beste Lösung, wenn Aufgaben verloren gehen dürfen. Wenn der Thread-Pool nicht inaktiv ist, werden die übermittelten Aufgaben verworfen und die übermittelten Aufgaben werden ausgeführt, wenn inaktive Threads vorhanden sind.

Das Folgende ist der Quellcode der Ablehnungsrichtlinie von jdk-intsmaze

   public static class CallerRunsPolicy implements RejectedExecutionHandler {        public CallerRunsPolicy() { }        /**
         * 直接在调用者线程中运行当前被丢弃的任务         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {
                r.run();
            }
        }
    }    public static class AbortPolicy implements RejectedExecutionHandler {        public AbortPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }    public static class DiscardPolicy implements RejectedExecutionHandler {        public DiscardPolicy() { }        /**
         * Does nothing, which has the effect of discarding task r.         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }    public static class DiscardOldestPolicy implements RejectedExecutionHandler {        public DiscardOldestPolicy() { }        /**
         * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。

public class MyTask implements Runnable
{    private int number;    
    public MyTask(int number) {        super();        this.number = number;
    }    public void run() {
        System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number);        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    public static void main(String[] args)  {//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, 
//                new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());        
        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());        for(int i=0;i<10000;i++)
        {            try {
                System.out.println(i);
                es.submit(new MyTask(i));
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("------------------------"+i);
            }
        }
    }

线程池执行逻辑源码解析-intsmaze

      public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);        return ftask;
    }    
       /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null     */
    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn&#39;t, by returning false.
         *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 
         自动调用addWorker检查runState和workerCount,
         
         
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *如果任务可以成功排队,那么我们仍然需要
           仔细检查我们是否应该添加一个线程
          (因为现有的自从上次检查后死亡)或者那个
          自进入该方法以来,该池关闭。 所以我们
          重新检查状态,如果有必要的话回滚队列
          停止,或者如果没有的话就开始一个新的线程。
         
         
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.         */
        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;
            c = ctl.get();
        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))
                reject(command);//队列满了,执行拒绝策略
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        else if (!addWorker(command, false))
            reject(command);
    }    
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法    }    
     /**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

jdk的线程池实现类-intsmaze

newFixedThreadPoo-intsmaze

任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。

ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor-intsmaze

任务队列LinkedBlockingQueue中(长度无限),线程数量和最大线程数量均为1。

ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool-intsmaze

任务队列为SynchronousQueue,线程数量为0,最大线程数量为Integer.MAX_VALUE,所以只要有任务没有空闲线程就会创建就新线程。

ExecutorService es=Executors.newCachedThreadPool();//指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new SynchronousQueue<Runnable>());
}

newScheduledThreadPool- -定时线程-intsmaze

任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。 

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {        return new ScheduledThreadPoolExecutor(corePoolSize);
}public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());
}public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

newSingleThreadScheduledExecutor- -定时线程-intsmaze

相当于newScheduledThreadPool(int corePoolSize)corePoolSize设置为1。

ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();

延迟线程池

class MyScheduledTask implements Runnable
{ private String tname; public MyScheduledTask(String tname)
 {  this.tname=tname;
 } public void run()
 {
  System.out.println(tname+"任务时延2秒执行!!!");
 }
}public class intsmaze
{ public static void main(String[] args)
 {  ScheduledExecutorService scheduledThreadPool                       =Executors.newScheduledThreadPool(2);
  MyScheduledTask mt1=new MyScheduledTask("MT1");
  scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS);
 }
}

 newWorkStealingPool java8新增连接池-intsmaze

    public static ExecutorService newWorkStealingPool(int parallelism) {        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
    }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争
    public static ExecutorService newWorkStealingPool() {        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
    }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。

关闭线程池(很少使用,除了切换数据源时需要控制)-intsmaze

希望程序执行完所有任务后退出,调用ExecutorService接口中的shutdown(),shutdownNow()方法。

用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownNow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

线程池优化-intsmaze

一般来说确定线程池的大小需要考虑CPU数量,内存大小,JDBC连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:

Ncpu=CPU的数量

Ucpu=目标CPU的使用率,0<=Ucpu<=1

W/C=等待时间与计算时间的比率

为保持处理器达到期望的使用率,最优的线程池的大小等于:

Nthreads=Ncpu*Ucpu*(1+W/C)

在java中,可以通过

Runtime.getRuntime().availableProcessors()

取得可以CPU数量。

相关推荐:

Java中线程池的图文代码详解

ThreadPoolExecutor线程池之submit方法

JAVA中ThreadPoolExecutor线程池的submit方法详解

Das obige ist der detaillierte Inhalt vonJava-Concurrent-Thread-Pool: Detaillierte Erläuterung von ThreadPoolExecutor. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn