写点什么

【深度挖掘 RocketMQ 底层源码】「底层源码挖掘系列」透彻剖析贯穿 RocketMQ 的消费者端的运行核心的流程(Pull 模式 - 上)

作者:洛神灬殇
  • 2023-02-21
    江苏
  • 本文字数:5310 字

    阅读完需:约 17 分钟

【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。我们接下来主要介绍 Pull 模式

Pull 模式的处理机制

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程


在 RocketMQ 中有两种 Pull 方式,一种是比较原始 Pull Consumer,它不提供相关的订阅方法,需要调用 pull 方法时指定队列进行拉取,并需要自己更新位点。另一种是 Lite Pull Consumer,它提供了 Subscribe 和 Assign 两种方式,使用起来更加方便。

Pull 模式的使用特点

  • 开发者自己维护 OffsetStore。

  • 自己保存消费组的 offset,比如存入 Redis,或调用 MQ 接口将其保存在 Broker 端。自主选择 Message Queue 和 offset 进行消息拉取。

  • 用户拉去消息时,需要用户自己来决定拉去哪个队列从哪个 offset 开始,拉去多少消息。

相比 Push 的运行特点

与 PUSH 模式相比,PULL 模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了 Pull 方式的编程复杂度,RocketMQ 提供了调度消费服务(MQPullConsumerScheduleService),在 topic 的订阅发送变化(初次订阅或距上次拉取消息超时)就触发 PULL 方式拉取消息。

DefaultMQPullConsumer

针对于 DefaultMQPullConsumer 源码流程进行相关的分析,对于 Push 模式而言,Pull 模式比较适应于客户端拉去的速度由自己进行控制处理。而且实现的原理和复杂程度也简单了很多,我们从实现出发,进行分析对应的实现流程。

DefaultMQPullConsumer 的 Pull 拉取模式的开发案例

指定队列模式消费对应队列的消息

    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");    consumer.setNamesrvAddr("127.0.0.1:9876");    consumer.start();    try {      MessageQueue mq = new MessageQueue();      mq.setQueueId(0);      mq.setTopic("lob");      mq.setBrokerName("brokerName");      long offset = 26;      PullResult pullResult = consumer.pull(mq, "*", offset, 32);      if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {        System.out.printf("%s%n", pullResult.getMsgFoundList());        consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());      }    } catch (Exception e) {      e.printStackTrace();    }    consumer.shutdown();  }
复制代码

消费所有队列数据

从所有队列进行选择队列模式,并且存储 offset 在被本地。


DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");for(MessageQueue mq:mqs){    try {   // 获取消息的offset,指定从store中获取   long offset = consumer.fetchConsumeOffset(mq,true);    while(true){       PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);       putMessageQueueOffset(mq,pullResult.getNextBeginOffset());      switch(pullResult.getPullStatus()){         case FOUND:                  List<MessageExt> messageExtList = pullResult.getMsgFoundList();                  for (MessageExt m : messageExtList) {                      System.out.println(new String(m.getBody()));                   }                break;        case NO_MATCHED_MSG:                break;        case NO_NEW_MSG:                break;        case OFFSET_ILLEGAL:                break;      }  }} catch (Exception e) {     e.printStackTrace(); }}consumer.shutdown();
// 保存上次消费的消息下标private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) { OFFSE_TABLE.put(mq, nextBeginOffset);}
// 获取上次消费的消息的下标private static Long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if(offset != null){ return offset; } return 0l;}
复制代码
fetchSubscribeMessageQueues(从指定 topic 中拉取所有消息队列)

根据 Topic 获取该 Topic 的所有消息队列,用于遍历消息队列,从每个消息队列中获取消息,


调用 DefaultMQPullConsumer.fetchSubscribeMessageQueues(String topic)方法,根据 topic 获取对应的 MessageQueue(即可被订阅的队列),在该方法中最终通过调用 MQAdminImpl.fetchSubscribeMessageQueues(String topic)方法从 NameServer 获取该 topic 的 MessageQueue。


/** * @param topic Topic名称 * @return 该Topic所有的消息队列 */@Overridepublic Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {    return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));}
复制代码
fetchSubscribeMessageQueues 底层调用

