写点什么

【深度挖掘 RocketMQ 底层源码】「底层源码挖掘系列」透彻剖析贯穿 RocketMQ 的消费者端的运行调度的流程(Pull 模式)

作者:洛神灬殇
  • 2023-02-23
    江苏
  • 本文字数:3585 字

    阅读完需:约 12 分钟

【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)

承接上文

承接上一章节的内容,下面我们看继续看拉取的调度模式,PULL 与 PUSH 模式相比,PULL 模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了 Pull 方式的编程复杂度,RocketMQ 提供了调度消费服务(MQPullConsumerScheduleService),在 topic 的订阅发送变化(初次订阅或距上次拉取消息超时)就触发 PULL 方式拉取消息。

MQPullConsumerScheduleService

MQPullConsumerScheduleService 是 PULL 模式下面的调度服务,当 RebalanceImpl.processQueueTable 队列有变化时才进行消息的拉取,从而降低 Pull 方式的编程复杂度。在应用层按照如下方式使用:

使用 MQPullConsumerScheduleService 开发消费消息

实例化对象 MQPullConsumerScheduleService

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
复制代码

设置 NameServer

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
复制代码

设置消费组为集群模式

scheduleService.setMessageModel(MessageModel.CLUSTERING);
复制代码

注册拉取回调函数

scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {            @Override            public void doPullTask(MessageQueue mq, PullTaskContext context) {                MQPullConsumer consumer = context.getPullConsumer();                try {                    long offset = consumer.fetchConsumeOffset(mq, false);                    if (offset < 0)                        offset = 0;                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);                    System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);                    switch (pullResult.getPullStatus()) {                        case FOUND:                            break;                        case NO_MATCHED_MSG:                            break;                        case NO_NEW_MSG:                        case OFFSET_ILLEGAL:                            break;                        default:                            break;                    }                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());                    context.setPullNextDelayTimeMillis(100);                } catch (Exception e) {                    e.printStackTrace();                }            }        });
复制代码


从上下文中获取 MQPullConsumer 对象,此处其实就是 DefaultMQPullConsumer。


MQPullConsumer consumer = context.getPullConsumer();
复制代码


获取该消费组的该队列的消费进度


long offset = consumer.fetchConsumeOffset(mq, false);
复制代码


拉取消息,pull()方法在 DefaultMQPullConsumer 有具体介绍


PullResult pullResult = consumer.pull(mq, "*", offset, 32);
复制代码


更新消费组该队列消费进度


consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
复制代码


设置下次拉取消息时间间隔,单位毫秒


context.setPullNextDelayTimeMillis(100);
复制代码


启动调度组件,调用 MQPullConsumerScheduleService.start()方法启动该调度服务。


scheduleService.start();
复制代码


  1. 首先初始化队列监听器 MessageQueueListenerImpl 类,该类是 MQPullConsumerScheduleService 的内部类,实现了 MessageQueueListener 接口的 messageQueueChanged 方法;

  2. 将该监听器类赋值给 DefaultMQPullConsumer.messageQueueListener 变量值;

  3. 调用 DefaultMQPullConsumer 的 start 方法启动 Consumer;

分析核心执行方法及流程


  1. 使用 registerPullTaskCallback 对 Topic 进行注册

  2. MQPullConsumerScheduleService 会将 Topic 的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的 Hash 表中。

  3. 线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。

  4. 在 doPullTask() 完成自己的拉取消息逻辑,和 DefaultMQPullConsumer 是一样的。

  5. 用户设置下次调用间隔时间

  6. scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。

注册拉取任务回调函数

/** * @param topic topic名称 * @param callback 回调函数 */public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {    this.callbackTable.put(topic, callback);    this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);}
复制代码

PullTaskCallback 回调函数接口

调用 MQPullConsumerScheduleService.registerPullTaskCallback (String topic, PullTaskCallback callback)方法,在该方法中以 topic 为 key 值将自定义的 PullTaskCallback 对象存入 MQPullConsumerScheduleService. callbackTable:ConcurrentHashMap<String ,PullTaskCallback>变量中;


