写点什么

RocketMQ 源码探究 -- 延迟队列实现

作者:周文童
  • 2022 年 1 月 25 日
  • 本文字数:1802 字

    阅读完需:约 6 分钟

RocketMQ 源码探究 -- 延迟队列实现

欢迎关注公众号:程序员老周

概要

RocketMQ 提供了生产可用的延迟队列实现,本文将从具体使用、架构设计和实现原理等方面详细介绍该特性的实现。

简介

延迟队列是能存储未来的任务,并能在指定时间节点触发的一种数据结构。

RocketMQ 提供了对延迟消息的支持,我们只需在普通消息上设置 delayTimeLevel,RocketMQ 会在指定的 timeLevel 时触发,完成消息的投递工作。

需要注意的是,RocketMQ 默认支持 18 个时间等级的配置,可以通过在 broker 的配置文件中修改。

下面是简单的代码示例。

public static void main(String[] args) throws Exception {        DefaultMQProducer producer = new DefaultMQProducer("delay-producer");        producer.setNamesrvAddr("localhost:9876");        producer.setSendMsgTimeout(100 * 1000);        producer.start();        for (int i = 0; i < 18; i++) {            Message message = new Message("delay", ("hello-delay" + i).getBytes(StandardCharsets.UTF_8));            message.setDelayTimeLevel(i + 1);            producer.send(message);        }        producer.shutdown();    }
复制代码

架构设计

Untitled

在我看来,延迟队列主要由存储和调度两个核心技术点组成。

存储

如何确保不被提前消费

消费者是跟 consume queue 交互,所以为达成不被提前消费目标,必然需要对 consume queue 的设计进行改造。

消息被发送到 broker 时,首先正常写入 commit log,在分发至 consume queue 时,会将 topic 改成 SCHEDULE_TOPIC_XXXX ,将 queue id 改成 delayTimeLevel - 1 。同时暂存原来的 topic 和 queue。

SCHEDULE_TOPIC_XXXX 被归为系统 topic,当 client 订阅系统 topic 时,会抛出异常。

public static boolean isSystemTopic(String topic, RemotingCommand response) {    if (isSystemTopic(topic)) {        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark("The topic[" + topic + "] is conflict with system topic.");        return true;    }    return false;}
复制代码

如何保证有序性

在设计延迟队列时,一般需要保证存储的数据是有序的,这样才能减少遍历访问条数。

Java 的 DelayQueue 依赖小顶堆做排序,Redisson 的 DelayQueue 依赖 zset 做数据排序。

在 RocketMQ 中,只支持固定时间的延迟。RocketMQ 按照不同延迟分 18 个队列来保证数据有序,即在一个队列中,先入队列的永远比后入队列的先到期。

调度

定时任务

Untitled

在 RocketMQ 中,每个延迟队列都会分配一个 TimerTimer 轮询 consume queue 数据,进行延时逻辑判断。

当系统启动时,会读取 $ROCKETMQ_HOME/config/delayOffset.json 内容,加载每个队列中的 offset。

{  "offsetTable":{1:3,2:2,3:33,4:2,5:2,6:2,7:2,8:2,9:2,10:2,11:2,12:2,13:2,14:2,15:2,16:2,17:12,18:1  }}
复制代码

从 consume queue 中加载从 offset 开始的所有数据,按顺序进行遍历。

**注意:在生成 consume queue 时,会将该条消息的过期时间提前算好,放到 consume queue 的 tag code 中。**这也是 consume queue 的 tag code 本来只需 4 字节(hashcode),但分配了 8 字节的原因。

如果消息到了过期时间,会到 commit log 中将数据补齐,换成替换前的 topic 和 queue,再次作为普通消息投递到 commit log 中,等待消费者的拉取。

如果没有到过期时间,系统会启动一个下次请求到来的时的 Timer。这是因为分队列 + 固定时间延迟的好处,不用担心两次请求之间插入新的请求。

offset 持久化

如果 RocketMQ 宕机,如何保证下次启动仍然能够从上次的 offset 消费呢?

RocketMQ 维护了一个每隔 10s 运行一次的 Timer,目的是将每个队列的消费情况写入文件,等到下次重启时,可以从上次消费位置进行消费。

因为不是实时同步,有一定的时间间隔,所以,有可能会出现重复消息的情况。我们需要在消费端做幂等处理,避免重复消费。

总结

  • RocketMQ 支持 18 个时间等级的延迟消息

  • 在存储层面,RocketMQ 通过修改 topic 保证不被提前消费,生成 18 个队列保证消息的有序性

  • 基于 Timer 机制实现轮询 consume queue 逻辑

  • 固定时间等级的可用性比较差,需要改造才能用于业务开发

发布于: 刚刚阅读数: 2
用户头像

周文童

关注

还未添加个人签名 2017.10.18 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ 源码探究 -- 延迟队列实现