The project needs a function that can dynamically add scheduled tasks. The project currently uses the xxl-job scheduled task scheduling system, but after some understanding of the xxl-job function , I found that xxl-job’s support for dynamically adding scheduled tasks and dynamically deleting scheduled tasks to the project is not that good, so I need to manually implement the function of a scheduled task
1 Technology Selection
Timer
or ScheduledExecutorService
Both of these can implement scheduled task scheduling, let’s take a look first Timer's scheduled task scheduling
public class MyTimerTask extends TimerTask { private String name; public MyTimerTask(String name){ this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void run() { //task Calendar instance = Calendar.getInstance(); System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime())); } } Timer timer = new Timer(); MyTimerTask timerTask = new MyTimerTask("NO.1"); //首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次 timer.schedule(timerTask,1000L,2000L);
Let's take a look at the implementation of ScheduledThreadPoolExecutor
//org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { //do something } },initialDelay,period, TimeUnit.HOURS);
Both can implement scheduled tasks, so what are their differences? Using Alibaba p3c will give suggestions and differences
多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。
From the suggestion point of view, you must choose ScheduledExecutorService
. Let’s take a look at the source code to see why Timer
will terminate execution if there is a problem.
/** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); public Timer() { this("Timer-" + serialNumber()); } public Timer(String name) { thread.setName(name); thread.start(); }
New Object, we see that a thread is started, so what is this thread doing? Take a look at
class TimerThread extends Thread { boolean newTasksMayBeScheduled = true; /** * 每一件一个任务都是一个quene */ private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // 清除所有任务信息 } } } /** * The main timer loop. (See class comment.) */ private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } } }
We see that mainLoop()
is executed, and inside it is the while (true)
method that loops infinitely to obtain the time in the task object in the program Compare it with the current time, if it is the same, it will be executed. However, once an error is reported, it will enter finally and clear all task information.
At this time we have found the answer. After the timer is instantiated, it starts a thread and performs tasks in an uninterrupted loop matching. It is single-threaded. Once an error is reported, the thread is terminated. Therefore, subsequent tasks will not be executed, and ScheduledThreadPoolExecutor is executed by multiple threads. Even if one of the tasks reports an error, it will not affect the execution of other threads.
2 Using ScheduledThreadPoolExecutor
From the above, using ScheduledThreadPoolExecutor
is relatively simple, but we want to achieve something more elegant, so chooseTaskScheduler
To implement
@Component public class CronTaskRegistrar implements DisposableBean { private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16); @Autowired private TaskScheduler taskScheduler; public TaskScheduler getScheduler() { return this.taskScheduler; } public void addCronTask(Runnable task, String cronExpression) { addCronTask(new CronTask(task, cronExpression)); } private void addCronTask(CronTask cronTask) { if (cronTask != null) { Runnable task = cronTask.getRunnable(); if (this.scheduledTasks.containsKey(task)) { removeCronTask(task); } this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } } public void removeCronTask(Runnable task) { Set<Runnable> runnables = this.scheduledTasks.keySet(); Iterator it1 = runnables.iterator(); while (it1.hasNext()) { SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next(); Long taskId = schedulingRunnable.getTaskId(); SchedulingRunnable cancelRunnable = (SchedulingRunnable) task; if (taskId.equals(cancelRunnable.getTaskId())) { ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable); if (scheduledTask != null){ scheduledTask.cancel(); } } } } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } @Override public void destroy() throws Exception { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); } }
TaskScheduler
is the core class for this function implementation, but it is an interface
public interface TaskScheduler { /** * Schedule the given {@link Runnable}, invoking it whenever the trigger * indicates a next execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link Trigger} interface, * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object * wrapping a cron expression * @return a {@link ScheduledFuture} representing pending completion of the task, * or {@code null} if the given Trigger object never fires (i.e. returns * {@code null} from {@link Trigger#nextExecutionTime}) * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.CronTrigger */ @Nullable ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
As you can see from the previous code, We injected this class into the class, but it is an interface. How do we know which implementation class it is? In the past, when this happened, we had to add @Primany or @Quality to the class to execute the implemented class, but we saw that my There is no mark on the injection because it is implemented in another way
@Configuration public class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定时任务执行线程池核心线程数 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; } }
The Bean TaskScheduler is registered during spring initialization, and we can see that its implementation is ThreadPoolTaskScheduler. Some people say in online information ThreadPoolTaskScheduler is the default implementation class of TaskScheduler. In fact, it is not. We still need to specify it. In this way, when we want to replace the implementation, we only need to modify the configuration class, which is very flexible.
Why is it said to be a more elegant implementation, because its core is also implemented through ScheduledThreadPoolExecutor
public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException { Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized"); return this.scheduledExecutor; }
This time During the implementation process, I did not choose xxl-job for implementation, but used TaskScheduler for implementation. This also caused a problem. xxl-job is a distributed program scheduling system, which is used when applications want to execute scheduled tasks. When xxl-job is used, no matter how many nodes are deployed in the application, xxl-job will only select one of the nodes as the node for scheduled task execution, so that scheduled tasks will not be executed simultaneously on different nodes, causing repeated execution problems. Instead, use To implement TaskScheduler, we must consider the issue of repeated execution on multiple nodes. Of course, since there is a problem, there is a solution
· The first option is to separate the scheduled task function and deploy it separately, and only deploy one node. The second option uses redis setNx to ensure that only one task is running at the same time. Execution
I chose the second option to execute. Of course, there are some ways to ensure that it is not executed repeatedly. I won’t go into details here. Here is my implementation
public void executeTask(Long taskId) { if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) { log.info("已有执行中定时发送短信任务,本次不执行!"); return; }
The above is the detailed content of How to implement SpringBoot scheduled task function. For more information, please follow other related articles on the PHP Chinese website!