写点什么

【深度挖掘 RocketMQ 底层源码】「底层源码挖掘系列」抽丝剥茧贯穿 RocketMQ 的消费者端的运行核心的流程(Pull 模式 - 下)

作者:洛神灬殇
  • 2023-02-22
    江苏
  • 本文字数:8424 字

    阅读完需:约 28 分钟

【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

承接【【深度挖掘 RocketMQ 底层源码】「底层源码挖掘系列」透彻剖析贯穿 RocketMQ 的消费者端的运行核心的流程(Pull 模式-上)】

pullBlockIfNotFound 方法

通过该方法获取该 MessageQueue 队列下面从 offset 位置开始的消息内容,其中 maxNums=32 即表示获取的最大消息个数,offset 为该 MessageQueue 对象的开始消费位置,可以调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取该 MessageQueue 队列的消费进度来设定参数 offset 值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法。


public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());}
复制代码


接下面我来分析DefaultMQPullConsumerImpl.pullSyncImpl源码实现如下:

pullSyncImpl 方法的定义

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
复制代码
参数说明
  • MessageQueue mq:需要进行拉去的消息队列。

  • String subExpression:tag 的标签值

  • long offset:消息数据偏移量

  • int maxNums:拉去的消息的最大数量

  • boolean block:是否进行阻塞拉去

  • long timeout:拉取数据的超时时间

参数数据校验

如下所示,校验对应的 MessageQueue 对象、offset 偏移量和拉取的最大数据量是否合法。


        this.makeSureStateOK();        if (null == mq) {            throw new MQClientException("mq is null", null);        }        if (offset < 0) {            throw new MQClientException("offset < 0", null);        }        if (maxNums <= 0) {            throw new MQClientException("maxNums <= 0", null);        }
复制代码
检查 MessageQueue 对象的 topic 是否存在

检查 MessageQueue 对象的 topic 是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData> 变量中,若不在则以 consumerGroup、topic、subExpression 为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression) 方法构造 SubscriptionData 对象保存到 RebalanceImpl.subscriptionInner 变量中,其中 subExpression="*"。


 this.subscriptionAutomatically(mq.getTopic());
复制代码
构建标志位,逻辑或运算|=
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
复制代码
构建 SubscriptionData 对象(buildSubscriptionData)

请求参数 subExpression 以及 consumerGroup、topic 为参数调用 FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造 SubscriptionData 对象并返回。


SubscriptionData subscriptionData;try {            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),                mq.getTopic(), subExpression);        } catch (Exception e) {            throw new MQClientException("parse subscription error", e);、}
复制代码
从 broker 中拉取消息(pullAPIWrapper.pullKernelImpl)

调用 PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法从 Broker 拉取消息内容。


 long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(            mq,            subscriptionData.getSubString(),            0L,            offset,            maxNums,            sysFlag,            0,            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),            timeoutMillis,            CommunicationMode.SYNC,            null        );
复制代码


对拉取到的消息进行解码,过滤并执行回调,并把解析的 message 列表放到 MsgFoundList 中。


pullKernelImpl 底层调用机制


调用 PullAPIWrapper.processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData)方法对拉取消息的响应结果进行处理,主要是消息反序列化。


