RocketMQ 源码探究 -- 延迟队列实现
欢迎关注公众号:程序员老周
概要
RocketMQ 提供了生产可用的延迟队列实现,本文将从具体使用、架构设计和实现原理等方面详细介绍该特性的实现。
简介
延迟队列是能存储未来的任务,并能在指定时间节点触发的一种数据结构。
RocketMQ 提供了对延迟消息的支持,我们只需在普通消息上设置 delayTimeLevel,RocketMQ 会在指定的 timeLevel 时触发,完成消息的投递工作。
需要注意的是,RocketMQ 默认支持 18 个时间等级的配置,可以通过在 broker 的配置文件中修改。
下面是简单的代码示例。
架构设计
Untitled
在我看来,延迟队列主要由存储和调度两个核心技术点组成。
存储
如何确保不被提前消费
消费者是跟 consume queue 交互,所以为达成不被提前消费目标,必然需要对 consume queue 的设计进行改造。
消息被发送到 broker 时,首先正常写入 commit log,在分发至 consume queue 时,会将 topic 改成 SCHEDULE_TOPIC_XXXX
,将 queue id 改成 delayTimeLevel - 1
。同时暂存原来的 topic 和 queue。
SCHEDULE_TOPIC_XXXX
被归为系统 topic,当 client 订阅系统 topic 时,会抛出异常。
如何保证有序性
在设计延迟队列时,一般需要保证存储的数据是有序的,这样才能减少遍历访问条数。
Java 的 DelayQueue
依赖小顶堆做排序,Redisson 的 DelayQueue 依赖 zset
做数据排序。
在 RocketMQ 中,只支持固定时间的延迟。RocketMQ 按照不同延迟分 18 个队列来保证数据有序,即在一个队列中,先入队列的永远比后入队列的先到期。
调度
定时任务
Untitled
在 RocketMQ 中,每个延迟队列都会分配一个 Timer
,Timer
轮询 consume queue
数据,进行延时逻辑判断。
当系统启动时,会读取 $ROCKETMQ_HOME/config/delayOffset.json
内容,加载每个队列中的 offset。
从 consume queue 中加载从 offset 开始的所有数据,按顺序进行遍历。
**注意:在生成 consume queue 时,会将该条消息的过期时间提前算好,放到 consume queue 的 tag code 中。**这也是 consume queue 的 tag code 本来只需 4 字节(hashcode),但分配了 8 字节的原因。
如果消息到了过期时间,会到 commit log 中将数据补齐,换成替换前的 topic 和 queue,再次作为普通消息投递到 commit log 中,等待消费者的拉取。
如果没有到过期时间,系统会启动一个下次请求到来的时的 Timer。这是因为分队列 + 固定时间延迟的好处,不用担心两次请求之间插入新的请求。
offset 持久化
如果 RocketMQ 宕机,如何保证下次启动仍然能够从上次的 offset 消费呢?
RocketMQ 维护了一个每隔 10s 运行一次的 Timer,目的是将每个队列的消费情况写入文件,等到下次重启时,可以从上次消费位置进行消费。
因为不是实时同步,有一定的时间间隔,所以,有可能会出现重复消息的情况。我们需要在消费端做幂等处理,避免重复消费。
总结
RocketMQ 支持 18 个时间等级的延迟消息
在存储层面,RocketMQ 通过修改 topic 保证不被提前消费,生成 18 个队列保证消息的有序性
基于 Timer 机制实现轮询 consume queue 逻辑
固定时间等级的可用性比较差,需要改造才能用于业务开发
版权声明: 本文为 InfoQ 作者【周文童】的原创文章。
原文链接:【http://xie.infoq.cn/article/b8d6c3aaa210432fadf0ea75a】。文章转载请联系作者。
评论