【深度挖掘 RocketMQ 底层源码】「底层源码挖掘系列」透彻剖析贯穿 RocketMQ 的消费者端的运行调度的流程(Pull 模式)
承接上文
承接上一章节的内容,下面我们看继续看拉取的调度模式,PULL 与 PUSH 模式相比,PULL 模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了 Pull 方式的编程复杂度,RocketMQ 提供了调度消费服务(MQPullConsumerScheduleService),在 topic 的订阅发送变化(初次订阅或距上次拉取消息超时)就触发 PULL 方式拉取消息。
MQPullConsumerScheduleService
MQPullConsumerScheduleService 是 PULL 模式下面的调度服务,当 RebalanceImpl.processQueueTable 队列有变化时才进行消息的拉取,从而降低 Pull 方式的编程复杂度。在应用层按照如下方式使用:
使用 MQPullConsumerScheduleService 开发消费消息
实例化对象 MQPullConsumerScheduleService
设置 NameServer
设置消费组为集群模式
注册拉取回调函数
从上下文中获取 MQPullConsumer 对象,此处其实就是 DefaultMQPullConsumer。
获取该消费组的该队列的消费进度
拉取消息,pull()方法在 DefaultMQPullConsumer 有具体介绍
更新消费组该队列消费进度
设置下次拉取消息时间间隔,单位毫秒
启动调度组件,调用 MQPullConsumerScheduleService.start()方法启动该调度服务。
首先初始化队列监听器 MessageQueueListenerImpl 类,该类是 MQPullConsumerScheduleService 的内部类,实现了 MessageQueueListener 接口的 messageQueueChanged 方法;
将该监听器类赋值给 DefaultMQPullConsumer.messageQueueListener 变量值;
调用 DefaultMQPullConsumer 的 start 方法启动 Consumer;
分析核心执行方法及流程
使用 registerPullTaskCallback 对 Topic 进行注册
MQPullConsumerScheduleService 会将 Topic 的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的 Hash 表中。
线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。
在 doPullTask() 完成自己的拉取消息逻辑,和 DefaultMQPullConsumer 是一样的。
用户设置下次调用间隔时间
scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。
注册拉取任务回调函数
PullTaskCallback 回调函数接口
调用 MQPullConsumerScheduleService.registerPullTaskCallback (String topic, PullTaskCallback callback)方法,在该方法中以 topic 为 key 值将自定义的 PullTaskCallback 对象存入 MQPullConsumerScheduleService. callbackTable:ConcurrentHashMap<String ,PullTaskCallback>变量中;
建立 PullTaskCallback 接口的实现类,实现该接口的 doPullTask(final MessageQueue mq, final PullTaskContext context)方法。
在该方法中可以先调用 DefaultMQPullConsumer.fetchConsumeOffset (MessageQueue mq, boolean fromStore)方法获取 MessageQueue 队列的消费进度。
PullTaskContext 拉取任务上下文
调用 DefaultMQPullConsumer.pull(MessageQueue mq, String subExpression, long offset, int maxNums)方法,
指定的队列和指定的开始位置读取消息内容;
获取到的消息进行相关的业务逻辑处理;
调用 DefaultMQPullConsumer.updateConsumeOffset(MessageQueue mq, long offset)方法进行消费进度的更新,其中 offset 值是在获取消息内容时返回的下一个消费进度值;
MQPullConsumerScheduleService 的实现原理
触发拉取消息
RebalanceImpl.rebalanceByTopic()方法执行的过程中,若 RebalanceImpl.processQueueTable 有变化,则回调 DefaultMQPullConsumer. messageQueueListener 变量值的 MessageQueueListenerImpl. MessageQueueChanged 方法,在该方法中调用 MQPullConsumerScheduleService. putTask(String topic, Set<MessageQueue> mqNewSet)方法。
若为广播模式(BROADCASTING),则 mqNewSet 为该 topic 下面的所有 MessageQueue 队列;
若为集群模式,则 mqNewSet 为给该 topic 分配的 MessageQueue 队列,putTask 方法的大致逻辑如下:
遍历
MQPullConsumerScheduleService.taskTable: ConcurrentHashMap<MessageQueue, PullTaskImpl>
列表(表示正在拉取消息的任务列表),检查该 topic 下面的所有 MessageQueue 对象,若该对象不在入参 mqNewSet 集合中的,将对应的 PullTaskImpl 对象的 cancelled 变量标记为 true。mqNewSet 集合中的 MessageQueue 对象,若不在 MQPullConsumerScheduleService.taskTable 列表中,则以 MessageQueue 对象为参数初始化 PullTaskImpl 对象,然后放入 taskTable 列表中,将该 PullTaskImpl 对象放入
MQPullConsumerScheduleService.scheduledThreadPoolExecutor
线程池中,然后立即执行该线程。
拉取消息的线程(PullTaskImpl)
该 PullTaskImpl 线程的 run 方法如下:
检查 cancelled 变量是为 true,若为 false 则直接退出该线程;否则继续下面的处理;
以 MessageQueue 对象的 topic 值从 MQPullConsumerScheduleService.callbackTable 变量中获取 PullTaskCallback 的实现类(该类是由应用层实现);
3, 调用该 PullTaskCallback 实现类的 doPullTask 方法,即实现业务层定义的业务逻辑(通用逻辑是先获取消息内容,然后进行相应的业务处理,最后更新消费进度);
4, 再次检查 cancelled 变量是为 true,若不为 true,则将该 PullTaskImpl 对象再次放入 MQPullConsumerScheduleService. scheduledThreadPoolExecutor 线程池中,设定在 200 毫秒之后重新调度执行 PullTaskImpl 线程类;
评论