Il existe deux manières de gérer dynamiquement les tâches planifiées :
Méthode 1 : Configuration du front-end Web Déclencheur (associé à Cron), la classe ThreadPoolTaskScheduler crée le mode Planificateur pour gérer dynamiquement les tâches de planification planifiées
Méthode 2 : Basée sur la gestion dynamique de la tâche de planification Schedule créée, c'est-à-dire déclarer la planification Schedule avec l'annotation @Scheduled de la classe du composant, et l'initialiser une fois avant de démarrer le programme, comme par exemple :
@Component public class TestTask { private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Scheduled(cron = "0/2 * * * * ?") public void robReceiveExpireTask() { System.out.println(df.format(LocalDateTime.now()) + "测试测试"); } }
Défaut : actuellement, ce n'est pas possible pour ajouter une planification et un arrêt, un démarrage, une réinitialisation et d'autres fonctions de gestion pendant l'exécution.
L'architecture est SpringBoot + Spring + mybatis-plus
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>merak-hyper-automation-boot</artifactId> <groupId>com.merak.automation</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>automation-quartz</artifactId> <packaging>jar</packaging> <repositories> <repository> <id>aliyun</id> <name>aliyun Repository</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <dependencies> <!-- Spring框架基本的核心工具 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <!-- SpringWeb模块 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> </dependency> <!-- mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- druid数据连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>jakarta.validation</groupId> <artifactId>jakarta.validation-api</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> <!--引入quartz定时框架--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> <version>2.2.5.RELEASE</version> </dependency> </dependencies> <build> <plugins> <!-- 打包跳过测试 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Fichier/application.yml dans. le répertoire des ressources :
spring:
profiles:
active: File/application-dev.yml dans le répertoire dev
resources:
server:
port: 12105
servlet:
context-path: /automation -quartzgestion:
points de terminaison:
web:
exposition:
include: '*'# Spring configuration
spring:
ressources:
emplacements statiques: classpath:/static/,classpath:/templates/
mvc :
throw- exception-if-no-handler-found : true
static-path-pattern : /**
application :
nom : automation-workflow
main:
Allow-bean-definition-overriding : true
# Fichier upload
servlet :
multipart :
#Taille de fichier unique
taille maximale du fichier : 2000 Mo
#Définir la taille totale du fichier téléchargé, taille maximale de la demande : 4000 Mo
#Conversion unifiée de l'horodatage Json
jackson :
format de date : aaaa -MM -dd HH:mm:ss
fuseau horaire : GMT+8
aop:
proxy-target-class : true
configuration automatique :
exclure : com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
source de données :
dynamique :
druide:
# Paramètres globaux du druide, la plupart des valeurs sont cohérentes avec la valeur par défaut. (Les paramètres actuellement pris en charge sont les suivants, veuillez ne pas les définir au hasard si vous ne connaissez pas leur signification) ‐ ‐ ‐ , .时间
maxWait : 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis : 60000
#单位是毫秒
minEvictableIdleTimeMillis : 300000
' s ' s ' s ' s ‐ ' s ' ‐ ‐ ‐ utiliser en utilisant ' ' s ' s ‐ ‐ ‐ testOnBorrow : false
‐ poolPreparedStatements : true
maxPoolPreparedStatementPerConnectionSize : 20
# Configurer les filtres pour la surveillance et l'interception des statistiques Après avoir supprimé l'interface de surveillance, SQL ne peut pas être compté
mot de passe : root
nom de classe de pilote : com.mysql.jdbc.Driver#mybatis plus 设置
mybatis-plus:
mapper-locations: classpath*:com/merak/hyper/automation/persist/**/xml/*Mapper.xml
global-config:
# 关闭MP3.0自带的banner
banner: false
db-config:
id-type: ID_WORKER_STR
# 默认数据库表下划线命名
table-underline: true
configuration:
log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logging:
level:
com.merar.hyper: debug
com.merak.hyper.automation.persist.**.mapper: debug
org.springframework: warn
启动MerakQuartzApplication类
package com.merak.hyper.automation; import org.mybatis.spring.annotation.MapperScan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author chenjun * @version 1.0 * @ClassName: MerakQuartzApplication * @description: 工单任务调度 * @date 2022/9/22 10:30 */ @EnableScheduling @EnableAsync @MapperScan(basePackages = {"com.merak.hyper.automation.persist.**.mapper"}) @SpringBootApplication(scanBasePackages = {"com.merak.hyper.automation.**"}, exclude = {SecurityAutoConfiguration.class}) public class MerakQuartzApplication { public static final Logger log = LoggerFactory.getLogger(MerakQuartzApplication.class); public static void main(String[] args) { SpringApplication.run(MerakQuartzApplication.class, args); } private int taskSchedulerCorePoolSize = 15; private int awaitTerminationSeconds = 60; private String threadNamePrefix = "taskExecutor-"; /** * @description: 实例化ThreadPoolTaskScheduler对象,用于创建ScheduledFuture<?> scheduledFuture */ @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(taskSchedulerCorePoolSize); taskScheduler.setThreadNamePrefix(threadNamePrefix); taskScheduler.setWaitForTasksToCompleteOnShutdown(false); taskScheduler.setAwaitTerminationSeconds(awaitTerminationSeconds); /**需要实例化线程*/ taskScheduler.initialize(); // isinitialized = true; log.info("初始化ThreadPoolTaskScheduler ThreadNamePrefix=" + threadNamePrefix + ",PoolSize=" + taskSchedulerCorePoolSize + ",awaitTerminationSeconds=" + awaitTerminationSeconds); return taskScheduler; } /** * @description: 实例化ThreadPoolTaskExecutor对象,管理asyncTask启动的线程,应用类为 ScheduledHelper */ @Bean("asyncTaskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("asyncTaskExecutor-"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
一、启动时项目启动时,加载任务关联的触发器,并全量执行流程。
initLineRunner类:
package com.merak.hyper.automation.Scheduling; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.merak.hyper.automation.persist.entity.AutoTriggerInfo; import com.merak.hyper.automation.persist.entity.BusWorkflow; import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService; import com.merak.hyper.automation.persist.service.IBusWorkflowService; import com.merak.hyper.automation.util.CommonUtil; import com.merak.hyper.automation.util.ScheduleUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Iterator; import java.util.List; import java.util.Map; /** * 项目启动时,加载数字员工关联的触发器,并全量执行 * @Date: 2020/12/25:16:00 **/ @Component @Order(1) public class initLineRunner implements CommandLineRunner { public static final Logger log = LoggerFactory.getLogger(initLineRunner.class); private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private TaskService taskService; @Autowired private IAutoTriggerInfoService triggerInfoService; @Autowired private IBusWorkflowService workflowService; @Override public void run(String... args) { log.info("项目启动:加载数字员工关联的触发器信息并全量执行," + df.format(LocalDateTime.now())); QueryWrapper<BusWorkflow> wrapper = new QueryWrapper<>(); wrapper.eq("wf_type", "3");//3:云托管 wrapper.eq("wf_state", "1"); List<BusWorkflow> busWorkflows = workflowService.list(wrapper); List<AutoTriggerInfo> triggerInfos = triggerInfoService.list(); if( 0 == busWorkflows.size() || 0 == triggerInfos.size() ){ log.info("数字员工关联的触发器信息不正确,员工记录数:"+busWorkflows.size()+",触发器记录数:"+triggerInfos.size()); } else{ //数字员工关联的触发器信息 Map<String,AutoTriggerInfo> loadWfidAndTriggerInfo = CommonUtil.loadWfidAndTriggerInfo(busWorkflows,triggerInfos); Iterator<Map.Entry<String, AutoTriggerInfo>> entries = loadWfidAndTriggerInfo.entrySet().iterator(); while (entries.hasNext()) { Map.Entry<String, AutoTriggerInfo> entry = entries.next(); String wfId = entry.getKey(); BusWorkflow workflow = busWorkflows.stream().filter( t -> wfId.equals(t.getWfId()) ).findAny().orElse(null); if( null != workflow ){ ScheduleUtil.start(new ScheduleTask(wfId,String.valueOf(workflow.getWfCreateuserId()),taskService), entry.getValue()); } } log.info("数字员工关联的触发器信息全量执行完成,数字员工定时个数:"+loadWfidAndTriggerInfo.size()+","+df.format(LocalDateTime.now())); } } } 核心代码: ```java ScheduleUtil.start(new ScheduleTask(wfId,String.valueOf(workflow.getWfCreateuserId()),taskService), entry.getValue());
Scheduler管理工具类:启动、取消、修改等管理
package com.merak.hyper.automation.util; import com.merak.hyper.automation.Scheduling.ScheduleTask; import com.merak.hyper.automation.persist.entity.AutoTriggerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledFuture; /** * @version 1.0 * @ClassName: ScheduleUtil * @description: Scheduler管理工具类:启动、取消、修改等管理 */ public class ScheduleUtil { public static final Logger log = LoggerFactory.getLogger(ScheduleUtil.class); private static ThreadPoolTaskScheduler threadPoolTaskScheduler = SpringContextUtils.getBean(ThreadPoolTaskScheduler.class); //存储[数字员工wfI,dScheduledFuture]集合 private static Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>(); /** * 启动 * * @param scheduleTask 定时任务 * @param triggerInfo */ public static boolean start(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) { String wfId = scheduleTask.getId(); log.info("启动数字员工"+wfId+"定时任务线程" + scheduleTask.getId()); ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(triggerInfo.getLogicConfig())); scheduledFutureMap.put(wfId, scheduledFuture); return true; } /** * 取消 * * @param scheduleTask 定时任务 */ public static boolean cancel(ScheduleTask scheduleTask) { log.info("关闭定时任务线程 taskId " + scheduleTask.getId()); ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(scheduleTask.getId()); if (scheduledFuture != null && !scheduledFuture.isCancelled()) { scheduledFuture.cancel(false); } scheduledFutureMap.remove(scheduleTask.getId()); return true; } /** * 修改 * * @param scheduleTask 定时任务 * @param triggerInfo */ public static boolean reset(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) { //先取消定时任务 cancel(scheduleTask); //然后启动新的定时任务 start(scheduleTask, triggerInfo); return true; } }
ScheduleTask类:ScheduleTask任务类
package com.merak.hyper.automation.Scheduling; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @version 1.0 * @ClassName: ScheduleTask * @description: ScheduleTask,关联任务id、用户id和具体执行的TaskService类,实现Runnable类 */ public class ScheduleTask implements Runnable { private static final int TIMEOUT = 30000; private String id; private String userId; private TaskService service; public static final Logger log = LoggerFactory.getLogger(ScheduleTask.class); public String getId() { return id; } /** * @param id 任务ID * @param service 业务类 */ public ScheduleTask(String id, String userId, TaskService service) { this.id = id; this.userId = userId; this.service = service; } @Override public void run() { log.info("ScheduleTask-执行数字员工消息的发送,id:"+ this.id + ",用户id:"+userId); service.work(this.id,this.userId); } }
/** * @version 1.0 * @ClassName: TaskService * @description: TaskService */ public interface TaskService { /** * 业务处理方法 * @param keyword 关键参数 * @param userId */ void work(String keyword,String userId); } /** * @description: TaskService实现类,具体执行定时调度的业务 */ @Service public class TaskServiceImpl implements TaskService { public static final Logger log = LoggerFactory.getLogger(TaskServiceImpl.class); @Autowired private IAutoDeviceInfoService deviceInfoService; @Override public void work(String wfId,String userId) { try { log.info("定时任务:根据数字员工wfId"+ wfId +",用户id:"+userId+",发送消息..."); //sendRobotMsg(wfId,userId); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } }
二、通过WEB配置的变更,动态管理定时任务
ScheduledController类:scheduled Web业务层:启动、取消、修改等管理schedule
调度任务信息变更(如1:Trigger Cron变更 2:任务停止 3:任务新增加等)
package com.merak.hyper.automation.controller; import com.merak.hyper.automation.common.core.domain.AjaxResult; import com.merak.hyper.automation.common.core.vo.ScheduledApiVo; import com.merak.hyper.automation.persist.entity.AutoTriggerInfo; import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService; import com.merak.hyper.automation.util.ScheduledHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * @version 1.0 * @ClassName: ScheduledController * @description: scheduled Web业务层:启动、取消、修改等管理schedule */ @RestController @RequestMapping("/api/scheduled") public class ScheduledController { public static final Logger log = LoggerFactory.getLogger(ScheduledController.class); @Autowired private IAutoTriggerInfoService triggerInfoService; @Autowired private ScheduledHelper scheduledHelper; @PostMapping("/add") public AjaxResult addScheduleds(@RequestBody ScheduledApiVo scheduledApiVo){ AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId()); scheduledHelper.addScheduleds(scheduledApiVo,autoTriggerInfo); return AjaxResult.success(); } @PostMapping("/reset") public AjaxResult resetScheduleds(@RequestBody ScheduledApiVo scheduledApiVo){ AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId()); scheduledHelper.resetScheduleds(scheduledApiVo,autoTriggerInfo); return AjaxResult.success(); } @PostMapping("/stop") public AjaxResult stopScheduleds(@RequestBody ScheduledApiVo scheduledApiVo){ AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId()); scheduledHelper.stopScheduleds(scheduledApiVo); return AjaxResult.success(); } } ScheduledHelper类:对外提供ScheduledHelper管理:创建、变更、停止 ```java package com.merak.hyper.automation.util; import com.merak.hyper.automation.Scheduling.ScheduleTask; import com.merak.hyper.automation.Scheduling.TaskService; import com.merak.hyper.automation.common.core.vo.ScheduledApiVo; import com.merak.hyper.automation.persist.entity.AutoTriggerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * @version 1.0 * @ClassName: ScheduledHelper * @description:对外提供ScheduledHelper管理:创建、变更、停止 */ @Component public class ScheduledHelper { public static final Logger log = LoggerFactory.getLogger(ScheduledHelper.class); /** * @description: 对外(Web)提供异步的Scheduleds增加操作 */ @Async("asyncTaskExecutor") public void addScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) { //addSchedule任务 log.warn("创建原数字员工["+scheduledApiVo.getWfId()+"],同步启动Schedule任务"); TaskService taskService = SpringContextUtils.getBean(TaskService.class); ScheduleUtil.start(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo); } @Async("asyncTaskExecutor") public void resetScheduleds(ScheduledApiVo scheduledApiVo,AutoTriggerInfo triggerInfo) { //cron值改变,变更Schedule任务 log.warn("数字员工["+scheduledApiVo.getWfId()+"]关联的触发器信息cron值改变,变更Schedule任务"); TaskService taskService = SpringContextUtils.getBean(TaskService.class); ScheduleUtil.reset(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo); } @Async("asyncTaskExecutor") public void stopScheduleds(ScheduledApiVo scheduledApiVo) { //移除Wfid,停止原Schedule任务 log.warn("原数字员工["+scheduledApiVo.getWfId()+"]无效,同步停止Schedule任务"); TaskService taskService = SpringContextUtils.getBean(TaskService.class); ScheduleUtil.cancel(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService)); } }
SpringContextUtils类:
package com.merak.hyper.automation.util; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @version 1.0 * @ClassName: SpringContextUtils * @description: 加载Class对象 */ @Component public class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtils.applicationContext = applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } public static <T> T getBean(Class<T> requiredType) { return applicationContext.getBean(requiredType); } public static <T> T getBean(String name, Class<T> requiredType) { return applicationContext.getBean(name, requiredType); } public static boolean containsBean(String name) { return applicationContext.containsBean(name); } public static boolean isSingleton(String name) { return applicationContext.isSingleton(name); } public static Class<? extends Object> getType(String name) { return applicationContext.getType(name); } }
ScheduledApiVo类:
import java.io.Serializable; /** * @version 1.0 * @ClassName: ScheduledApiVo * @description: scheduled Web业务层Api传递参数Vo类 */ public class ScheduledApiVo implements Serializable { private String wfId; private String userId; private String triggerId; //set get 略 }
最终:Web端通过发送Http请求 ,调用ScheduledHelper管理类接口,实现Scheduled创建、变更、停止操作
log.info("3:云托管更新启动数字员工操作"); ScheduledApiVo scheduledApiVo = new ScheduledApiVo(); scheduledApiVo.setWfId(wfId); scheduledApiVo.setUserId(String.valueOf(updateUserId)); scheduledApiVo.setTriggerId(newTriggerInfo.getId()); String webHookBody = JSON.toJSONString(scheduledApiVo); EmsApiUtil.SendQuartzMessage(url, "add", webHookBody); ******************** 分隔 ************************ public static boolean SendQuartzMessage(String quartzUrl, String method, String webHookBody){ boolean result = false; try{ //org.apache.httpcomponents.httpclient sendPost,pom依赖如下dependency String resp = HttpClientUtil.sendPostByJson(quartzUrl+"/"+method, webHookBody,0); if( "error".equals(resp) || resp.contains("405 Not Allowed")){ log.error("调用任务调度中心消息发送失败,地址:"+quartzUrl); } else { JSONObject jsonObject = JSON.parseObject(resp); if( "200".equals(String.valueOf(jsonObject.get("code"))) ){ result = true; } else{ log.error("调用任务调度中心失败,msg:"+String.valueOf(jsonObject.get("msg"))); } } }catch (Exception e){ log.error("调用任务调度中心失败,msg:"+e.getMessage()); } return result; }
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency>
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!