
RocketMQ Source Code Analysis: The Message Pull Flow

Published on: January 16, 2021

In "RocketMQ Source Code Analysis: RebalanceService" we looked at how a consumer obtains messages after it starts for the first time. Once a PullRequest (a message pull task) has been built, how do the consumer and the broker interact to complete it? This article analyzes that message pull flow. On the consumer side two services are involved: RebalanceService, which balances the message queues across consumers and builds PullRequests, and PullMessageService, which actually pulls the messages. Let's start from PullMessageService.
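
Before diving into the source, here is a minimal, self-contained sketch of the hand-off between the two services, using nothing beyond the JDK. The class and method names (MiniPullRequest, PullHandOffSketch, dispatchPullRequest) are simplified stand-ins invented for illustration, not RocketMQ's real types; in RocketMQ the dispatch actually goes through several layers before reaching the queue.

import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in; the real classes live in org.apache.rocketmq.client.impl.consumer
// and carry much more state.
class MiniPullRequest {
    final String consumerGroup;
    final String messageQueue;
    long nextOffset;

    MiniPullRequest(String consumerGroup, String messageQueue, long nextOffset) {
        this.consumerGroup = consumerGroup;
        this.messageQueue = messageQueue;
        this.nextOffset = nextOffset;
    }
}

public class PullHandOffSketch {
    // PullMessageService holds a blocking queue of pull tasks
    private final LinkedBlockingQueue<MiniPullRequest> pullRequestQueue = new LinkedBlockingQueue<>();

    // RebalanceService side: after rebalancing, each newly assigned queue gets a pull task
    void dispatchPullRequest(MiniPullRequest pullRequest) throws InterruptedException {
        pullRequestQueue.put(pullRequest);
    }

    // PullMessageService side: block until a task is available, then pull
    void runLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            MiniPullRequest pullRequest = pullRequestQueue.take(); // blocks while the queue is empty
            pullMessage(pullRequest);
        }
    }

    private void pullMessage(MiniPullRequest pullRequest) {
        // In RocketMQ this delegates to DefaultMQPushConsumerImpl#pullMessage,
        // which eventually re-enqueues the request to keep the loop going.
        System.out.printf("pulling from %s at offset %d%n", pullRequest.messageQueue, pullRequest.nextOffset);
    }
}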


PullMessageService is started when the MQClientInstance is started during consumer startup:


public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified, looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}


PullMessageService extends ServiceThread, so it is essentially a thread. When this.pullMessageService.start() is called, its run method executes. The logic of run is: take a PullRequest from pullRequestQueue, blocking if the queue is empty until a task is put in, and then call pullMessage to pull messages.


@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}


Next comes pullMessage(final PullRequest pullRequest): it looks up the consumer's internal implementation, an MQConsumerInner, by consumerGroup, casts it to DefaultMQPushConsumerImpl, and calls that object's pullMessage method. The cast makes it clear that PullMessageService only serves the PUSH mode.


private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}


Now let's look at DefaultMQPushConsumerImpl's pullMessage method:


public void pullMessage(final PullRequest pullRequest) {
    // Get the ProcessQueue from the pullRequest; if it has been dropped, stop here,
    // otherwise update its lastPullTimestamp to the current time
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    try {
        /*
         * Make sure the consumer is in a healthy state; if not, re-enqueue the pullRequest
         * into PullMessageService's pull task queue after a 3s delay and end this pull.
         */
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    // If the consumer is suspended, re-enqueue the pullRequest after a 1s delay
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    // Flow control for message pulling
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }

    /*
     * Look up the subscription data for the topic in this pullRequest; if it is null,
     * re-enqueue the pullRequest into PullMessageService's pull task queue after a 3s delay
     * and end this pull.
     */
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    final long beginTimestamp = System.currentTimeMillis();

    // Build the pullCallback; it is executed when the broker returns a response to the consumer
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset);
                        }
                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }
        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        // Read the consume progress of the MessageQueue in pullRequest from memory
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }

    // Build the system flag for the message pull
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Interact with the broker
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}


The pullKernelImpl method looks like this:


public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    /*
     * Look up the broker address by brokerName and brokerId through mQClientFactory.
     * In RocketMQ several brokers share the same name (one master and its slaves) but have
     * different brokerIds; after every pull the broker suggests whether the next pull
     * should go to the master or to a slave.
     */
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    // If findBrokerResult is null, refresh the client's topic route table first,
    // then call findBrokerAddressInSubscribe again to get the broker address
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        // Build the PullMessageRequestHeader
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);

        /*
         * If the filter mode is class filtering, find the FilterServer registered on the broker
         * by topic and broker address and pull from the FilterServer; otherwise pull from the broker.
         */
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}


Stepping into public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback), you can see that the request type the client sends to the broker is RequestCode.PULL_MESSAGE. Searching the code shows that on the broker side this request type is handled by PullMessageProcessor's processRequest method.


/**
 * PullMessageProcessor
 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);


Now let's see how the broker handles the pull request sent by the client (a simplified sketch of the whole decision flow follows the list of steps).


1. Build the response to be returned to the consumer and parse the request sent to the broker


2. Check whether the broker is readable; if not, set the response code to ResponseCode.NO_PERMISSION and return it to the consumer


3. Look up the consumer group's subscription group configuration on the broker; if it does not exist, set the response code to ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST; if its consumeEnable flag is false, set the code to ResponseCode.NO_PERMISSION and return it to the consumer


4. Read the system flags that were set for this pull from the request


5. Look up the topic configuration on the broker; if it is null, set the response code to ResponseCode.TOPIC_NOT_EXIST and return it to the consumer


6. Check whether the topic is readable; if not, set the response code to ResponseCode.NO_PERMISSION and return it to the consumer


7. Check whether the queueId of the MessageQueue to pull from is valid; if not, set the response code to ResponseCode.SYSTEM_ERROR and return it to the consumer


8. Build the subscription data from the topic and the message filter expression; if the expression type is not TAG, also build the filter data consumerFilterData


9. Build the message filter object messageFilter


10. Look up messages using the consumer group name, topic, queueId, the logical offset in the ConsumeQueue to pull from, the maximum number of messages to pull, and the message filter, all taken from the requestHeader. The getMessage method also computes nextBeginOffset, the start offset of the next pull task


11. If getMessageResult is not null, set nextBeginOffset, minOffset, and maxOffset in the response


12. If the offset for the next pull is held on a slave node, set the brokerId suggested for the next pull task accordingly


13. Set the code in the response according to the status of getMessageResult; the mapping is as follows:


getMessageResult status → response code
FOUND → SUCCESS
MESSAGE_WAS_REMOVING → PULL_RETRY_IMMEDIATELY
NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE, OFFSET_OVERFLOW_BADLY, OFFSET_TOO_SMALL → PULL_OFFSET_MOVED
NO_MATCHED_MESSAGE → PULL_RETRY_IMMEDIATELY
OFFSET_FOUND_NULL, OFFSET_OVERFLOW_ONE → PULL_NOT_FOUND


14. If the current node is the master and the commit-offset flag is set, trigger an update of the consumer's consume progress


15. Return the response to the consumer
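
To make the sequence of checks above easier to follow, here is a condensed, self-contained sketch of the broker-side decision flow. It is not the real PullMessageProcessor code: every type and helper method below (PullRequestSketch, GetResultSketch, brokerReadable, and so on) is a simplified stand-in invented for illustration, and only the response-code and store-status names mirror the constants used in RocketMQ.

import java.util.Collections;
import java.util.List;

// Simplified stand-ins; the real logic lives in
// org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest.
enum StoreStatusSketch {
    FOUND, MESSAGE_WAS_REMOVING, NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE,
    NO_MATCHED_MESSAGE, OFFSET_FOUND_NULL, OFFSET_OVERFLOW_BADLY, OFFSET_OVERFLOW_ONE, OFFSET_TOO_SMALL
}

class PullRequestSketch {
    String consumerGroup, topic;
    int queueId, maxMsgNums;
    long queueOffset;
}

class GetResultSketch {
    StoreStatusSketch status;
    long nextBeginOffset, minOffset, maxOffset;
    List<byte[]> messages = Collections.emptyList();
}

class PullResponseSketch {
    String code; // names mirror RocketMQ's ResponseCode constants
    long nextBeginOffset, minOffset, maxOffset;
    List<byte[]> messages = Collections.emptyList();
}

public class BrokerPullSketch {

    PullResponseSketch handlePull(PullRequestSketch req) {
        PullResponseSketch resp = new PullResponseSketch();

        // Steps 2-3: broker readable, subscription group exists and is allowed to consume
        if (!brokerReadable()) { resp.code = "NO_PERMISSION"; return resp; }
        if (!groupExists(req.consumerGroup)) { resp.code = "SUBSCRIPTION_GROUP_NOT_EXIST"; return resp; }
        if (!consumeEnabled(req.consumerGroup)) { resp.code = "NO_PERMISSION"; return resp; }

        // Steps 5-7: topic exists, topic readable, queueId within range
        if (!topicExists(req.topic)) { resp.code = "TOPIC_NOT_EXIST"; return resp; }
        if (!topicReadable(req.topic)) { resp.code = "NO_PERMISSION"; return resp; }
        if (req.queueId < 0 || req.queueId >= readQueueNums(req.topic)) { resp.code = "SYSTEM_ERROR"; return resp; }

        // Steps 8-11: build the filter, look up messages from the store, copy the offsets
        GetResultSketch result = getMessage(req);
        resp.nextBeginOffset = result.nextBeginOffset;
        resp.minOffset = result.minOffset;
        resp.maxOffset = result.maxOffset;
        resp.messages = result.messages;

        // Step 13: map the store status onto a response code (same mapping as the table above)
        switch (result.status) {
            case FOUND:                  resp.code = "SUCCESS"; break;
            case MESSAGE_WAS_REMOVING:
            case NO_MATCHED_MESSAGE:     resp.code = "PULL_RETRY_IMMEDIATELY"; break;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
            case OFFSET_OVERFLOW_BADLY:
            case OFFSET_TOO_SMALL:       resp.code = "PULL_OFFSET_MOVED"; break;
            case OFFSET_FOUND_NULL:
            case OFFSET_OVERFLOW_ONE:    resp.code = "PULL_NOT_FOUND"; break;
        }
        return resp;
    }

    // Dummy lookups standing in for BrokerConfig, SubscriptionGroupManager,
    // TopicConfigManager and the message store.
    private boolean brokerReadable() { return true; }
    private boolean groupExists(String group) { return true; }
    private boolean consumeEnabled(String group) { return true; }
    private boolean topicExists(String topic) { return true; }
    private boolean topicReadable(String topic) { return true; }
    private int readQueueNums(String topic) { return 4; }

    private GetResultSketch getMessage(PullRequestSketch req) {
        GetResultSketch r = new GetResultSketch();
        r.status = StoreStatusSketch.NO_MESSAGE_IN_QUEUE;
        r.nextBeginOffset = req.queueOffset;
        return r;
    }
}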


When the broker returns the response, the consumer invokes either onSuccess or onException on the PullCallback, which is the callback created in the pullMessage(final PullRequest pullRequest) method above. The callback is invoked like this:


this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                assert pullResult != null;
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause()));
            } else {
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});


Next, how does the consumer handle the response returned by the broker?


1. Convert the broker's response into a PullResult. This is done by the processPullResponse method, which translates the response code into a pull status and builds the PullResult object (see the sketch after this list). The code-to-status mapping is:


response code → pull status
SUCCESS → FOUND
PULL_NOT_FOUND → NO_NEW_MSG
PULL_RETRY_IMMEDIATELY → NO_MATCHED_MSG
PULL_OFFSET_MOVED → OFFSET_ILLEGAL


2. Update the offset for the next pull from the pullResult; if msgFoundList in the pullResult is empty, immediately put the PullRequest back into PullMessageService's pullRequestQueue


3. Put the pulled messages into the processQueue, then submit them to the ConsumeMessageService (of which there are two implementations, ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService) for the consumer to consume


4. If pullInterval is greater than 0, put the pullRequest back into PullMessageService's pullRequestQueue after a delay of pullInterval milliseconds; this is what keeps the pull flow running continuously
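
The following self-contained sketch ties steps 1 and 4 together: it mirrors the code-to-status translation from the table above and shows how re-enqueueing the request, immediately or after a delay, keeps the pull loop alive. PullStatusSketch, PullTaskSketch and PullLoopSketch are simplified stand-ins invented here, not RocketMQ's real types; only the constant names mirror the real ones.

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified stand-ins for PullStatus and PullRequest
enum PullStatusSketch { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

class PullTaskSketch {
    long nextOffset;
}

public class PullLoopSketch {
    private final LinkedBlockingQueue<PullTaskSketch> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Step 1: translate the broker's response code into a pull status
    // (same mapping as the table above; the codes are mirrored as strings here)
    static PullStatusSketch translate(String responseCode) {
        switch (responseCode) {
            case "SUCCESS":                return PullStatusSketch.FOUND;
            case "PULL_NOT_FOUND":         return PullStatusSketch.NO_NEW_MSG;
            case "PULL_RETRY_IMMEDIATELY": return PullStatusSketch.NO_MATCHED_MSG;
            case "PULL_OFFSET_MOVED":      return PullStatusSketch.OFFSET_ILLEGAL;
            default: throw new IllegalArgumentException("unexpected response code: " + responseCode);
        }
    }

    // Step 4: re-enqueue the request, either immediately or after a delay,
    // which is what makes the pull process continuous
    void executePullRequestImmediately(PullTaskSketch pullRequest) {
        pullRequestQueue.offer(pullRequest);
    }

    void executePullRequestLater(PullTaskSketch pullRequest, long delayMillis) {
        scheduler.schedule(() -> executePullRequestImmediately(pullRequest), delayMillis, TimeUnit.MILLISECONDS);
    }

    void onPullCompleted(PullTaskSketch pullRequest, String responseCode, long nextBeginOffset, long pullInterval) {
        PullStatusSketch status = translate(responseCode);
        pullRequest.nextOffset = nextBeginOffset; // step 2: advance the offset for the next pull
        if (status == PullStatusSketch.FOUND && pullInterval > 0) {
            executePullRequestLater(pullRequest, pullInterval);
        } else if (status != PullStatusSketch.OFFSET_ILLEGAL) {
            executePullRequestImmediately(pullRequest);
        }
        // OFFSET_ILLEGAL: in RocketMQ the process queue is dropped and the offset is corrected instead
    }
}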


Finally, to summarize: the overall message pull flow consists of three steps:


1. The consumer builds the message pull request (PullRequest) and sends it to the broker


2. The broker looks up messages according to the request and returns them to the consumer


3. The consumer consumes the returned messages

