写点什么

🍃【SpringBoot 技术专题】「开发实战系列」动态化 Quartz 任务调度机制 + 实时推送任务数据到前端

发布于: 刚刚
🍃【SpringBoot技术专题】「开发实战系列」动态化Quartz任务调度机制+实时推送任务数据到前端

前提介绍

SpringBoot2.0 整合 quartz 实现多定时任务动态配置,实现任务增删改,生成 Cron 表达式

动态化任务调度

添加依赖包

<!-- quartz --><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-quartz</artifactId></dependency>
复制代码

DynamicSchedulerConfig

import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.quartz.SchedulerFactoryBean;@Configurationpublic class DynamicSchedulerConfig implements SchedulerFactoryBeanCustomizer{  @Override  public void customize(SchedulerFactoryBean schedulerFactoryBean) {          schedulerFactoryBean.setStartupDelay(2);           schedulerFactoryBean.setAutoStartup(true);          schedulerFactoryBean.setOverwriteExistingJobs(true);        }}
复制代码

application.yml 配置

server:    port: 8101# 默认的profile为dev,其他环境通过指定启动参数使用不同的profile,比如:  #   测试环境:java -jar quartz-service.jar --spring.profiles.active=test  #   生产环境:java -jar quartz-service.jar --spring.profiles.active=prod  spring:  datasource:    type: com.alibaba.druid.pool.DruidDataSource     #这里是配置druid连接池,以下都是druid的配置信息    url: jdbc:mysql://127.0.0.1:3306/task?useUnicode=true&characterEncoding=utf-8&useSSL=false    driver-class-name: com.mysql.jdbc.Driver    username: root    password: 123456#quartz相关属性配置 quartz:    properties:      org:        quartz:          scheduler:            instanceName: clusteredScheduler            instanceId: AUTO          jobStore:            class: org.quartz.impl.jdbcjobstore.JobStoreTX            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate            tablePrefix: QRTZ_            isClustered: true            clusterCheckinInterval: 10000            useProperties: false          threadPool:            class: org.quartz.simpl.SimpleThreadPool            threadCount: 10            threadPriority: 5            threadsInheritContextClassLoaderOfInitializingThread: true    #数据库方式    job-store-type: jdbc# mybatisplus配置mybatis-plus:  mapper-locations: classpath*:/mapper/**Mapper.xml  #把xml文件放在com.XX.mapper.*中可能会出现找到的问题,这里把他放在resource下的mapper中  typeAliasesPackage: com.task.entity  #这里是实体类的位置,#实体扫描,多个package用逗号或者分号分隔  configuration:    map-underscore-to-camel-case: true    cache-enabled: falselogging:  file: task-info.log  level:    com.task: debug
复制代码

TaskInfoService 业务服务类

