Maison  >  Article  >  Java  >  Comment implémenter la fonction de tâche planifiée SpringBoot

Comment implémenter la fonction de tâche planifiée SpringBoot

WBOY
WBOYavant
2023-05-10 16:16:131334parcourir

Contexte

Le projet a besoin d'une fonction capable d'ajouter dynamiquement des tâches planifiées. Le projet utilise actuellement le système de planification de tâches planifiées xxl-job. Cependant, après une certaine compréhension de la fonction xxl-job, j'ai trouvé que xxl-job est très utile. . La prise en charge de l'ajout dynamique de tâches planifiées au projet et de la suppression dynamique de tâches planifiées n'est pas très bonne, vous devez donc implémenter manuellement la fonction d'une tâche planifiée par vous-même

Deuxième planification dynamique des tâches planifiées

1 Sélection technologique

Timer code> ou <code>ScheduledExecutorServiceTimer or ScheduledExecutorService

这两个都能实现定时任务调度,先看下Timer的定时任务调度

  public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次
timer.schedule(timerTask,1000L,2000L);

在看下ScheduledThreadPoolExecutor的实现

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);

两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

从建议上来看,是一定要选择ScheduledExecutorService了,我们看看源码看看为什么Timer出现问题会终止执行

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * 每一件一个任务都是一个quene
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // 清除所有任务信息
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn&#39;t yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}

我们看到,执行了 mainLoop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。

这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而ScheduledThreadPoolExecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。

2 使用ScheduledThreadPoolExecutor

从上面看,使用ScheduledThreadPoolExecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 TaskScheduler来实现

@Component
public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }
    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }
    private void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }
            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }
    public void removeCronTask(Runnable task) {
        Set<Runnable> runnables = this.scheduledTasks.keySet();
        Iterator it1 = runnables.iterator();
        while (it1.hasNext()) {
            SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
            Long taskId = schedulingRunnable.getTaskId();
            SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
            if (taskId.equals(cancelRunnable.getTaskId())) {
                ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
                if (scheduledTask != null){
                    scheduledTask.cancel();
                }
            }
        }
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }
        this.scheduledTasks.clear();
    }
}

TaskScheduler

Les deux peuvent implémenter la planification des tâches planifiées. Examinons d'abord la planification des tâches planifiées de Timer

public interface TaskScheduler {
   /**
    * Schedule the given {@link Runnable}, invoking it whenever the trigger
    * indicates a next execution time.
    * <p>Execution will end once the scheduler shuts down or the returned
    * {@link ScheduledFuture} gets cancelled.
    * @param task the Runnable to execute whenever the trigger fires
    * @param trigger an implementation of the {@link Trigger} interface,
    * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
    * wrapping a cron expression
    * @return a {@link ScheduledFuture} representing pending completion of the task,
    * or {@code null} if the given Trigger object never fires (i.e. returns
    * {@code null} from {@link Trigger#nextExecutionTime})
    * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
    * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
    * @see org.springframework.scheduling.support.CronTrigger
    */
   @Nullable
   ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

Ensuite, examinons l'implémentation de ScheduledThreadPoolExecutor

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定时任务执行线程池核心线程数
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}
.

Les deux peuvent implémenter des tâches planifiées, qu'en est-il de leurs différences ? L'utilisation d'Alibaba p3c donnera des suggestions et des différences

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}

Du point de vue de la suggestion, vous devez choisir ScheduledExecutorService. pour voir pourquoi TimerL'exécution sera terminée si un problème survient<h3><pre class="brush:java;">public void executeTask(Long taskId) { if (!redisService.setIfAbsent(String.valueOf(taskId),&quot;1&quot;,2L, TimeUnit.SECONDS)) { log.info(&quot;已有执行中定时发送短信任务,本次不执行!&quot;); return; }</pre></h3>Lors de la création d'un nouvel objet, on voit qu'un thread est démarré, alors que fait ce thread ? Jetons un coup d'oeil<p>rrreee</p>Nous voyons que <code>mainLoop() est exécuté, et à l'intérieur se trouve la méthode while (true) qui boucle à l'infini pour obtenir l'heure et l'heure actuelle dans l'objet tâche du programme Si l'heure est comparée, elle sera exécutée si elle est la même. Cependant, une fois qu'une erreur est signalée, elle entrera définitivement et effacera toutes les informations de la tâche.

À l'heure actuelle, nous avons trouvé la réponse. Une fois le minuteur instancié, il démarre un thread et exécute des tâches dans une boucle ininterrompue. Une fois qu'une erreur est signalée, le thread est terminé, il le fera donc. ne sera pas exécuté. Pour les tâches suivantes, ScheduledThreadPoolExecutor est exécuté par plusieurs threads. Même si l'une des tâches signale une erreur, cela n'affectera pas l'exécution des autres threads.

🎜2 Utiliser ScheduledThreadPoolExecutor🎜🎜🎜De ce qui précède, utiliser ScheduledThreadPoolExecutor est relativement simple, mais nous voulons l'implémenter de manière plus élégante, nous choisissons donc TaskScheduler pour l'implémenter🎜 rrreee 🎜TaskScheduler est la classe principale pour cette implémentation de fonction, mais c'est une interface 🎜rrreee🎜Comme vous pouvez le voir dans le code précédent, nous avons injecté cette classe dans la classe, mais c'est une interface. Comment savoir de quelle classe d'implémentation il s'agit ? Dans le passé, lorsque cela se produisait, je devais ajouter @Primany ou @Quality à la classe pour exécuter la classe d'implémentation, mais nous avons vu qu'il n'y avait aucune marque sur mon injection car c'était le cas. implémenté d'une autre manière🎜rrreee 🎜Bean TaskScheduler est enregistré lors de l'initialisation de Spring, et nous pouvons voir que son implémentation est ThreadPoolTaskScheduler. Dans les informations en ligne, certaines personnes disent que ThreadPoolTaskScheduler est la classe d'implémentation par défaut de TaskScheduler. Il reste encore à le spécifier, et de cette façon, lorsque l'on veut remplacer l'implémentation, il suffit de modifier la classe de configuration, qui est très flexible. 🎜🎜Pourquoi est-il considéré comme une implémentation plus élégante ? Parce que son noyau est également implémenté via ScheduledThreadPoolExecutor🎜rrreee🎜Trois problèmes d'exécution de tâches multi-nœuds🎜🎜Dans ce processus d'implémentation, je n'ai pas choisi xxl-job Au lieu de l'implémenter, TaskScheduler est utilisé. Cela crée également un problème. xxl-job est un système de planification de programmes distribués. Lorsqu'une application qui souhaite effectuer des tâches planifiées utilise xxl-job, quel que soit le nombre de tâches déployées dans le nœud d'application, xxl-job le fera uniquement. sélectionnez l'un des nœuds comme nœud pour l'exécution des tâches planifiées, afin que les tâches planifiées ne soient pas exécutées simultanément sur différents nœuds, ce qui entraînerait des problèmes d'exécution répétés. Lorsque vous utilisez TaskScheduler pour l'implémenter, vous devez considérer le problème de l'exécution répétée de plusieurs nœuds. . Bien sûr, puisqu'il y a un problème, il existe une solution🎜🎜· La première option consiste à séparer la fonction de tâche planifiée et à la déployer séparément, et à ne déployer qu'un seul nœud. La deuxième option consiste à utiliser redis setNx pour garantir que seul une tâche est exécutée en même temps🎜🎜moi, j'ai choisi la deuxième option à mettre en œuvre. Bien sûr, il existe également des moyens de garantir qu'elle ne se répète pas, je n'entrerai pas dans les détails ici.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer