>  기사  >  Java  >  SpringBoot 예약 작업 기능 구현 방법

SpringBoot 예약 작업 기능 구현 방법

WBOY
WBOY앞으로
2023-05-10 16:16:131371검색

Background

프로젝트에는 예약된 작업을 동적으로 추가할 수 있는 기능이 필요합니다. 프로젝트는 현재 xxl-job 예약된 작업 예약 시스템을 사용하고 있지만 xxl-job 기능을 어느 정도 이해한 후에 xxl-job이 매우 유용하다는 것을 알았습니다. . 프로젝트에 예약된 작업을 동적으로 추가하고 예약된 작업을 동적으로 삭제하는 기능이 그다지 좋지 않아서 예약된 작업의 기능을 직접 구현해야 합니다

두 번째 동적 예약된 작업 스케줄링

1 기술 선택

Timer code> 또는 <code>ScheduledExecutorServiceTimer or ScheduledExecutorService

这两个都能实现定时任务调度,先看下Timer的定时任务调度

  public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次
timer.schedule(timerTask,1000L,2000L);

在看下ScheduledThreadPoolExecutor的实现

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);

两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

从建议上来看,是一定要选择ScheduledExecutorService了,我们看看源码看看为什么Timer出现问题会终止执行

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * 每一件一个任务都是一个quene
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // 清除所有任务信息
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn&#39;t yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}

我们看到,执行了 mainLoop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。

这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而ScheduledThreadPoolExecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。

2 使用ScheduledThreadPoolExecutor

从上面看,使用ScheduledThreadPoolExecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 TaskScheduler来实现

@Component
public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }
    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }
    private void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }
            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }
    public void removeCronTask(Runnable task) {
        Set<Runnable> runnables = this.scheduledTasks.keySet();
        Iterator it1 = runnables.iterator();
        while (it1.hasNext()) {
            SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
            Long taskId = schedulingRunnable.getTaskId();
            SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
            if (taskId.equals(cancelRunnable.getTaskId())) {
                ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
                if (scheduledTask != null){
                    scheduledTask.cancel();
                }
            }
        }
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }
        this.scheduledTasks.clear();
    }
}

TaskScheduler

둘 다 예약된 작업 예약을 구현할 수 있습니다. 먼저 Timer의 예약된 작업 예약을 살펴보겠습니다.

public interface TaskScheduler {
   /**
    * Schedule the given {@link Runnable}, invoking it whenever the trigger
    * indicates a next execution time.
    * <p>Execution will end once the scheduler shuts down or the returned
    * {@link ScheduledFuture} gets cancelled.
    * @param task the Runnable to execute whenever the trigger fires
    * @param trigger an implementation of the {@link Trigger} interface,
    * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
    * wrapping a cron expression
    * @return a {@link ScheduledFuture} representing pending completion of the task,
    * or {@code null} if the given Trigger object never fires (i.e. returns
    * {@code null} from {@link Trigger#nextExecutionTime})
    * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
    * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
    * @see org.springframework.scheduling.support.CronTrigger
    */
   @Nullable
   ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

그런 다음 ScheduledThreadPoolExecutor

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定时任务执行线程池核心线程数
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}
의 구현을 살펴보겠습니다.

둘 다 예약된 작업을 구현할 수 있습니다. 차이점은 무엇입니까? Alibaba p3c를 사용하면 제안과 차이점이 제공됩니다.

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}

제안 관점에서 ScheduledExecutorService를 선택해야 합니다. 왜 타이머문제가 발생하면 실행이 종료되는지 알아보기

public void executeTask(Long taskId) {
    if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
        log.info("已有执行中定时发送短信任务,本次不执行!");
        return;
    }

새 개체를 생성하면 스레드가 시작되는 것을 볼 수 있는데 이 스레드는 무엇을 하고 있는 걸까요? 살펴보겠습니다

rrreee

mainLoop()가 실행되고 그 안에는 시간과 현재 시간을 얻기 위해 무한 반복하는 while(true) 메소드가 있는 것을 볼 수 있습니다 프로그램 내 태스크 객체에 시간을 비교하여 동일하면 실행되지만, 오류가 보고되면 최종적으로 태스크 정보를 모두 입력하고 삭제합니다.

이제 답을 찾았습니다. 타이머가 인스턴스화되면 중단 없는 루프 매칭으로 작업을 수행합니다. 오류가 보고되면 스레드가 종료됩니다. 후속 작업의 경우 ScheduledThreadPoolExecutor는 여러 스레드에 의해 실행됩니다. 작업 중 하나가 오류를 보고하더라도 다른 스레드의 실행에는 영향을 미치지 않습니다.

🎜2 ScheduledThreadPoolExecutor 사용🎜🎜🎜위에서 ScheduledThreadPoolExecutor를 사용하는 것은 비교적 간단하지만, 좀 더 우아하게 구현하고 싶어서 TaskScheduler를 선택하여 구현합니다🎜 rrreee 🎜TaskScheduler는 이 함수 구현의 핵심 클래스이지만 인터페이스입니다. 🎜rrreee🎜 이전 코드에서 볼 수 있듯이 이 클래스를 클래스에 주입했지만 인터페이스입니다. 예전에는 어떤 구현 클래스인지 어떻게 알 수 있나요? 예전에는 이런 일이 발생했을 때 구현 클래스를 실행하기 위해 클래스에 @Primany나 @Quality를 추가해야 했는데, 주입에 표시가 없는 걸 봤습니다. 다른 방식으로 구현됨🎜rrreee 🎜Bean TaskScheduler는 Spring이 초기화될 때 등록되며, 그 구현이 ThreadPoolTaskScheduler임을 알 수 있습니다. 온라인 정보에서는 ThreadPoolTaskScheduler가 TaskScheduler의 기본 구현 클래스라고 말하는 사람들도 있습니다. 우리는 여전히 이를 지정해야 하며, 이러한 방식으로 구현을 교체하려면 구성 클래스만 수정하면 되며 이는 매우 유연합니다. 🎜🎜왜 더 우아한 구현이라고 할까요? 그 핵심도 ScheduledThreadPoolExecutor를 통해 구현되기 때문입니다🎜rrreee🎜세 가지 다중 노드 작업 실행 문제🎜🎜이 구현 과정에서는 xxl-job을 선택하지 않고 구현했습니다. TaskScheduler를 사용하여 이를 구현하는 경우에도 xxl-job은 분산 프로그램 스케줄링 시스템입니다. job은 예약된 작업 실행을 위한 노드로 노드 중 하나만 선택하므로 예약된 작업이 다른 노드에서 동시에 실행되지 않으므로 TaskScheduler를 사용하여 구현할 때 반복되는 문제를 고려해야 합니다. 다중 노드 실행. 물론 문제가 있으니 해결책도 있습니다🎜🎜· 첫 번째 옵션은 예약된 작업 기능을 분리하여 별도로 배포하고, 두 번째 옵션은 redis setNx를 사용하여 하나의 작업만 보장하는 것입니다. 구현하기 위해 두 번째 옵션을 선택했습니다. 물론 반복되지 않도록 하는 몇 가지 방법도 있습니다. 여기서는 구현에 대해 자세히 설명하지 않겠습니다.

위 내용은 SpringBoot 예약 작업 기능 구현 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제