调用 MQClientAPIImpl.getTopicRouteInfoFromNameServer(String topic, long timeoutMillis)方法,其中 timeoutMillis=3000,该方法向 NameServer 发送 GET_ROUTEINTO_BY_TOPIC 请求码获取 topic 参数对应的 Broker 信息和 topic 配置信息,即 TopicRouteData 对象;.


 public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {        try {            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);            if (topicRouteData != null) {                // 2、遍历topicRouteData                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);                if (!mqList.isEmpty()) {                    return mqList;                } else {                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);                }            }        } catch (Exception e) {            throw new MQClientException(                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),                e);        }         throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);    }
复制代码
fetchSubscribeMessageQueues 底层调用

遍历 TopicRouteData 对象的 QueueData 列表中每个 QueueData 对象,首先判断该 QueueData 对象是否具有读权限,若有则根据该 QueueData 对象的 readQueueNums 值,创建 readQueueNums 个 MessageQueue 对象,并构成 MessageQueue 集合;最后返回给 MessageQueue 集合


public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {        Set<MessageQueue> mqList = new HashSet<MessageQueue>();        List<QueueData> qds = route.getQueueDatas();        for (QueueData qd : qds) {            if (PermName.isReadable(qd.getPerm())) {                for (int i = 0; i < qd.getReadQueueNums(); i++) {                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);                    mqList.add(mq);                }            }        }        return mqList;    }
复制代码
消息的三种拉取模式

同步拉取消息


/** * @param mq            消息队列 * @param subExpression 消息tag过滤表达式 * @param offset        消费组offset(从哪里开始拉去) * @param maxNums       一次最大拉去消息数量 * @param timeout       超时时间 * @return 存储了拉取状态以及消息 */@Overridepublic PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);}
复制代码


异步拉取消息


/** * @param mq            消息队列 * @param subExpression 消息tag过滤表达式 * @param offset        消费组offset(从哪里开始拉去) * @param maxNums       一次最大拉去消息数量 * @param timeout       超时时间 * @param pullCallback  异步回调函数 * @param timeout        * @throws MQClientException * @throws RemotingException * @throws InterruptedException */@Overridepublic void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,                 long timeout)        throws MQClientException, RemotingException, InterruptedException {    this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);}
复制代码


同步阻塞拉取消息


拉取消息,若没有找到消息,则阻塞一段时间。通过该方法获取该 MessageQueue 队列下面从 offset 位置开始的消息内容。


  • maxNums=32 即表示获取的最大消息个数。

  • offset 为该 MessageQueue 对象的开始消费位置,可以调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取该 MessageQueue 队列的消费进度来设定参数 offset 值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法


/** * @param mq            消息队列 * @param subExpression tag过滤 * @param offset        消费组offset * @param maxNums       一次最大拉取数量 * @return */@Overridepublic PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);}
复制代码


pullBlockIfNotFound 和 pull 区别是: 前者在没有找到消息的时候会阻塞一段时间以便等待后续消息进入,后者则会直接返回 NOT_FOUND 。

维护消息队列的 Offset

获取队列的消费 Offset


/** * @param mq 队列 * @param fromStore 是否从存储获取,true: 从当前服务器存储中获取,false:从远程broker获取 * @return 消费offset */@Overridepublic long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {    return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);}
复制代码


调用 DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取 MessageQueue 队列的消费进度,其中 fromStore 为 false 表示从存储端(即 Broker 端)获取消费进度;若 fromStore 为 true 表示从本地内存获取消费进度;


  1. 对于从存储端获取消费进度(即 fromStore=true)的情况:


  • 对于 LocalFileOffsetStore 对象,从本地加载 offsets.json 文件,然后获取该 MessageQueue 对象的 offset 值;


(即 fromStore=false)对于 RemoteBrokerOffsetStore 对象,获取逻辑如下:


  1. 以 MessageQueue 对象的 brokername 从 MQClientInstance. brokerAddrTable 中获取 Broker 的地址;若没有获取到则立即调用 updateTopicRouteInfoFromNameServer 方法然后再次获取;

  2. 构造 QueryConsumerOffsetRequestHeader 对象,其中包括 topic、consumerGroup、queueId;然后调用 MQClientAPIImpl.queryConsumerOffset (String addr, QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis)方法向 Broker 发送 QUERY_CONSUMER_OFFSET 请求码,获取消费进度 Offset;

  3. 用上一步从 Broker 获取的 offset 更新本地内存的消费进度列表数据 RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue, AtomicLong>变量值;


更新消费组 Offset


更新消费组的 Offset,注意:只会在本地内存中更新,并不会同步到远程 Broker.


/** * @param mq 消息队列 * @param offset 消费进度 */@Overridepublic void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {    this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);}
复制代码


发布于: 2023-02-21阅读数: 30
用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)_Apache RocketMQ_洛神灬殇_InfoQ写作社区