@Servicepublic class TaskInfoService extends IService<TaskInfoBO> {    /**
* @Title: getPageJob * @Description: TODO(查询定时任务,分页) * @param @param search * @param @return 参数 * @return Map<String,Object> 返回类型 * @throws */
IPage<TaskInfoBO> getPageJob(Pageable pageable, MultiValueMap queryParam);
/**
* @Title: getPageJobmod * @Description: TODO(查询定时任务) * @param @return 参数 * @return TaskInfoBO 返回类型 * @throws */ TaskInfoBO getPageJobmod();
/** * @Title: addJob * @Description: TODO(添加任务) * @param @param jobClassName 任务路径名称 * @param @param jobGroupName 任务分组 * @param @param cronExpression cron时间规则 * @param @throws Exception 参数 * @return void 返回类型 * @throws */ void addJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception;
/** * @Title: addJob * @Description: TODO(添加动态任务) * @param @param jobClassName 任务路径名称 * @param @param jobGroupName 任务分组 * @param @param cronExpression cron时间规则 * @param @param jobDescription 参数 * @param @param params * @param @throws Exception 参数说明 * @return void 返回类型 * @throws */
void addJob(String jobClassName, String jobGroupName, String cronExpression, String jobDescription, Map<String, Object> params) throws Exception;
/** * @Title: updateJob * @Description: TODO(更新定时任务) * @param @param jobClassName 任务路径名称 * @param @param jobGroupName 任务分组 * @param @param cronExpression cron时间规则 * @param @throws Exception 参数 * @return void 返回类型 * @throws */ void updateJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception;
/** * @Title: deleteJob * @Description: TODO(删除定时任务) * @param @param jobClassName 任务路径名称 * @param @param jobGroupName 任务分组 * @param @throws Exception 参数 * @return void 返回类型 * @throws */
void deleteJob(String jobClassName, String jobGroupName) throws Exception;
/** * @Title: pauseJob * @Description: TODO(暂停定时任务) * @param @param jobClassName 任务路径名称 * @param @param jobGroupName 任务分组 * @param @throws Exception 参数 * @return void 返回类型 * @throws */ void pauseJob(String jobClassName, String jobGroupName) throws Exception;
/** * @Title: resumejob * @Description: TODO(恢复任务) * @param @param jobClassName 任务路径名称 * @param @param jobGroupName 任务分组 * @param @throws Exception 参数 * @return void 返回类型 * @throws */ void resumejob(String jobClassName, String jobGroupName) throws Exception;}
复制代码

TaskInfoServiceImpl 业务服务类


@Slf4j@Service@Transactionalpublic class TaskInfoServiceImpl extends ServiceImpl<JobAndTriggerMapper, TaskInfoBO> implements TaskInfoService {
@Autowired
private Scheduler scheduler;
@Override
public IPage<TaskInfoBO> getPageJob(Pageable pageable, MultiValueMap queryParam) {
IPage<TaskInfoBO> page = new Page<>(pageable.getPageNumber(), pageable.getPageSize());
return baseMapper.getJobAndTriggerDetails(page);
}
@Override
public TaskInfoBO getPageJobmod() { return baseMapper.getJobAndTriggerDto(); }
@Override
public void addJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception { // 启动调度器 scheduler.start(); // 构建job信息 JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass()) .withIdentity(jobClassName, jobGroupName).build(); // 表达式调度构建器(即任务执行的时间) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); // 按新的cronExpression表达式构建一个新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName) .withSchedule(scheduleBuilder).build(); try { scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { throw new Exception("创建定时任务失败"); } } @Override public void addJob(String jobClassName, String jobGroupName, String cronExpression, String jobDescription, Map<String, Object> params) throws Exception { // 启动调度器 scheduler.start(); // 构建job信息 JobDetail jobDetail = JobBuilder.newJob(TaskInfoServiceImpl.getClass(jobClassName).getClass()) .withIdentity(jobClassName, jobGroupName).withDescription(jobDescription).build(); Iterator<Map.Entry<String, Object>> var7 = params.entrySet().iterator(); while(var7.hasNext()) { Map.Entry<String, Object> entry = var7.next(); jobDetail.getJobDataMap().put((String)entry.getKey(), entry.getValue()); } // 表达式调度构建器(即任务执行的时间) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); // 按新的cronExpression表达式构建一个新的trigger CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName) .withSchedule(scheduleBuilder).build(); try { scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { throw new Exception("创建定时任务失败"); } }
@Override
public void updateJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobClassName, jobGroupName); // 表达式调度构建器(动态修改后不立即执行) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 按新的cronExpression表达式重新构建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); // 按新的trigger重新设置job执行 scheduler.rescheduleJob(triggerKey, trigger); } catch (SchedulerException e) { throw new Exception("更新定时任务失败"); } }
@Override
public void deleteJob(String jobClassName, String jobGroupName) throws Exception { scheduler.pauseTrigger(TriggerKey.triggerKey(jobClassName, jobGroupName)); scheduler.unscheduleJob(TriggerKey.triggerKey(jobClassName, jobGroupName)); scheduler.deleteJob(JobKey.jobKey(jobClassName, jobGroupName)); }
@Override public void pauseJob(String jobClassName, String jobGroupName) throws Exception { scheduler.pauseJob(JobKey.jobKey(jobClassName, jobGroupName)); }
@Override
public void resumejob(String jobClassName, String jobGroupName) throws Exception { scheduler.resumeJob(JobKey.jobKey(jobClassName, jobGroupName)); } public static BaseJob getClass(String classname) throws Exception { Class<?> class1 = Class.forName(classname); return (BaseJob) class1.newInstance(); }}
复制代码