public PullResult pullKernelImpl(        final MessageQueue mq,        final String subExpression,        final String expressionType,        final long subVersion,        final long offset,        final int maxNums,        final int sysFlag,        final long commitOffset,        final long brokerSuspendMaxTimeMillis,        final long timeoutMillis,        final CommunicationMode communicationMode,        final PullCallback pullCallback    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        FindBrokerResult findBrokerResult =            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),                this.recalculatePullFromWhichNode(mq), false);        if (null == findBrokerResult) {            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());            findBrokerResult =                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),                    this.recalculatePullFromWhichNode(mq), false);        }        if (findBrokerResult != null) {            {                // check version                if (!ExpressionType.isTagType(expressionType)                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);                }            }            int sysFlagInner = sysFlag;            if (findBrokerResult.isSlave()) {                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);            }            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();            requestHeader.setConsumerGroup(this.consumerGroup);            requestHeader.setTopic(mq.getTopic());            requestHeader.setQueueId(mq.getQueueId());            requestHeader.setQueueOffset(offset);            requestHeader.setMaxMsgNums(maxNums);            requestHeader.setSysFlag(sysFlagInner);            requestHeader.setCommitOffset(commitOffset);            // 设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);            requestHeader.setSubscription(subExpression);            requestHeader.setSubVersion(subVersion);            requestHeader.setExpressionType(expressionType);            String brokerAddr = findBrokerResult.getBrokerAddr();            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);            }            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(                brokerAddr,                requestHeader,                timeoutMillis,                communicationMode,                pullCallback);            return pullResult;        }        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }
复制代码


  • 看到首先代码调用MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) 方法查找 Broker 地址,其中 onlyThisBroker=false,表示若指定的 brokerId 未获取到地址则获取其他 BrokerId 的地址也行。

  • 方法中根据 brokerName 和 brokerId 参数从 MQClientInstance.brokerAddrTable->ConcurrentHashMap<, HashMap 变量中获取对应的 Broker 地址,若获取不到则从 brokerName 下面的 Map 列表中找其他地址返回即可。

  • 判断是否为空,若在上一步未获取到 Broker 地址,topic 参数调用MQClientInstance.updateTopicRouteInfoFromNameServer(String topic)方法,然后在执行操作MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) ,直到获取到 Broker 地址为止;

  • 根据 topic 参数值从 MQClientInstance.topicRouteTable: ConcurrentHashMapTopicRouteData>变量中获取 TopicRouteData 对象,

  • Broker 地址为参数从该 TopicRouteData 对象的 filterServerTable:HashMap 变量中获取该 Broker 下面的所有 Filter 服务器地址列表;

  • 若该地址列表不为空,则随机选择一个 Filter 服务器地址返回;否则向调用层抛出异常,该 pullKernelImpl 方法结束;


获取对应的 broker 服务


若获取的 Broker 地址是备用 Broker,则将标记位 sysFlag 的第 1 个字节置为 0,即在消费完之后不提交消费进度;


检查标记位 sysFlag 的第 4 个字节(即 SubscriptionData. classFilterMode)是否为 1;若等于 1,则调用 PullAPIWrapper.computPullFromWhichFilterServer(String topic, String brokerAddr)方法获取 Filter 服务器地址。大致逻辑如下:


 int sysFlagInner = sysFlag;            if (findBrokerResult.isSlave()) {                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}
复制代码


初始化 PullMessageRequestHeader 对象


调用MQClientAPIImpl.pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法向 Broker 地址或者 Filter 地址发送 PULL_MESSAGE 请求信息。


构建 PullMessageRequestHeader 对象,其中 queueOffset 变量值等于入参 offset,设置 broker 的最长阻塞时间,默认是 15 秒,broker 只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回 null,如果有消息会立即返回。


            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();            requestHeader.setConsumerGroup(this.consumerGroup);            requestHeader.setTopic(mq.getTopic());            requestHeader.setQueueId(mq.getQueueId());            requestHeader.setQueueOffset(offset);            requestHeader.setMaxMsgNums(maxNums);            requestHeader.setSysFlag(sysFlagInner);            requestHeader.setCommitOffset(commitOffset);            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);            requestHeader.setSubscription(subExpression);            requestHeader.setSubVersion(subVersion);            requestHeader.setExpressionType(expressionType);
复制代码


获取的 Filter 服务器发送 PULL_MESSAGE 请求信息,否则向 Broker 发送 PULL_MESSAGE 请求信息。


pullMessage 方法处理


 PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(                brokerAddr,                requestHeader,                timeoutMillis,                communicationMode,                pullCallback);  return pullResult;
复制代码


代码实现机制(同步+异步)


在 MQClientAPIImpl.pullMessage 方法中,根据入参 communicationMode 的值分为异步拉取和同步拉取方式两种。


public PullResult pullMessage(        final String addr,        final PullMessageRequestHeader requestHeader,        final long timeoutMillis,        final CommunicationMode communicationMode,        final PullCallback pullCallback    ) throws RemotingException, MQBrokerException, InterruptedException {        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);        switch (communicationMode) {            case ONEWAY:                assert false;                return null;            case ASYNC:                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);                return null;            case SYNC:                return this.pullMessageSync(addr, request, timeoutMillis);            default:                assert false;                break;        }        return null;    }
复制代码


无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个 ResponseFuture 对象,以请求消息的序列号为 key 值,存入 NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理:


  1. 发送失败后直接删掉 responseTable 变量中的相应记录;

  2. 收到响应消息之后,会以响应消息中的序列号(由服务端根据请求消息的序列号原样返回)从 responseTable 中查找 ResponseFuture 对象,并设置该对象的 responseCommand 变量。若是同步发送会唤醒等待响应的 ResponseFuture.waitResponse 方法;若是异步发送会调用 ResponseFuture.executeInvokeCallback()方法完成回调逻辑处理;


pullMessageSync 调用操作(对于同步拉取)


