요약: 스레드 풀의 특징은 스레드 수 = corePoolSize 이후 작업 큐가 가득 찬 경우에만 작업 큐에서 작업을 꺼내고 새 스레드가 구성된다는 것입니다. 스레드 수가 maximumPoolSize에 도달할 때까지 주기가 계속됩니다.
스레드 풀의 아이디어는 일부 대기 스레드를 저장하기 위해 시스템에 영역을 여는 것입니다. 이 영역을 스레드 풀이라고 합니다. 실행해야 할 작업이 있으면 스레드 풀에서 대기 중인 스레드를 빌려와 지정된 작업을 실행하고, 작업이 끝나면 빌린 스레드를 반환할 수 있다. 이렇게 하면 많은 수의 스레드 개체가 반복적으로 생성되고 CPU 및 메모리 리소스가 낭비되는 것을 방지할 수 있습니다.
jdk에서 제공하는 다양한 스레드 풀의 소스 코드 구현을 관찰하면 jdk8에서 추가한 새로운 스레드 풀 newWorkStealingPool을 제외하면 모두 ThreadPoolExecutor의 캡슐화 구현을 기반으로 한다는 것을 알 수 있습니다. , 먼저 ThreadPoolExecutor 특정 기능을 설명하십시오.
ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize: 스레드 풀의 스레드 수 지정
maximumPoolSize: 최대 스레드 수
keepAliveTime: 스레드 수가 corePoolSize를 초과할 경우 생존 시간 of more 유휴 스레드(이보다 더 많음) 잠시 후 유휴 스레드가 삭제됩니다.
unit: keepAliveTime의 시간 단위
workQueue: 작업 대기열, 제출되었지만 실행되지 않은 작업
threadFactory: 스레드 생성을 위한 스레드 팩토리, 기본값이면 충분
handler: 거부 정책, 처리할 작업이 너무 많은 경우, 새로운 AbortPolicy() 정책을 기본값으로 하여 작업을 거부하는 방법.
ExecutorService es = new ThreadPoolExecutor(3, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("discard"); } });
요약: 스레드 풀의 특징은 스레드 수 = corePoolSize 이후에 작업 대기열이 가득 찼을 때만 하나를 꺼낸다는 것입니다. 작업 대기열 Task에서 새 스레드를 구성하고 스레드 수가 maximumPoolSize에 도달할 때까지 반복하여 거부 정책을 실행합니다.
대기열이 BlockingQueue 인터페이스를 구현하는 한, 여기서는 ConcurrentLinkedQueue로 구현된 최상위 대기열 인터페이스를 사용할 수 없습니다.
일반적으로 사용되는 것은 다음과 같습니다.
SynchronousQueue: 큐를 직접 제출합니다. 큐에는 용량이 없습니다. 각 삽입 작업은 해당 삭제 작업을 기다려야 합니다. 삽입 작업. 따라서 작업을 저장하지 않고 항상 스레드에 제출하여 실행합니다. 유휴 스레드가 없으면 새 스레드가 생성되고 거부 정책이 실행됩니다.
ArrayBlockingQueue: Bounded 작업 대기열. 스레드 풀의 스레드 수가 corePoolSize보다 작으면 새 스레드가 생성되며, corePoolSize보다 크면 새 작업이 대기 대기열에 추가됩니다. 대기 대기열이 가득 찬 경우 총 스레드가 maximumPoolSize보다 크지 않으면 작업을 실행하기 위해 새 스레드가 생성됩니다. 총 스레드가 maximumPoolSize보다 크면 거부 정책이 실행됩니다.
LinkedBlockingQueue: 무제한 대기열, 시스템 리소스가 소진되지 않는 한 작업을 대기열에 넣는 데 실패하지 않습니다. 스레드 풀의 스레드 수가 corePoolSize보다 적으면 새 스레드가 생성되고, corePoolSize보다 크면 새 작업이 대기 대기열에 추가됩니다.
PriortyBlockingQueue: 작업 실행 순서를 제어할 수 있는 우선순위 작업 대기열은 무제한 대기열입니다. ArrayBlockingQueue와 LinkedBlockingQueue는 모두 선입선출 알고리즘에 따라 작업을 처리합니다. PriorityBlockingQueue는 작업 자체의 우선 순위에 따라 순차적으로 실행될 수 있습니다.
스레드 풀의 스레드가 모두 소모되고 대기 큐의 작업이 가득 차서 더 이상 새 작업을 채울 수 없으므로 거부 전략이 필요합니다. 처리 작업 수가 초과되는 경우. 시스템의 실제 용량, 처리 방법.
jdk에는 4가지 기본 거부 정책이 있습니다.
AbortPolicy: 스레드 풀이 비어 있어도 후속 스레드를 실행할 수 없습니다. 예외 정보.
CallerRunsPolicy: 이 정책은 현재 삭제된 작업을 호출자 스레드에서 직접 실행합니다. 분명히 이렇게 하면 실제로 작업이 삭제되지는 않지만 작업 제출 스레드의 성능이 급격히 떨어질 가능성이 높습니다.
DiscardOldestPolicy: 곧 실행될 작업인 가장 오래된 요청을 삭제하고 현재 작업을 다시 제출하려고 시도합니다.
DiscardPolicy: 처리 없이는 처리할 수 없는 작업을 자동으로 삭제합니다. 작업이 손실될 수 있는 경우 이것이 아마도 최선의 솔루션일 것입니다. 스레드 풀이 유휴 상태가 아닌 경우 제출된 작업은 삭제되고 유휴 스레드가 있을 때 제출된 작업이 실행됩니다.
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 直接在调用者线程中运行当前被丢弃的任务 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。
public class MyTask implements Runnable { private int number; public MyTask(int number) { super(); this.number = number; } public void run() { System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) {// ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, // new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,// new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,// new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()); ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()); for(int i=0;i<10000;i++) { try { System.out.println(i); es.submit(new MyTask(i)); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); System.out.println("------------------------"+i); } } }
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 自动调用addWorker检查runState和workerCount, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *如果任务可以成功排队,那么我们仍然需要 仔细检查我们是否应该添加一个线程 (因为现有的自从上次检查后死亡)或者那个 自进入该方法以来,该池关闭。 所以我们 重新检查状态,如果有必要的话回滚队列 停止,或者如果没有的话就开始一个新的线程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);//队列满了,执行拒绝策略 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } final void reject(Runnable command) { handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法 } /** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。
ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
任务队列LinkedBlockingQueue中(长度无限),线程数量和最大线程数量均为1。
ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
任务队列为SynchronousQueue,线程数量为0,最大线程数量为Integer.MAX_VALUE,所以只要有任务没有空闲线程就会创建就新线程。
ExecutorService es=Executors.newCachedThreadPool();//指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
相当于newScheduledThreadPool(int corePoolSize)中corePoolSize设置为1。
ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();
延迟线程池
class MyScheduledTask implements Runnable { private String tname; public MyScheduledTask(String tname) { this.tname=tname; } public void run() { System.out.println(tname+"任务时延2秒执行!!!"); } }public class intsmaze { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool =Executors.newScheduledThreadPool(2); MyScheduledTask mt1=new MyScheduledTask("MT1"); scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS); } }
newWorkStealingPool java8新增连接池-intsmaze
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争 public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。
希望程序执行完所有任务后退出,调用ExecutorService接口中的shutdown(),shutdownNow()方法。
用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownNow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
一般来说确定线程池的大小需要考虑CPU数量,内存大小,JDBC连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:
Ncpu=CPU的数量
Ucpu=目标CPU的使用率,0<=Ucpu<=1
W/C=等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的线程池的大小等于:
Nthreads=Ncpu*Ucpu*(1+W/C)
在java中,可以通过
Runtime.getRuntime().availableProcessors()
取得可以CPU数量。
相关推荐:
ThreadPoolExecutor线程池之submit方法
JAVA中ThreadPoolExecutor线程池的submit方法详解
위 내용은 Java 동시 스레드 풀: ThreadPoolExecutor에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!