写点什么

RocketMQ 延迟消息的代码实战及原理分析,实战解析

作者:MySQL神话
  • 2021 年 11 月 27 日
  • 本文字数:3159 字

    阅读完需:约 10 分钟

延迟消息示例

首先,写一个消费者,用于消费延迟消息:


public class Consumer {


public static void main(String[] args) throws MQClientException {


SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");


// 实例化消费者


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");


// 设置 NameServer 的地址


consumer.setNamesrvAddr("localhost:9876");


// 订阅一个或者多个 Topic,以及 Tag 来过滤需要消费的消息


consumer.subscribe("OneMoreTopic", "*");


// 注册回调实现类来处理从 broker 拉取回来的消息


consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {


System.out.printf("%s %s Receive New Messages:%n"


, sdf.format(new Date())


, Thread.currentThread().getName());


for (MessageExt msg : msgs) {


System.out.printf("\tMsg Id: %s%n", msg.getMsgId());


System.out.printf("\tBody: %s%n", new String(msg.getBody()));


}


// 标记该消息已经被成功消费


return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;


});


// 启动消费者实例


consumer.start();


System.out.println("Consumer Started.");


}


}


再写一个延迟消息的生


《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享


产者,用于发送延迟消息:


public class DelayProducer {


public static void main(String[] args) throws Exception {


SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");


// 实例化消息生产者 Producer


DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");


// 设置 NameServer 的地址


producer.setNamesrvAddr("localhost:9876");


// 启动 Producer 实例


producer.start();


Message msg = new Message("OneMoreTopic"


, "DelayMessage", "This is a delay message.".getBytes());


//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"


//设置消息延迟级别为 3,也就是延迟 10s。


msg.setDelayTimeLevel(3);


// 发送消息到一个 Broker


SendResult sendResult = producer.send(msg);


// 通过 sendResult 返回消息是否成功送达


System.out.printf("%s Send Status: %s, Msg Id: %s %n"


, sdf.format(new Date())


, sendResult.getSendStatus()


, sendResult.getMsgId());


// 如果不再发送消息,关闭 Producer 实例。


producer.shutdown();


}


}


运行生产者以后,就会发送一条延迟消息:


10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000


10 秒钟后,消费者收到的这条延迟消息:


10:37:25.026 ConsumeMessageThread_1 Receive New Messages:


Msg Id: C0A8006D5AB018B4AAC216E0DB690000


Body: This is a delay message.

延迟消息的原理分析

以下分析的 RocketMQ 源码的版本号是 4.7.1,版本不同源码略有差别。

CommitLog

org.apache.rocketmq.store.CommitLog 中,针对延迟消息做了一些处理:


// 延迟级别大于 0,就是延时消息


if (msg.getDelayTimeLevel() > 0) {


// 判断当前延迟级别,如果大于最大延迟级别,


// 就设置当前延迟级别为最大延迟级别。


if (msg.getDelayTimeLevel() > this.defaultMessageStore


.getScheduleMessageService().getMaxDelayLevel()) {


msg.setDelayTimeLevel(this.defaultMessageStore


.getScheduleMessageService().getMaxDelayLevel());


}


// 获取延迟消息的主题,


// 其中 RMQ_SYS_SCHEDULE_TOPIC 的值为 SCHEDULE_TOPIC_XXXX


topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;


// 根据延迟级别获取延迟消息的队列 Id,


// 队列 Id 其实就是延迟级别减 1


queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());


// 备份真正的主题和队列 Id


MessageAccessor.putProperty(msg


, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());


MessageAccessor.putProperty(msg


, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));


msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));


// 设置延时消息的主题和队列 Id


msg.setTopic(topic);


msg.setQueueId(queueId);


}


可以看到,每一个延迟消息的主题都被暂时更改为 SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列 Id。接下来,处理延迟消息的就是 org.apache.rocketmq.store.schedule.ScheduleMessageService

ScheduleMessageService

ScheduleMessageService 是由 org.apache.rocketmq.store.DefaultMessageStore 进行初始化的,初始化包括构造对象和调用load方法。最后,再执行 ScheduleMessageService 的start方法:


public void start() {


// 使用 AtomicBoolean 确保 start 方法仅有效执行一次


if (started.compareAndSet(false, true)) {


this.timer = new Timer("ScheduleMessageTimerThread", true);


// 遍历所有延迟级别


for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {


// key 为延迟级别


Integer level = entry.getKey();


// value 为延迟级别对应的毫秒数


Long timeDelay = entry.getValue();


// 根据延迟级别获得对应队列的偏移量


Long offset = this.offsetTable.get(level);


// 如果偏移量为 null,则设置为 0


if (null == offset) {


offset = 0L;


}


if (timeDelay != null) {


// 为每个延迟级别创建定时任务,


// 第一次启动任务延迟为 FIRST_DELAY_TIME,也就是 1 秒


this.timer.schedule(


new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);


}


}


// 延迟 10 秒后每隔 flushDelayOffsetInterval 执行一次任务,


// 其中,flushDelayOffsetInterval 默认配置也为 10 秒


this.timer.scheduleAtFixedRate(new TimerTask() {


@Override


public void run() {


try {


// 持久化每个队列消费的偏移量


if (started.get()) ScheduleMessageService.this.persist();


} catch (Throwable e) {


log.error("scheduleAtFixedRate flush exception", e);


}


}


}, 10000, this.defaultMessageStore


.getMessageStoreConfig().getFlushDelayOffsetInterval());


}


}


遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为 0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为 1 秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。


然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由 flushDelayOffsetInterval 属性进行配置,默认为 10 秒。

定时任务

ScheduleMessageService 的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在 DeliverDelayedMessageTimerTask 类之中,它核心代码是 executeOnTimeup 方法之中,我们来看一下主要部分:


// 根据主题和队列 Id 获取消息队列


ConsumeQueue cq =


ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(


TopicValidator.RMQ_SYS_SCHEDULE_TOPIC


, delayLevel2QueueId(delayLevel));


如果没有获取到对应的消息队列,则在 DELAY_FOR_A_WHILE(默认为 100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:


// 根据消费偏移量从消息队列中获取所有有效消息

总结

虽然我个人也经常自嘲,十年之后要去成为外卖专员,但实际上依靠自身的努力,是能够减少三十五岁之后的焦虑的,毕竟好的架构师并不多。


架构师,是我们大部分技术人的职业目标,一名好的架构师来源于机遇(公司)、个人努力(吃得苦、肯钻研)、天分(真的热爱)的三者协作的结果,实践+机遇+努力才能助你成为优秀的架构师。


如果你也想成为一名好的架构师,那或许这份 Java 成长笔记你需要阅读阅读,希望能够对你的职业发展有所帮助。



本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

用户头像

MySQL神话

关注

还未添加个人签名 2021.11.12 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ延迟消息的代码实战及原理分析,实战解析