TaskInfoMapper

public interface JobAndTriggerMapper extends BaseMapper<TaskInfoBO> {    IPage<TaskInfoBO> getJobAndTriggerDetails(IPage<TaskInfoBO> page);    TaskInfoBO getTaskInfoModel;}
复制代码

TaskInfoMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.south.data.mapper.JobAndTriggerMapper"> <select id="getJobAndTriggerDetails" resultType="com.data.vo.TaskInfoBO"> SELECT
jd.JOB_NAME AS jobName,
jd.DESCRIPTION AS jobDescription,
jd.JOB_GROUP AS jobGroupName,
jd.JOB_CLASS_NAME AS jobClassName,
t.TRIGGER_NAME AS triggerName,
t.TRIGGER_GROUP AS triggerGroupName,
FROM_UNIXTIME(t.PREV_FIRE_TIME/1000,'%Y-%m-%d %T') AS prevFireTime,
FROM_UNIXTIME(t.NEXT_FIRE_TIME/1000,'%Y-%m-%d %T') AS nextFireTime,
ct.CRON_EXPRESSION AS cronExpression,
t.TRIGGER_STATE AS triggerState
FROM
qrtz_job_details jd
JOIN qrtz_triggers t
JOIN qrtz_cron_triggers ct ON jd.JOB_NAME = t.JOB_NAME
AND t.TRIGGER_NAME = ct.TRIGGER_NAME
AND t.TRIGGER_GROUP = ct.TRIGGER_GROUP
</select>
<select id="getJobAndTriggerDto" resultType="com.south.data.vo.JobAndTriggerDto">
SELECT
jd.JOB_NAME AS jobName,
jd.DESCRIPTION AS jobDescription,
jd.JOB_GROUP AS jobGroupName,
jd.JOB_CLASS_NAME AS jobClassName,
t.TRIGGER_NAME AS triggerName,
t.TRIGGER_GROUP AS triggerGroupName,
FROM_UNIXTIME(t.PREV_FIRE_TIME/1000,'%Y-%m-%d %T') AS prevFireTime,
FROM_UNIXTIME(t.NEXT_FIRE_TIME/1000,'%Y-%m-%d %T') AS nextFireTime,
ct.CRON_EXPRESSION AS cronExpression,
t.TRIGGER_STATE AS triggerState
FROM
qrtz_job_details jd
JOIN qrtz_triggers t
JOIN qrtz_cron_triggers ct ON jd.JOB_NAME = t.JOB_NAME
AND t.TRIGGER_NAME = ct.TRIGGER_NAME
AND t.TRIGGER_GROUP = ct.TRIGGER_GROUP
</select>
</mapper>
复制代码

BaseJob 类

public interface BaseJob extends Job {  public void execute(JobExecutionContext context) throws JobExecutionException;}
复制代码

TaskController 控制器