同步拉取方式,调用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法。大致步骤如下:


  1. 调用 RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:


 @Override    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {        long beginStartTime = System.currentTimeMillis();        final Channel channel = this.getAndCreateChannel(addr);        if (channel != null && channel.isActive()) {            try {                if (this.rpcHook != null) {                    this.rpcHook.doBeforeRequest(addr, request);                }                long costTime = System.currentTimeMillis() - beginStartTime;                if (timeoutMillis < costTime) {                    throw new RemotingTimeoutException("invokeSync call timeout");                }                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);                if (this.rpcHook != null) {                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);                }                return response;            } catch (RemotingSendRequestException e) {                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);                this.closeChannel(addr, channel);                throw e;            } catch (RemotingTimeoutException e) {                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {                    this.closeChannel(addr, channel);                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);                }                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);                throw e;            }        } else {            this.closeChannel(addr, channel);            throw new RemotingConnectException(addr);        }    }
复制代码


  • 获取 Broker 地址的 Channel 信息。根据 broker 地址从 RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>变量中获取 ChannelWrapper 对象并返回该对象的 Channel 变量;若没有 ChannelWrapper 对象则与 broker 地址建立新的连接并将连接信息存入 channelTables 变量中,便于下次使用;

  • 若 NettyRemotingClient.rpcHook:RPCHook 变量不为空(该变量在应用层初始化 DefaultMQPushConsumer 或者 DefaultMQPullConsumer 对象传入该值),则调用 RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;


NettyRemotingAbstract.invokeSyncImpl(同步请求)


调用 NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,该方法的逻辑如下:


  1. 使用请求的序列号(opaue)、超时时间初始化 ResponseFuture 对象;并将该 ResponseFuture 对象存入 NettyRemotingAbstract.responseTable: ConcurrentHashMap 变量中;

  2. 调用 Channel.writeAndFlush(Object msg)方法将请求对象 RemotingCommand 发送给 Broker;然后调用 addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加内部匿名类:该内部匿名类实现了 ChannelFutureListener 接口的 operationComplete 方法,在发送完成之后回调该监听类的 operationComplete 方法,在该方法中,首先调用 ChannelFuture. isSuccess()方法检查是否发送成功,若成功则置 ResponseFuture 对象的 sendRequestOK 等于 true 并退出此回调方法等待响应结果;若不成功则置 ResponseFuture 对象的 sendRequestOK 等于 false,然后从 NettyRemotingAbstract.responseTable 中删除此请求序列号(opaue)的记录,置 ResponseFuture 对象的 responseCommand 等于 null,并唤醒 ResponseFuture.waitResponse(long timeoutMillis)方法的等待;

  3. 调用 ResponseFuture.waitResponse(long timeoutMillis)方法等待响应结果;在发送失败或者收到响应消息或者超时的情况下会唤醒该方法返回 ResponseFuture.responseCommand 变量值;

  4. 若上一步返回的 responseCommand 值为 null,则抛出异常:若 ResponseFuture.sendRequestOK 为 true,则抛出 RemotingTimeoutException 异常,否则抛出 RemotingSendRequestException 异常;

  5. 若上一步返回的 responseCommand 值不为 null,则返回 responseCommand 变量值;


结果返回


  1. 若 NettyRemotingClient.rpcHook: RPCHook 变量不为空,则调用 RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;


2、以上一步的返回值 RemotingCommand 对象为参数调用 MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成 PullResultExt 对象然后返回给调用者,响应消息的结果状态转换如下:


  • 若 RemotingCommand 对象的 Code 等于 SUCCESS,则 PullResultExt.pullStatus=FOUND;

  • 若 RemotingCommand 对象的 Code 等于 PULL_NOT_FOUND,则 PullResultExt.pullStatus= NO_NEW_MSG;

  • 若 RemotingCommand 对象的 Code 等于 PULL_RETRY_IMMEDIATELY,则 PullResultExt.pullStatus= NO_MATCHED_MSG;

  • 若 RemotingCommand 对象的 Code 等于 PULL_OFFSET_MOVED,则 PullResultExt.pullStatus= OFFSET_ILLEGAL;


定时请求响应结果列表


在 NettyRemotingClient.start()启动时,也会初始化定时任务,该定时任务每隔 1 秒定期扫描 responseTable 列表,遍历该列表中的 ResponseFuture 对象,检查等待响应是否超时,若超时,则调用 ResponseFuture. executeInvokeCallback()方法,并将该对象从 responseTable 列表中删除;

用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)_RocketMQ_洛神灬殇_InfoQ写作社区