在现有的产品结构中,已有调度平台,其中大多调度用于系统内部数据抽取,实现一个定期同步数据的功能。而调度平台仅仅只有同步数据是远远不够的,还需要一种定期执行某项业务的功能。基于这种需求,在原有的基础上,对调度平台进行了扩展。
在这个产品结构中,承载定时调度的服务是接口平台,该服务启动时会初始化调度容器。在原有的流程中,仅获取系统的定时任务列表,将其添加至调度中。扩展后,同步查询公共类型的定时调度任务。在公共的处理类中,通过fegin调用基础服务中的接口。然后在基础服务中,根据不同的业务调用不同的实现类。而每一个实现类可能调用其他的业务服务。
调度基于quartz实现,公共处理类中使用了简单工厂模式+策略模式
初始化容器
@Configuration public class ScheduleTriggerManage { private static final SchedulerFactory schedulerFactory = new org.quartz.impl.StdSchedulerFactory(); /** * 容器初始化执行 * * @param */ public void initContentStart() throws Exception { // 获取scheduler Scheduler scheduler = schedulerFactory.getScheduler(); scheduler.clear(); CommonTimerTaskQuery query = new CommonTimerTaskQuery(); query.setTaskStatus(1); // 查询启用状态的定时任务 R<List<CommonTimerTask>> listR = commonTimerTaskFeignApi.queryTimerTaskByParam(query); if (listR == null && !listR.getIsSuccess()) { throw new BizException("获取公共定时任务接口异常"); } // 获取启用的定时任务数据 List<CommonTimerTask> commonTimerTasks = listR.getData(); if (CollectionUtils.isNotEmpty(commonTimerTasks)) { for (CommonTimerTask timerTask : commonTimerTasks) { // 获取表达式 String timerCrons = timerTask.getTimerCrons(); // 表达式转集合 List<String> crons = new ArrayList<String>(Arrays.asList(timerCrons.split("\\|"))); // 遍历表达式 for (String cron : crons) { try { addCommonTimerTask(timerTask, cron); } catch (Exception e) { logger.error("添加公共定时调度异常,继续执行"+e.getMessage(),e); } } } } } // 添加公共定时任务调度 public void addCommonTimerTask(CommonTimerTask timerTask, String cron) throws Exception { // 获取taskUUID值 任务key+'&'+表达式 String taskUUID = timerTask.getTaskKey() + "&" + cron; // 获取scheduler Scheduler scheduler = schedulerFactory.getScheduler(); // 添加触发器监听 TriggerListener triggerListener = new MonitorTriggerListener(); scheduler.getListenerManager().addTriggerListener(triggerListener); // 添加job监听 JobListener jobListener = new MonitorJobListener(); scheduler.getListenerManager().addJobListener(jobListener); // 创建jobDetail对象,指定taskKey 和 任务执行类 JobKey jobKey = new JobKey(taskUUID); JobDetail jobDetail = JobBuilder.newJob(CommonTimerTaskScheduleExec.class) .withIdentity(jobKey) .usingJobData("taskKey", timerTask.getTaskKey()) .withDescription(timerTask.getTaskName()) .build(); // 创建触发器对象 TriggerKey triggerKey = new TriggerKey(taskUUID); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(triggerKey) .withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()) .build(); // 装配定时任务 scheduler.scheduleJob(jobDetail, trigger); // 启动定时任务 logger.info("---------启动调度任务" + timerTask.getTaskName() + '&' + cron + "成功-------"); scheduler.start(); } }
公共定时调度处理类
/** * 公共定时任务调度 * * @author zhengjianhua * @Create 2021-10-09 */ @Component public class CommonTimerTaskScheduleExec extends AbstractScheduleExec implements Job { //调用类 private static CommonTimerTaskFeignApi commonTimerTaskFeignApi; @Resource public void setCommonTimerTaskFeignApi(CommonTimerTaskFeignApi commonTimerTaskFeignApi) { CommonTimerTaskScheduleExec.commonTimerTaskFeignApi = commonTimerTaskFeignApi; } @Override protected void startExec(JobExecutionContext context) throws JobExecutionException { StringBuffer sbf = new StringBuffer(); sbf.append("执行公共定时任务:{"); // 获取jobDataMap JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); // 获取taskKey String taskKey = StrHelper.getObjectValue(jobDataMap.get("taskKey")); // 根据taskKey 触发方法 R<String> rResult = commonTimerTaskFeignApi.triggerCommonTimerTask(taskKey); // 执行成功 if (ApiReqResultVerifyUtil.verify(rResult)) { sbf.append(rResult.getData()); } else { sbf.append("triggerCommonTimerTask公共定时任务触发失败:").append(rResult.getMsg()); } sbf.append("}"); context.setResult(sbf.toString()); } @Override public Object resultService(Map<String, Object> interfaceMap, Object result) { return null; } @Override public void execute(JobExecutionContext context) throws JobExecutionException { //执行父类 startExec(context); } }
基础服务中的接收方法
@ApiOperation(value = "根据taskKey触发公共定时任务", notes = "根据taskKey触发公共定时任务") @GetMapping("/triggerCommonTimerTask") public R<String> triggerCommonTimerTask(@RequestParam("taskKey") String taskKey) { try { // 根据任务参数查询 定时任务数据 CommonTimerTask commonTimerTask = commonTimerTaskService.queryCommonTimerTaskByKey(taskKey); // 创建公共定时任务环境类 CommonTimerTaskContext timerTaskContext = new CommonTimerTaskContext(commonTimerTask.getTaskType()); // 执行定时任务处理逻辑 String result = timerTaskContext.execTimerTask(taskKey); // 返回结果 return success(result); } catch (BizException e) { return fail(ExceptionCode.SERVICE_EX.getCode(), e.getMessage()); } catch (Exception e) { log.error("根据taskKey触发公共定时任务错误:[CommonTimerTaskController.triggerCommonTimerTask] Failed to get sequence. exception[{}-{}]", new Object[]{e.getMessage(), Arrays.deepToString(e.getStackTrace())}); return fail(ExceptionCode.OPERATION_EX.getCode(), "根据taskKey触发公共定时任务错误,请联系系统管理员处理!" + e.getMessage()); } }
公共定时任务环境类
/** * 公共定时任务环境类 */ public class CommonTimerTaskContext { /** * 持有公共定时任务策略类 */ private CommonTimerTaskStrategy commonTimerTaskStrategy; /** * 构造函数 * @param taskType 定时任务类型 */ public CommonTimerTaskContext(String taskType){ // 根据任务类型 获取策略类 this.commonTimerTaskStrategy = new CommonTimerTaskStrategyFactory().getCommonTimerTaskStrategy(taskType); } /** * 返回执行结果 * @return 直接结果 */ public String execTimerTask(String taskKey){ return this.commonTimerTaskStrategy.execTimerTask(taskKey); } }
公共定时任务策略工厂
/** * 公共定时任务策略工厂 */ public class CommonTimerTaskStrategyFactory { /** * 根据类型获取策略实现 * * @param type * @return */ public CommonTimerTaskStrategy getCommonTimerTaskStrategy(String type) { CommonTimerTaskStrategy commonTimerTaskStrategy = null; switch (type) { case "qc_hospital_plan": commonTimerTaskStrategy = SpringUtils.getBean(QualityCheckHospitalPlanStrategy.class); break; case "": break; default: throw new BizException("未找到匹配策略类型"); } return commonTimerTaskStrategy; } }
公共定时任务策略类
/** * 公共定时任务策略类 */ public interface CommonTimerTaskStrategy { /** * 执行订单任务 * @return 执行结果 */ String execTimerTask(String taskKey); }
院级质控计划定时任务执行类
/** * 院级质控计划定时任务执行类 */ @Component public class QualityCheckHospitalPlanStrategy implements CommonTimerTaskStrategy { @Autowired private QualityCheckTaskFeignApi qualityCheckTaskFeignApi; @Override public String execTimerTask(String taskKey) { String result = ""; // 调用护管接口 生成院级质控计划 并返回结果 R<String> rResult = qualityCheckTaskFeignApi.generateQualityCheckHospitalPlanByTimerTask(taskKey); if (ApiReqResultVerifyUtil.verify(rResult)) { result = "业务执行成功:" + rResult.getData(); } else { result = "业务执行失败:" + rResult.getMsg(); } return result; } }
还没有评论,来说两句吧...