@RestController@RequestMapping(value = "/job")public class JobController  {
@Autowired private TaskInfoService taskInfoService;
public JobController(TaskInfoService taskInfoService){ this.taskInfoService = taskInfoService; }
@PostMapping(value = "/page") public ResponseEntity<List<TaskInfoBO>> queryjob(Pageable pageable, @RequestParam MultiValueMap<String, String> queryParams, UriComponentsBuilder uriBuilder) { IPage<TaskInfoBO> page = taskInfoService.getPageJob(pageable, queryParams); HttpHeaders headers = PaginationUtil.generatePaginationHttpHeaders(uriBuilder.queryParams(queryParams), page); return ResponseEntity.ok().headers(headers).body(page.getRecords()); }
/** * @Title: addJob * @Description: TODO(添加Job) * @param jobClassName * 类名 * @param jobGroupName * 组名 * @param cronExpression * 表达式,如:0/5 * * * * ? (每隔5秒) */ @PostMapping(value = "/add") public ResponseEntity addJob( @RequestParam(value = "jobClassName") String jobClassName, @RequestParam(value = "jobGroupName") String jobGroupName, @RequestParam(value = "cronExpression") String cronExpression){ try { jobAndTriggerService.addJob(jobClassName, jobGroupName, cronExpression); return ResponseEntity.ok().body("操作成功"); } catch (Exception e) { return ResponseEntity.ok().body("操作失败"); } } /** * @Title: pauseJob * @Description: TODO(暂停Job) * @param jobClassName * 类名 * @param jobGroupName * 组名 */
@PostMapping(value = "/pause") public ResponseEntity pauseJob( @RequestParam(value = "jobClassName") String jobClassName, @RequestParam(value = "jobGroupName") String jobGroupName) { try { taskInfoService.pauseJob(jobClassName, jobGroupName); return ResponseEntity.ok().body("操作成功"); } catch (Exception e) { return ResponseEntity.ok().body("操作失败"); } } /** * @Title: resumeJob * @Description: TODO(恢复Job) * @param jobClassName * 类名 * @param jobGroupName * 组名 */ @PostMapping(value = "/resume") public ResponseEntity resumeJob( @RequestParam(value = "jobClassName") String jobClassName, @RequestParam(value = "jobGroupName") String jobGroupName) { try { taskInfoService.resumejob(jobClassName, jobGroupName); return ResponseEntity.ok().body("操作成功"); } catch (Exception e) { return ResponseEntity.ok().body("操作失败"); } } /** * @Title: rescheduleJob * @Description: TODO(重新设置Job) * @param jobClassName * 类名 * @param jobGroupName * 组名 * @param cronExpression * 表达式 */
@PostMapping(value = "/reschedule") public ResponseEntity rescheduleJob( @RequestParam(value = "jobClassName") String jobClassName, @RequestParam(value = "jobGroupName") String jobGroupName, @RequestParam(value = "cronExpression") String cronExpression) { try { taskInfoService.updateJob(jobClassName, jobGroupName, cronExpression); return ResponseEntity.ok().body("操作成功"); } catch (Exception e) { return ResponseEntity.ok().body("操作失败"); } }
/** * @Title: deleteJob * @Description: TODO(删除Job) * @param jobClassName * 类名 * @param jobGroupName * 组名 */
@RequestMapping(value = "/del", method = RequestMethod.POST)
public ResponseEntity deleteJob(@RequestParam(value = "jobClassName") String jobClassName, @RequestParam(value = "jobGroupName") String jobGroupName) { try { taskInfoService.deleteJob(jobClassName, jobGroupName); return ResponseEntity.ok().body("操作成功"); } catch (Exception e) { return ResponseEntity.ok().body("操作失败"); } }}
复制代码


DeferredResult 实现实时推送

  • 浏览器要实时展示服务端计算出来的数据。一种可能的实现是:浏览器频繁向服务端发起请求以获得服务端数据。

  • 若定时周期为 S,则数据延迟周期最大即为 S。若想缩短数据延迟周期,则应使 S 尽量小,而 S 越小,浏览器向服务端发起请求的频率越高,又造成网络握手次数越多,影响了效率。因此,此场景应使用服务端实时推送技术。

  • 这里说是推送,其实还是基于请求-响应机制,只不过发起的请求会在服务端挂起,直到请求超时或服务端有数据推送时才会做出响应,响应的时机完全由服务端控制。所以,整体效果看起来就像是服务端真的在“实时推送”一样。


可以利用 DeferredResult 来实现异步长连接的服务端实时推送。

