Heim >Java >javaLernprogramm >So verwenden Sie den SpringBoot-Thread-Pool und den Java-Thread-Pool
public class AsyncTest { @Async public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); } }
Sie müssen @EnableAsync
hinzufügen die Startklasse >Annotation, andernfalls wird sie nicht wirksam. @EnableAsync
注解,否则不会生效。
@SpringBootApplication //@EnableAsync public class Test1Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); AsyncTest bean = run.getBean(AsyncTest.class); for(int index = 0; index <= 10; ++index){ bean.async(String.valueOf(index)); } } }
此时可不加 @EnableAsync
注解
@SpringBootTest class Test1ApplicationTests { @Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; for(int index = 0; index <= 10; ++index){ threadPoolTaskExecutor.submit(runnable); } } }
SpringBoot线程池的常见配置:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 Integer.MAX_VALUE keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
TaskExecutionAutoConfiguration
类中定义了 ThreadPoolTaskExecutor
,该类的内部实现也是基于java原生的 ThreadPoolExecutor
类。initializeExecutor()
方法在其父类中被调用,但是在父类中 RejectedExecutionHandler
被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
,并通过initialize()
方法将AbortPolicy
传入initializeExecutor()
中。
注意在TaskExecutionAutoConfiguration
类中,ThreadPoolTaskExecutor
类的bean的名称为: applicationTaskExecutor
和 taskExecutor
。
// TaskExecutionAutoConfiguration#applicationTaskExecutor() @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAUL T_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }
// ThreadPoolTaskExecutor#initializeExecutor() @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
// ExecutorConfigurationSupport#initialize() public void initialize() { if (logger.isInfoEnabled()) { logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }
覆盖默认的 taskExecutor
对象,bean的返回类型可以是ThreadPoolTaskExecutor
也可以是Executor
。
@Configuration public class ThreadPoolConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
如果出现了多个线程池,例如再定义一个线程池 taskExecutor2
,则直接执行会报错。此时需要指定bean的名称即可。
@Bean("taskExecutor2") public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; }
引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。
@Resource ThreadPoolTaskExecutor taskExecutor2;
对于使用@Async
注解的多线程则在注解中指定bean的名字即可。
@Async("taskExecutor2") public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); }
线程池的四种拒绝策略
ThreadPoolExecutor
类的构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE
),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
定长线程池,超出线程数的任务会在队列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
类似于newCachedThreadPool
,线程数无上限,但是可以指定corePoolSize
。可实现延迟执行、周期执行。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
周期执行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(()->{ System.out.println("rate"); }, 1, 1, TimeUnit.SECONDS);
延时执行:
scheduledThreadPool.schedule(()->{ System.out.println("delay 3 seconds"); }, 3, TimeUnit.SECONDS);
单线程线程池,可以实现线程的顺序执行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
CallerRunsPolicy
:线程池让调用者去执行。
AbortPolicy
:如果线程池拒绝了任务,直接报错。
DiscardPolicy
:如果线程池拒绝了任务,直接丢弃。
DiscardOldestPolicy
:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。
直接在主线程中执行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
效果类似于:
Runnable thread = ()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(0); } catch (InterruptedException e) { throw new RuntimeException(e); } }; thread.run();
直接抛出RejectedExecutionException
异常,并指示任务的信息,线程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
e.getQueue().poll()
: 取出队列最旧的任务。
e.execute(r)
: 当前任务入队。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
java
的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker
对象,该对象在 被维护在private final HashSet7b8f3a9cc722774601ac1c36ccacd929 workers = new HashSet7b8f3a9cc722774601ac1c36ccacd929();
。workQueue
是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue
队列中。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
work对象的执行依赖于 runWorker()
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }Methode 2: ThreadPoolTaskExecutor direkt einfügen🎜🎜Sie können die Annotation
@EnableAsync
derzeit nicht hinzufügen🎜rrreee🎜Thread-Pool-Standardkonfigurationsinformationen🎜🎜Gemeinsame Konfiguration des SpringBoot-Thread-Pools:🎜rrreee🎜Implementierung des SpringBoot-Thread-Pools Prinzip 🎜🎜TaskExecutionAutoConfiguration
-Klasse definiert ThreadPoolTaskExecutor
, und die interne Implementierung dieser Klasse basiert auch auf Javas nativer ThreadPoolExecutor
-Klasse. Die initializeExecutor()
-Methode wird in ihrer übergeordneten Klasse aufgerufen, aber in der übergeordneten Klasse ist RejectedExecutionHandler
als private RejectedExecutionHandler failedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); und übergeben Sie <code>AbortPolicy
über die Methode initialize()
an initializeExecutor()
. 🎜🎜Beachten Sie, dass in der Klasse TaskExecutionAutoConfiguration
die Bean-Namen der Klasse ThreadPoolTaskExecutor
lauten: applicationTaskExecutor
und taskExecutor
. 🎜rrreeerrreeerrreee🎜Überschreiben Sie den Standard-Thread-Pool🎜🎜Überschreiben Sie das Standardobjekt taskExecutor
. Der Rückgabetyp der Bean kann ThreadPoolTaskExecutor
oder Executor
sein. 🎜rrreee🎜Mehrere Thread-Pools verwalten🎜🎜Wenn mehrere Thread-Pools vorhanden sind und beispielsweise ein anderer Thread-Pool taskExecutor2
definiert wird, wird bei der direkten Ausführung ein Fehler gemeldet. Zu diesem Zeitpunkt müssen Sie den Namen der Bean angeben. 🎜rrreee🎜Wenn Sie auf den Thread-Pool verweisen, müssen Sie den Variablennamen in den Namen der Bean ändern, damit diese nach Namen durchsucht wird. 🎜rrreee🎜Für Multithreading mit der Annotation @Async
geben Sie einfach den Bean-Namen in der Annotation an. 🎜rrreee🎜Vier Ablehnungsstrategien von Thread-Pools🎜🎜Vier häufig verwendete Thread-Pools in JAVA🎜🎜ThreadPoolExecutor
Der Konstruktor der Klasse lautet wie folgt: 🎜rrreee🎜newCachedThreadPool🎜🎜Keine Begrenzung der maximalen Anzahl von Threads ( MaximumPoolSize=Integer.MAX_VALUE
), wenn es mehr inaktive Threads als nötig gibt, werden diese recycelt, andernfalls werden die vorhandenen Threads wiederverwendet. 🎜rrreee🎜newFixedThreadPool🎜🎜Thread-Pool mit fester Länge, Aufgaben, die die Anzahl der Threads überschreiten, warten in der Warteschlange. 🎜rrreee🎜newScheduledThreadPool🎜🎜Ähnlich wie bei newCachedThreadPool
gibt es keine Obergrenze für die Anzahl der Threads, aber corePoolSize
kann angegeben werden. Es können eine verzögerte Ausführung und eine periodische Ausführung erreicht werden. 🎜rrreee🎜Periodische Ausführung: 🎜rrreee🎜Verzögerte Ausführung: 🎜rrreee🎜newSingleThreadExecutor🎜🎜Single-Threaded-Thread-Pool, der die sequentielle Ausführung von Threads realisieren kann. 🎜rrreee🎜Vier Ablehnungsstrategien im Java-Thread-Pool🎜CallerRunsPolicy
: Der Thread-Pool lässt den Aufrufer ausführen. 🎜AbortPolicy
: Wenn der Thread-Pool die Aufgabe ablehnt, wird direkt ein Fehler gemeldet. 🎜DiscardPolicy
: Wenn der Thread-Pool die Aufgabe ablehnt, wird sie direkt verworfen. 🎜DiscardOldestPolicy
: Wenn der Thread-Pool eine Aufgabe ablehnt, wird die älteste, nicht ausgeführte Aufgabe im Thread-Pool direkt verworfen und die neue Aufgabe in die Warteschlange gestellt. 🎜RejectedExecutionException
aus und gibt die Aufgabeninformationen und Thread-Pool-Informationen an. , 🎜rrreee🎜DiscardPolicy🎜🎜 bewirkt nichts. 🎜rrreee🎜DiscardOldestPolicy🎜e.getQueue().poll()
: Entferne die älteste aus die Warteschlangenaufgabe. 🎜e.execute(r)
: Die aktuelle Aufgabe wird zur Warteschlange hinzugefügt. 🎜java
speichert das Objekt java.util.concurrent.ThreadPoolExecutor.Worker
wird in private final HashSet7b8f3a9cc722774601ac1c36ccacd929 Workers = new HashSet7b8f3a9cc722774601ac1c36ccacd929();
verwaltet. workQueue
ist eine Warteschlange, die auszuführende Aufgaben speichert. Wenn dem Thread-Pool eine neue Aufgabe hinzugefügt wird, wird die Aufgabe zur workQueue
-Warteschlange hinzugefügt. 🎜rrreee🎜Die Ausführung des Arbeitsobjekts hängt von runWorker()
ab. Anders als die Threads, die wir normalerweise schreiben, befindet sich dieser Thread in einer Schleife und ruft kontinuierlich neue Aufgaben aus der Warteschlange zur Ausführung ab. Daher können Threads im Thread-Pool wiederverwendet werden, anstatt wie die Threads, die wir normalerweise verwenden, nach der Ausführung zu enden. 🎜rrreeeDas obige ist der detaillierte Inhalt vonSo verwenden Sie den SpringBoot-Thread-Pool und den Java-Thread-Pool. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!