public interface PullTaskCallback {    /**     *      * @param mq 消息队列     * @param context 任务上下文     */    void doPullTask(final MessageQueue mq, final PullTaskContext context);}
复制代码


建立 PullTaskCallback 接口的实现类,实现该接口的 doPullTask(final MessageQueue mq, final PullTaskContext context)方法。


在该方法中可以先调用 DefaultMQPullConsumer.fetchConsumeOffset (MessageQueue mq, boolean fromStore)方法获取 MessageQueue 队列的消费进度

PullTaskContext 拉取任务上下文

调用 DefaultMQPullConsumer.pull(MessageQueue mq, String subExpression, long offset, int maxNums)方法,


  1. 指定的队列和指定的开始位置读取消息内容;

  2. 获取到的消息进行相关的业务逻辑处理;


public class PullTaskContext {
private int pullNextDelayTimeMillis = 200;
// 使用该接口进行消息拉取,默认实现是DefaultMQPullConsumer private MQPullConsumer pullConsumer;
public int getPullNextDelayTimeMillis() { return pullNextDelayTimeMillis; } /** * 设置下次调用doPullTask()的间隔时间,默认毫秒 */ public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) { this.pullNextDelayTimeMillis = pullNextDelayTimeMillis; } public MQPullConsumer getPullConsumer() { return pullConsumer; }
public void setPullConsumer(MQPullConsumer pullConsumer) { this.pullConsumer = pullConsumer; }}
复制代码


  1. 调用 DefaultMQPullConsumer.updateConsumeOffset(MessageQueue mq, long offset)方法进行消费进度的更新,其中 offset 值是在获取消息内容时返回的下一个消费进度值;

MQPullConsumerScheduleService 的实现原理

触发拉取消息

RebalanceImpl.rebalanceByTopic()方法执行的过程中,若 RebalanceImpl.processQueueTable 有变化,则回调 DefaultMQPullConsumer. messageQueueListener 变量值的 MessageQueueListenerImpl. MessageQueueChanged 方法,在该方法中调用 MQPullConsumerScheduleService. putTask(String topic, Set<MessageQueue> mqNewSet)方法。


  • 若为广播模式(BROADCASTING),则 mqNewSet 为该 topic 下面的所有 MessageQueue 队列;

  • 若为集群模式,则 mqNewSet 为给该 topic 分配的 MessageQueue 队列,putTask 方法的大致逻辑如下:

  • 遍历MQPullConsumerScheduleService.taskTable: ConcurrentHashMap<MessageQueue, PullTaskImpl> 列表(表示正在拉取消息的任务列表),检查该 topic 下面的所有 MessageQueue 对象,若该对象不在入参 mqNewSet 集合中的,将对应的 PullTaskImpl 对象的 cancelled 变量标记为 true。

  • mqNewSet 集合中的 MessageQueue 对象,若不在 MQPullConsumerScheduleService.taskTable 列表中,则以 MessageQueue 对象为参数初始化 PullTaskImpl 对象,然后放入 taskTable 列表中,将该 PullTaskImpl 对象放入MQPullConsumerScheduleService.scheduledThreadPoolExecutor线程池中,然后立即执行该线程。

拉取消息的线程(PullTaskImpl)

该 PullTaskImpl 线程的 run 方法如下:


  1. 检查 cancelled 变量是为 true,若为 false 则直接退出该线程;否则继续下面的处理;

  2. 以 MessageQueue 对象的 topic 值从 MQPullConsumerScheduleService.callbackTable 变量中获取 PullTaskCallback 的实现类(该类是由应用层实现);


3, 调用该 PullTaskCallback 实现类的 doPullTask 方法,即实现业务层定义的业务逻辑(通用逻辑是先获取消息内容,然后进行相应的业务处理,最后更新消费进度);


4, 再次检查 cancelled 变量是为 true,若不为 true,则将该 PullTaskImpl 对象再次放入 MQPullConsumerScheduleService. scheduledThreadPoolExecutor 线程池中,设定在 200 毫秒之后重新调度执行 PullTaskImpl 线程类;

用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)_RocketMQ_洛神灬殇_InfoQ写作社区