写点什么

贼好用,冰河开源了这款精准定时任务和延时队列框架!!

用户头像
冰河
关注
发布于: 2020 年 11 月 24 日
贼好用,冰河开源了这款精准定时任务和延时队列框架!!

写在前面


在实际工作中,很多小伙伴在开发定时任务时,会采取定时扫描数据表的方式实现。然而,这种方式存在着重大的缺陷:如果数据量大的话,频繁的扫描数据表会对数据库造成巨大的压力;难以支撑大规模的分布式定时任务;难以支持精准的定时任务;大量浪费 CPU 的资源;扫描的数据大部分是不需要执行的任务。那么,既然定时扫描数据表存在这么多的弊端,那么,有没有一种方式来解决这些问题呢?今天,冰河就带着他的开源项目 mykit-delay 来了!!开源地址:https://github.com/sunshinelyz/mykit-delay


在使用框架过程中如有任何问题,都可以添加冰河微信【sunshinelyz】进行交流。


文章已收录到 https://github.com/sunshinelyz/technology-binghe


项目简述


Mykit 体系中提供的简单、稳定、可扩展的延迟消息队列框架,提供精准的定时任务和延迟队列处理功能。


项目模块说明


  • mykit-delay-common: mykit-delay 延迟消息队列框架通用工具模块,提供全局通用的工具类

  • mykit-delay-config: mykit-delay 延迟消息队列框架通用配置模块,提供全局配置

  • mykit-delay-queue: mykit-delay 延迟消息队列框架核心实现模块,目前所有主要的功能都在此模块实现

  • mykit-delay-controller: mykit-delay 延迟消息队列框架 Restful 接口实现模块,对外提供 Restful 接口访问,兼容各种语言调用

  • mykit-delay-core: mykit-delay 延迟消息队列框架的入口,整个框架的启动程序在此模块实现

  • mykit-delay-test: mykit-delay 延迟消息队列框架通用测试模块,主要提供 Junit 单元测试用例


需求背景


  • 用户下订单后未支付,30 分钟后支付超时

  • 在某个时间点通知用户参加系统活动

  • 业务执行失败之后隔 10 分钟重试一次


类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。


队列设计


整体架构设计如下图所示。


开发前需要考虑的问题


  • 及时性 消费端能按时收到

  • 同一时间消息的消费权重

  • 可靠性 消息不能出现没有被消费掉的情况

  • 可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复

  • 可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费

  • 高可用 多实例 这里指 HA/主备模式并不是多实例同时一起工作

  • 消费端如何消费


当然初步选用 redis 作为数据缓存的主要原因是因为 redis 自身支持 zset 的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持 map list 数据结构。


简单定义一个消息数据结构


private String topic;/***topic**/private String id;/***自动生成 全局惟一 snowflake**/private String bizKey;private long delay;/***延时毫秒数**/private int priority;//优先级private long ttl;/**消费端消费的ttl**/private String body;/***消息体**/private long createTime=System.currentTimeMillis();private int status= Status.WaitPut.ordinal();
复制代码


运行原理:


  • 用 Map 来存储元数据。id 作为 key,整个消息结构序列化(json/…)之后作为 value,放入元消息池中。

  • 将 id 放入其中(有 N 个)一个 zset 有序列表中,以 createTime+delay+priority 作为 score。修改状态为正在延迟中

  • 使用 timer 实时监控 zset 有序列表中 top 10 的数据 。 如果数据 score<=当前时间毫秒就取出来,根据 topic 重新放入一个新的可消费列表(list)中,在 zset 中删除已经取出来的数据,并修改状态为待消费

  • 客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入 zset 列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。


客户端


因为涉及到不同程序语言的问题,所以当前默认支持 http 访问方式。


  • 添加延时消息添加成功之后返回消费唯一 ID POST /push {…..消息体}

  • 删除延时消息 需要传递消息 ID GET /delete?id=

  • 恢复延时消息 GET /reStore?expire=true|false expire 是否恢复已过期未执行的消息。

  • 恢复单个延时消息 需要传递消息 ID GET /reStore/id

  • 获取消息 需要长连接 GET /get/topic


用 nginx 暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。


目前系统中客户端并没有采用 HTTP 长连接的方式来消费消息,而是采用 MQ 的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送 MQ 的时候拦截一下 如果是延迟消息就用延迟消息系统处理。


消息可恢复


实现恢复的原理 正常情况下一般都是记录日志,比如 mysql 的 binlog 等。


这里我们直接采用 mysql 数据库作为记录日志。


目前创建以下 2 张表:


  • 消息表 字段包括整个消息体

  • 消息流转表 字段包括消息 ID、变更状态、变更时间、zset 扫描线程 Name、host/ip


定义 zset 扫描线程 Name 是为了更清楚的看到消息被分发到具体哪个 zset 中。前提是 zset 的 key 和监控 zset 的线程名称要有点关系 这里也可以是 zset key。


支持消息恢复


假如 redis 服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表 1 也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。


当然恢复单个任务也可以这么干。


数据表设计

这里,我就直接给出创建数据表的 SQL 语句。

