Heim  >  Artikel  >  Java  >  So implementieren Sie die geplante Aufgabenfunktion von SpringBoot

So implementieren Sie die geplante Aufgabenfunktion von SpringBoot

WBOY
WBOYnach vorne
2023-05-10 16:16:131333Durchsuche

一Hintergrund

Das Projekt benötigt eine Funktion, die geplante Aufgaben dynamisch hinzufügen kann. Derzeit verwendet das Projekt das xxl-job-Planungssystem für geplante Aufgaben, aber nach einigen Tests von xxl-job Nach dem Verständnis Bei der Funktion habe ich festgestellt, dass die Unterstützung von xxl-job für das dynamische Hinzufügen geplanter Aufgaben und das dynamische Löschen geplanter Aufgaben zum Projekt nicht so gut ist, daher muss ich die Funktion einer geplanten Aufgabe manuell implementieren

2. Dynamische geplante Aufgabenplanung

1 Technologieauswahl

Timer oder ScheduledExecutorService#🎜 🎜##🎜 🎜#Beide können die geplante Aufgabenplanung implementieren. Schauen wir uns zunächst die geplante Aufgabenplanung von Timer an #Beide können das Timing implementieren. Was ist mit ihren Unterschieden? Wenn Sie Alibaba p3c verwenden, erhalten Sie Vorschläge und Unterschiede. Aus der Sicht der Vorschläge müssen Sie ScheduledExecutorService wählen Schauen Sie sich den Quellcode an. Sehen Sie, warum Timer die Ausführung beendet, wenn ein Problem auftritt. Timer or ScheduledExecutorService

这两个都能实现定时任务调度,先看下Timer的定时任务调度

  public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次
timer.schedule(timerTask,1000L,2000L);

在看下ScheduledThreadPoolExecutor的实现

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);

两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

从建议上来看,是一定要选择ScheduledExecutorService了,我们看看源码看看为什么Timer出现问题会终止执行

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * 每一件一个任务都是一个quene
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // 清除所有任务信息
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn&#39;t yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}

我们看到,执行了 mainLoop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。

这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而ScheduledThreadPoolExecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。

2 使用ScheduledThreadPoolExecutor

从上面看,使用ScheduledThreadPoolExecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 TaskScheduler来实现

@Component
public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }
    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }
    private void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }
            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }
    public void removeCronTask(Runnable task) {
        Set<Runnable> runnables = this.scheduledTasks.keySet();
        Iterator it1 = runnables.iterator();
        while (it1.hasNext()) {
            SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
            Long taskId = schedulingRunnable.getTaskId();
            SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
            if (taskId.equals(cancelRunnable.getTaskId())) {
                ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
                if (scheduledTask != null){
                    scheduledTask.cancel();
                }
            }
        }
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }
        this.scheduledTasks.clear();
    }
}

TaskScheduler

public interface TaskScheduler {
   /**
    * Schedule the given {@link Runnable}, invoking it whenever the trigger
    * indicates a next execution time.
    * <p>Execution will end once the scheduler shuts down or the returned
    * {@link ScheduledFuture} gets cancelled.
    * @param task the Runnable to execute whenever the trigger fires
    * @param trigger an implementation of the {@link Trigger} interface,
    * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
    * wrapping a cron expression
    * @return a {@link ScheduledFuture} representing pending completion of the task,
    * or {@code null} if the given Trigger object never fires (i.e. returns
    * {@code null} from {@link Trigger#nextExecutionTime})
    * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
    * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
    * @see org.springframework.scheduling.support.CronTrigger
    */
   @Nullable
   ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

Wenn wir ein neues Objekt erstellen, sehen wir, dass ein Thread gestartet wird. Also, was macht dieser Thread? Werfen wir einen Blick darauf

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定时任务执行线程池核心线程数
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}