使用案例
@RequestMapping("/call")@ResponseBodypublic DeferredResult<Object> call() {  // 泛型Object表示返回结果的类型    DeferredResult<Object> response = new DeferredResult<Object>(10000, // 请求的超时时间  null); // 超时后响应的结果    response.onCompletion(new Runnable() {        @Override        public void run() {            // 请求处理完成后所做的一些工作        }    });    // 设置响应结果    // 调用此方法时立即向浏览器发出响应;未调用时请求被挂起    response.setResult(new Object());    return response;}
复制代码

执行逻辑

  1. 浏览器发起异步请求

  2. 请求到达服务端被挂起(使用浏览器查看请求状态,此时为 pending)

  3. 向浏览器进行响应,分为两种情况:

  4. 服务端调用 DeferredResult.setResult(),请求被唤醒,返回结果。

  5. 超时,返回一个你设定的结果。

  6. 浏览得到响应,再次重复 1,处理此次响应结果

实现 DeferResult 传输模型

public interface DeferredData {    String getId(); // 唯一标识}
复制代码

DeferredResult 的持有者

public interface IDeferredResultHolder<DeferredData> {    DeferredResult<DeferredData> newDeferredResult(String key, long timeout, Object timeoutResult);    void add(String key, DeferredResult<DeferredData> deferredResult);    DeferredResult<DeferredData> get(String key);    void remove(String key);    void handleDeferredData(DeferredData deferredData);}
复制代码

DeferredResult 的持有者实现

public class DeferredResultHolder implements IDeferredResultHolder<DeferredData> {
private Map<String, DeferredResult<DeferredData>> deferredResults = new ConcurrentHashMap<String, DeferredResult<DeferredData>>();
public DeferredResult<DeferredData> newDeferredResult(String key) { return new DeferredResult(key, 30 * 1000L, null); } public DeferredResult<DeferredData> newDeferredResult(String key, long timeout) { return new DeferredResult(key, timeout, null); } public DeferredResult<DeferredData> newDeferredResult(String key, Object timeoutResult) { return new DeferredResult(key, 30 * 1000L, timeoutResult); } @Override public DeferredResult<DeferredData> newDeferredResult(String key, long timeout, Object timeoutResult) { DeferredResult<DeferredData> deferredResult = newDeferredResult<DeferredData>(timeout, timeoutResult); add(key, deferredResult); deferredResult.onCompletion(new Runnable() { @Override public void run() { remove(key); } }); return deferredResult; }
@Override public void add(String key, DeferredResult<DeferredData> deferredResult) { deferredResults.put(key, deferredResult); }
@Override public DeferredResult<DeferredData> get(String key) { return deferredResults.get(key); }
@Override public void remove(String key) { deferredResults.remove(key); }
@Override public void handleDeferredData(DeferredData deferredData) { String key = deferredData.getId(); DeferredResult<DeferredData> deferredResult = get(key); if (deferredResult != null) { deferredResult.setResult(deferredData); } }}
复制代码
调用端
@RequestMapping@Controllerpublic class CallController {    @Autowired    private DeferredResultHolder deferredResultHolder;    @RequestMapping("/call")    @ResponseBody    public DeferredResult<DeferredData> call() {        String id = "abc";        return deferredResultHolder.newDeferredResult(id, 10 * 1000L, null);    }}
复制代码
触发返回端
@RequestMapping@Controllerpublic class CallController {    @Autowired    private DeferredResultHolder deferredResultHolder;    @RequestMapping("/finished")    @ResponseBody    public void finished() {        String id = "abc";        DeferredData defdatq = new CustomerDeferredData(id); // 此处的CustomerDeferredData为实现了DeferredData接口的实现模型        return deferredResultHolder.handleDeferredData(defdatq);    }}
复制代码


发布于: 刚刚阅读数: 2
用户头像

🏆 2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 “任何足够先进的技术都是魔法“

评论

发布
暂无评论
🍃【SpringBoot技术专题】「开发实战系列」动态化Quartz任务调度机制+实时推送任务数据到前端