DROP TABLE IF EXISTS `mykit_delay_queue_job`;CREATE TABLE `mykit_delay_queue_job` (  `id` varchar(128) NOT NULL,  `bizkey` varchar(128) DEFAULT NULL,  `topic` varchar(128) DEFAULT NULL,  `subtopic` varchar(250) DEFAULT NULL,  `delay` bigint(20) DEFAULT NULL,  `create_time` bigint(20) DEFAULT NULL,  `body` text,  `status` int(11) DEFAULT NULL,  `ttl` int(11) DEFAULT NULL,  `update_time` datetime(3) DEFAULT NULL,  PRIMARY KEY (`id`),  KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),  KEY `mykit_delay_queue_job_STATUS` (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ------------------------------ Table structure for mykit_delay_queue_job_log-- ----------------------------DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;CREATE TABLE `mykit_delay_queue_job_log` ( `id` varchar(128) NOT NULL, `status` int(11) DEFAULT NULL, `thread` varchar(60) DEFAULT NULL, `update_time` datetime(3) DEFAULT NULL, `host` varchar(128) DEFAULT NULL, KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
复制代码


关于高可用


分布式协调还是选用 zookeeper。


如果有多个实例最多同时只能有 1 个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有 1 个实例处理,可以选用 zookeeper 或者 redis 就能实现分布式锁了。


最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于 docker 的主备部署模式。


运行模式


  • 支持 master,slave (HA)需要配置mykit.delay.registry.serverList zk 集群地址列表

  • 支持 cluster 会涉及到分布式锁竞争 效果不是很明显 分布式锁采用redissetNx实现

  • StandAlone


目前,经过测试,推荐使用 master slave 的模式,后期会优化 Cluster 模式


如何接入


为了提供一个统一的精准定时任务和延时队列框架,mykit-delay 提供了 HTTP Rest 接口供其他业务系统调用,接口使用简单方便,只需要简单的调用接口,传递相应的参数即可。


消息体


以 JSON 数据格式参数 目前只提供了http协议


  • body 业务消息体

  • delay 延时毫秒 距createTime的间隔毫秒数

  • id 任务 ID 系统自动生成 任务创建成功返回

  • status 状态 默认不填写

  • topic 标题

  • subtopic 保留字段

  • ttl 保留字段

  • createTime 创建任务时间 非必填 系统默认


添加任务


````java

/push

POST application/json

{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}

````


删除任务


删除任务 需要记录一个 JobId


````java

/delete?jobId=xxx

GET

````


恢复单个任务


用于任务错乱 脑裂情况 根据日志恢复任务


````java

/reStoreJob?JobId=xxx

GET

````


恢复所有未完成的任务


根据日志恢复任务


````java

/reStore?expire=true

GET

````


参数expire 表示是否需要恢复已过期还未执行的数据


清空队列数据


根据日志中未完成的数据清空队列中全部数据。清空之后 会删除缓存中的所有任务


````java

/clearAll

GET

````


客户端获取队列方式


目前默认实现了RocketMQActiveMQ的推送方式。依赖 MQ 的方式来实现延时框架与具体业务系统的耦合。


消息体中消息与`RocketMQ`和 `ActiveMQ` 消息字段对应关系


| mykit-delay | RocketMQ | ActiveMQ | 备注 | |

| ----------- | -------- | -------- | ---------------------------------- | ---- |

| topic | topic | topic | 点对点发送队列名称或者主题名称 | |

| subtopic | subtopic | subtopic | 点对点发送队列子名称或者主题子名称 | |

| body | 消息内容 | 消息内容 | 消息内容 | |


关于系统配置


延迟框架与具体执行业务系统的交互方式通过延迟框架配置实现,具体配置文件位置为 mykit-delay-config 项目下的resources/properties/starter.properties文件中。


测试


需要配置好数据库地址和 Redis 的地址 如果不是单机模式 也需要配置好 Zookeeper


运行 mykit-delay-test 模块下的测试类io.mykit.delay.test.PushTest添加任务到队列中


启动 mykit-delay-test 模块下的io.mykit.delay.TestDelayQueue消费前面添加数据 为了方便查询效果 默认的消费方式是consoleCQ 控制台输出


扩展


支持 zset 队列个数可配置 避免大数据带来高延迟的问题。


近期规划


  • 分区(buck)支持动态设置

  • redis 与数据库数据一致性的问题 (重要

  • 实现自己的推拉机制

  • 支持可切换实现方式 目前只是依赖 Redis 实现,后续待优化

  • 支持 Web 控制台管理队列

  • 实现消息消费TTL机制


如果这款开源框架对你有帮助,请小伙伴们打开 github 链接:https://github.com/sunshinelyz/mykit-delay ,给个 Star,让更多的小伙伴看到,减轻工作中繁琐的扫描数据表的定时任务开发。也希望能够有越来越多的小伙伴参与这个开源项目,我们一起养肥它!!



好了,不早了,今天就到这儿吧,我是冰河,我们下期见!!


重磅福利


微信搜一搜【冰河技术】微信公众号,关注这个有深度的程序员,每天阅读超硬核技术干货,公众号内回复【PDF】有我准备的一线大厂面试资料和我原创的超硬核 PDF 技术文档,以及我为大家精心准备的多套简历模板(不断更新中),希望大家都能找到心仪的工作,学习是一条时而郁郁寡欢,时而开怀大笑的路,加油。如果你通过努力成功进入到了心仪的公司,一定不要懈怠放松,职场成长和新技术学习一样,不进则退。如果有幸我们江湖再见!


另外,我开源的各个 PDF,后续我都会持续更新和维护,感谢大家长期以来对冰河的支持!!


写在最后


如果你觉得冰河写的还不错,请微信搜索并关注「 冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者已经通过阅读「 冰河技术 」微信公众号文章,吊打面试官,成功跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样提升自己的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术 」微信公众号吧,每天更新超硬核技术干货,让你对如何提升技术能力不再迷茫!


发布于: 2020 年 11 月 24 日阅读数: 972
用户头像

冰河

关注

公众号:冰河技术 2020.05.29 加入

Mykit系列开源框架发起者、核心架构师和开发者,《海量数据处理与大数据技术实战》与《MySQL开发、优化与运维实战》作者。【冰河技术】微信公众号作者。

评论

发布
暂无评论
贼好用,冰河开源了这款精准定时任务和延时队列框架!!