Wir sehen, dass mainLoop() ausgeführt wird, und darin befindet sich die Methode while (true), die eine Endlosschleife durchführt Erhalten Sie die Aufgaben im Programm. Wenn die Zeit im Objekt mit der aktuellen Zeit verglichen wird, wird es ausgeführt, wenn sie gleich sind. Sobald jedoch ein Fehler gemeldet wird, werden alle Aufgabeninformationen endgültig eingegeben und gelöscht.

Zu diesem Zeitpunkt haben wir die Antwort gefunden. Nachdem der Timer instanziiert wurde, führt er Aufgaben in einer ununterbrochenen Schleife aus. Sobald ein Fehler gemeldet wird wird beendet, sodass nachfolgende Aufgaben nicht ausgeführt werden und ScheduledThreadPoolExecutor von mehreren Threads ausgeführt wird. Selbst wenn eine der Aufgaben einen Fehler meldet, hat dies keine Auswirkungen auf die Ausführung anderer Threads.

2 ScheduledThreadPoolExecutor verwenden

Aus dem Obigen geht hervor, dass die Verwendung von ScheduledThreadPoolExecutor relativ einfach ist, aber was wir erreichen müssen Es ist eleganter, daher habe ich TaskScheduler zur Implementierung ausgewählt

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}

TaskScheduler ist die Kernklasse dieser Funktionsimplementierung, aber es ist eine Schnittstelle# 🎜🎜 #
public void executeTask(Long taskId) {
    if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
        log.info("已有执行中定时发送短信任务,本次不执行!");
        return;
    }

Sie können dem vorherigen Code entnehmen, dass wir diese Klasse in die Klasse eingefügt haben, aber woher wissen wir, um welche Implementierungsklasse es sich in der Vergangenheit handelte? Ich möchte der Klasse @Primany oder @ hinzufügen, um die implementierte Klasse auszuführen, aber wir sehen, dass auf meiner Injektion keine Markierung vorhanden ist, da sie auf andere Weise implementiert ist

rrreee#🎜🎜#Der Bean TaskScheduler ist registriert Während der Frühjahrsinitialisierung können wir sehen, dass die Implementierung ThreadPoolTaskScheduler die Standardimplementierungsklasse von TaskScheduler ist. Wenn wir die Implementierung ersetzen möchten, müssen wir nur die Konfigurationsklasse ändern. Ja, sehr flexibel. #🎜🎜##🎜🎜#Warum soll es sich um eine elegantere Implementierung handeln, weil ihr Kern auch über ScheduledThreadPoolExecutor implementiert wird? #In diesem Implementierungsprozess habe ich nicht xxl-job für die Implementierung ausgewählt, sondern TaskScheduler verwendet. Dies verursachte auch ein Problem, wenn eine Anwendung geplante Aufgaben ausführt xxl-job: Unabhängig davon, wie viele Knoten in der Anwendung bereitgestellt werden, wählt xxl-job nur einen der Knoten als Knoten für die Ausführung geplanter Aufgaben aus, sodass geplante Aufgaben nicht gleichzeitig auf verschiedenen Knoten ausgeführt werden, was zu Wiederholungen führt Wenn wir TaskScheduler verwenden, um es zu implementieren, müssen wir das Problem der wiederholten Ausführung mehrerer Knoten berücksichtigen. Da es ein Problem gibt, gibt es natürlich eine Lösung #🎜🎜##🎜🎜#· Die erste Option besteht darin, die geplante Aufgabenfunktion zu trennen und sie separat bereitzustellen und nur einen Knoten bereitzustellen setNx-Formular, um sicherzustellen, dass nur ein Knoten gleichzeitig ausgeführt wird. #🎜🎜##🎜🎜# Ich habe die zweite Option gewählt, um sie auszuführen. Natürlich gibt es einige Möglichkeiten, um sicherzustellen, dass sie nicht ausgeführt wird Ich werde hier nicht auf Details eingehen. Hier ist meine Implementierung #🎜🎜#rrreee

Das obige ist der detaillierte Inhalt vonSo implementieren Sie die geplante Aufgabenfunktion von SpringBoot. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen