Home >Java >javaTutorial >How to implement SpringBoot scheduled task function

How to implement SpringBoot scheduled task function

WBOY
WBOYforward
2023-05-10 16:16:131410browse

1 Background

The project needs a function that can dynamically add scheduled tasks. The project currently uses the xxl-job scheduled task scheduling system, but after some understanding of the xxl-job function , I found that xxl-job’s support for dynamically adding scheduled tasks and dynamically deleting scheduled tasks to the project is not that good, so I need to manually implement the function of a scheduled task

二dynamic scheduled task scheduling

1 Technology Selection

Timer or ScheduledExecutorService

Both of these can implement scheduled task scheduling, let’s take a look first Timer's scheduled task scheduling

  public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次
timer.schedule(timerTask,1000L,2000L);

Let's take a look at the implementation of ScheduledThreadPoolExecutor

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);

Both can implement scheduled tasks, so what are their differences? Using Alibaba p3c will give suggestions and differences

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

From the suggestion point of view, you must choose ScheduledExecutorService. Let’s take a look at the source code to see why Timer will terminate execution if there is a problem.

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

New Object, we see that a thread is started, so what is this thread doing? Take a look at

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * 每一件一个任务都是一个quene
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // 清除所有任务信息
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn&#39;t yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}

We see that mainLoop() is executed, and inside it is the while (true) method that loops infinitely to obtain the time in the task object in the program Compare it with the current time, if it is the same, it will be executed. However, once an error is reported, it will enter finally and clear all task information.

At this time we have found the answer. After the timer is instantiated, it starts a thread and performs tasks in an uninterrupted loop matching. It is single-threaded. Once an error is reported, the thread is terminated. Therefore, subsequent tasks will not be executed, and ScheduledThreadPoolExecutor is executed by multiple threads. Even if one of the tasks reports an error, it will not affect the execution of other threads.

2 Using ScheduledThreadPoolExecutor

From the above, using ScheduledThreadPoolExecutor is relatively simple, but we want to achieve something more elegant, so chooseTaskScheduler To implement

@Component
public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }
    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }
    private void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }
            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }
    public void removeCronTask(Runnable task) {
        Set<Runnable> runnables = this.scheduledTasks.keySet();
        Iterator it1 = runnables.iterator();
        while (it1.hasNext()) {
            SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
            Long taskId = schedulingRunnable.getTaskId();
            SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
            if (taskId.equals(cancelRunnable.getTaskId())) {
                ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
                if (scheduledTask != null){
                    scheduledTask.cancel();
                }
            }
        }
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }
        this.scheduledTasks.clear();
    }
}

TaskScheduler is the core class for this function implementation, but it is an interface

public interface TaskScheduler {
   /**
    * Schedule the given {@link Runnable}, invoking it whenever the trigger
    * indicates a next execution time.
    * <p>Execution will end once the scheduler shuts down or the returned
    * {@link ScheduledFuture} gets cancelled.
    * @param task the Runnable to execute whenever the trigger fires
    * @param trigger an implementation of the {@link Trigger} interface,
    * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
    * wrapping a cron expression
    * @return a {@link ScheduledFuture} representing pending completion of the task,
    * or {@code null} if the given Trigger object never fires (i.e. returns
    * {@code null} from {@link Trigger#nextExecutionTime})
    * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
    * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
    * @see org.springframework.scheduling.support.CronTrigger
    */
   @Nullable
   ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

As you can see from the previous code, We injected this class into the class, but it is an interface. How do we know which implementation class it is? In the past, when this happened, we had to add @Primany or @Quality to the class to execute the implemented class, but we saw that my There is no mark on the injection because it is implemented in another way

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定时任务执行线程池核心线程数
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}

The Bean TaskScheduler is registered during spring initialization, and we can see that its implementation is ThreadPoolTaskScheduler. Some people say in online information ThreadPoolTaskScheduler is the default implementation class of TaskScheduler. In fact, it is not. We still need to specify it. In this way, when we want to replace the implementation, we only need to modify the configuration class, which is very flexible.

Why is it said to be a more elegant implementation, because its core is also implemented through ScheduledThreadPoolExecutor

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}

Three multi-node task execution issues

This time During the implementation process, I did not choose xxl-job for implementation, but used TaskScheduler for implementation. This also caused a problem. xxl-job is a distributed program scheduling system, which is used when applications want to execute scheduled tasks. When xxl-job is used, no matter how many nodes are deployed in the application, xxl-job will only select one of the nodes as the node for scheduled task execution, so that scheduled tasks will not be executed simultaneously on different nodes, causing repeated execution problems. Instead, use To implement TaskScheduler, we must consider the issue of repeated execution on multiple nodes. Of course, since there is a problem, there is a solution

· The first option is to separate the scheduled task function and deploy it separately, and only deploy one node. The second option uses redis setNx to ensure that only one task is running at the same time. Execution

I chose the second option to execute. Of course, there are some ways to ensure that it is not executed repeatedly. I won’t go into details here. Here is my implementation

public void executeTask(Long taskId) {
    if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
        log.info("已有执行中定时发送短信任务,本次不执行!");
        return;
    }

The above is the detailed content of How to implement SpringBoot scheduled task function. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete