
🏆【Alibaba Middleware Technology Series】「RocketMQ Technical Topics」Exploring the Implementation Principles and Source Code of DefaultMQPushConsumer

Author: 浩宇天尚
  • November 22, 2021
  • Word count: 17,540 characters

    Estimated reading time: about 58 minutes


A Recap of RocketMQ Basics

RocketMQ is a distributed, queue-model message middleware with the following features:

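  1. Guarantees strict message ordering

  2. Offers rich message pull modes

  3. Scales subscribers horizontally and efficiently

  4. Provides real-time message subscription

  5. Can accumulate hundreds of millions of messages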

Why Use RocketMQ

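  1. The cluster has no single point of failure: it is scalable, every node can be made highly available, and it scales horizontally

  2. Massive message accumulation capacity, with low write latency even when a backlog has built up

  3. Supports tens of thousands of queues

  4. Retry mechanism for failed messages

  5. Messages can be queried

  6. Active open-source community

  7. Maturity proven under the load of Taobao's Double 11 shopping festival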

How RocketMQ Has Evolved

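The open-source RocketMQ persists messages to files, while Alibaba's internal, non-open-source version delivers higher performance and uses OceanBase for persistence. RocketMQ 1.x and 2.x used ZooKeeper to manage the cluster; starting with 3.x, the lighter-weight NameServer replaced ZooKeeper. In addition, the RocketMQ client offers two ways of consuming: DefaultMQPushConsumer and DefaultMQPullConsumer.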

Maven Configuration for DefaultMQPushConsumer

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

DefaultMQPushConsumer Usage Example

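The starting position is set with setConsumeFromWhere; the ConsumeFromWhere options are:

  1. CONSUME_FROM_LAST_OFFSET: on the first start, consume from the last position of the queue; on later starts, resume from the previous consumption progress

  2. CONSUME_FROM_FIRST_OFFSET: on the first start, consume from the initial position of the queue; on later starts, resume from the previous consumption progress

  3. CONSUME_FROM_TIMESTAMP: on the first start, consume from the specified point in time; on later starts, resume from the previous consumption progress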


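"First start" here means a consumer that has never consumed before. Once a consumer has consumed, its consumption position is recorded (on the broker in clustering mode, on local disk in broadcasting mode), so if the consumer crashes and is restarted, it automatically resumes from the previous progress.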
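To make the "first start" rule concrete, here is a minimal, self-contained Java model of how a starting offset could be chosen. StartPolicy and StartOffsetResolver are hypothetical names, not RocketMQ classes, and the model ignores edge cases that the real RebalancePushImpl.computePullFromWhere handles; it only shows that a recorded offset always takes precedence over the ConsumeFromWhere policy.

```java
// Hypothetical model of the ConsumeFromWhere rule; these are not RocketMQ classes.
enum StartPolicy { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

final class StartOffsetResolver {
    /**
     * @param storedOffset offset already recorded for this group and queue, or -1 if none
     * @param minOffset    smallest offset still present in the queue
     * @param maxOffset    offset just past the newest message in the queue
     * @param offsetAtTime offset of the first message at or after the configured timestamp
     */
    static long resolve(long storedOffset, StartPolicy policy,
                        long minOffset, long maxOffset, long offsetAtTime) {
        if (storedOffset >= 0) {
            // Consumed before: the policy is ignored and progress simply resumes.
            return storedOffset;
        }
        // First start: only now does the configured policy matter.
        switch (policy) {
            case FIRST_OFFSET:
                return minOffset;
            case TIMESTAMP:
                return offsetAtTime;
            case LAST_OFFSET:
            default:
                return maxOffset;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1, StartPolicy.FIRST_OFFSET, 0, 500, 120)); // 0
        System.out.println(resolve(-1, StartPolicy.LAST_OFFSET, 0, 500, 120));  // 500
        System.out.println(resolve(300, StartPolicy.TIMESTAMP, 0, 500, 120));   // 300
    }
}
```

The last call shows the key point: once progress (300) exists, the policy no longer affects where consumption starts.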


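import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class MQPushConsumer {
    public static void main(String[] args) throws MQClientException {
        String groupName = "rocketMqGroup1";
        // The consumer group organizes multiple consumers together to increase parallelism
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        // NameServer addresses; separate multiple addresses with ';'
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Subscribe to a topic; the second argument filters by tag, e.g. "tag1||tag2||tag3";
        // "*" or null means all messages of the topic
        consumer.subscribe("order-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}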


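The message model is set with setMessageModel:

  • CLUSTERING: the default mode. Within one ConsumerGroup (consumers with the same groupName), each consumer consumes only part of the subscribed messages, and the messages consumed by all consumers in the group together make up the whole subscribed topic, which achieves load balancing.

  • BROADCASTING: every consumer in the same ConsumerGroup receives all messages of the subscribed topic; that is, each message is delivered multiple times and consumed by multiple consumers.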
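The difference between the two models can be sketched as a queue-assignment function. The clustering branch below follows the averaging idea of RocketMQ's default allocation strategy, AllocateMessageQueueAveragely, but it is a simplified stand-alone model: QueueAllocation is a hypothetical class, and queues and consumer IDs are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model; not a RocketMQ class.
final class QueueAllocation {
    /** CLUSTERING: each consumer in the group gets a contiguous share of the queues. */
    static List<String> clustering(List<String> queues, List<String> consumers, String self) {
        List<String> result = new ArrayList<>();
        int index = consumers.indexOf(self);
        if (index < 0 || queues.isEmpty()) {
            return result;
        }
        int q = queues.size();
        int c = consumers.size();
        int mod = q % c;
        // When queues don't divide evenly, the first `mod` consumers take one extra queue.
        int averageSize = q <= c ? 1 : (mod > 0 && index < mod ? q / c + 1 : q / c);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, q - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(queues.get((startIndex + i) % q));
        }
        return result;
    }

    /** BROADCASTING: every consumer consumes every queue. */
    static List<String> broadcasting(List<String> queues) {
        return new ArrayList<>(queues);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4");
        List<String> group = Arrays.asList("c0", "c1");
        System.out.println(clustering(queues, group, "c0")); // [q0, q1, q2]
        System.out.println(clustering(queues, group, "c1")); // [q3, q4]
        System.out.println(broadcasting(queues));            // [q0, q1, q2, q3, q4]
    }
}
```

In clustering mode the two shares are disjoint and together cover all five queues; in broadcasting mode each consumer gets the full list.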


When the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, the broker schedules a retry according to the configured messageDelayLevel, up to 16 times by default.
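As a rough illustration of the retry schedule, the sketch below assumes the broker's default messageDelayLevel string "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" and assumes that the n-th retry uses delay level 3 + (n - 1), so the 16 retries run from 10 seconds up to 2 hours. RetryDelay is a hypothetical helper, not a RocketMQ class.

```java
// Hypothetical helper; assumes the default broker messageDelayLevel configuration.
final class RetryDelay {
    static final String DEFAULT_DELAY_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Delay in milliseconds for a 1-based delay level. */
    static long delayMillisForLevel(int level) {
        String[] levels = DEFAULT_DELAY_LEVELS.split(" ");
        String token = levels[level - 1];
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's':
                return value * 1000L;
            case 'm':
                return value * 60_000L;
            case 'h':
                return value * 3_600_000L;
            default:
                throw new IllegalArgumentException("unknown unit in " + token);
        }
    }

    /** Delay before the given retry attempt (1 = first retry, reconsumeTimes = attempt - 1). */
    static long delayMillisForRetry(int attempt) {
        int reconsumeTimes = attempt - 1;
        return delayMillisForLevel(3 + reconsumeTimes);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 16; attempt++) {
            System.out.println("retry " + attempt + " after " + delayMillisForRetry(attempt) / 1000 + "s");
        }
    }
}
```

Under these assumptions, the first two levels (1s and 5s) are never used for consumer retries, and a message that still fails after the 16th retry is moved to the group's dead-letter queue.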


The main responsibilities of the objects inside DefaultMQPushConsumerImpl are as follows:


RebalancePushImpl: decides which queues the current consumer should consume messages from;


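  • 1) PullAPIWrapper: pulls messages from the broker over a long-lived connection; ConsumeMessageService then calls back the user's Listener to run the consumption logic;

  • 2) ConsumeMessageService: implements the so-called "push" (passive) consumption mechanism. Messages pulled from the broker are wrapped into a ConsumeRequest and submitted to ConsumeMessageService, which calls back the user's Listener to consume them;

  • 3) OffsetStore: maintains the current consumer's consumption progress (offset). There are two implementations, Local and Remote: Local stores the progress on local disk and suits BROADCASTING mode, while Remote stores it on the broker and suits CLUSTERING mode;

  • 4) MQClientFactory: manages clients (consumers and producers) and provides various functional interfaces that the services (Rebalance, PullMessage, etc.) call; most of the logic is completed in this class;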


How consumer.registerMessageListener works:


/**
 * Register a callback to execute on message arrival for concurrent consuming.
 * @param messageListener message handling callback.
 */
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}


The source shows that the actual work happens in DefaultMQPushConsumerImpl: consumer.start() calls the synchronized start method of DefaultMQPushConsumerImpl:


public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            this.copySubscription();
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            this.consumeMessageService.start();
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
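The serviceState switch at the top of start() is a small state machine: only CREATE_JUST may start, the state is set to START_FAILED before any work is done, and it becomes RUNNING only at the end. The hypothetical StartGuard below isolates that pattern; it omits the consumer-group registration and its rollback to CREATE_JUST.

```java
// Hypothetical model of the ServiceState guard in start(); not RocketMQ code.
enum ServiceStateModel { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

final class StartGuard {
    private ServiceStateModel state = ServiceStateModel.CREATE_JUST;

    /** Runs the start-up steps exactly once; any later call fails fast. */
    synchronized void start(Runnable startupSteps) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistic: if a step throws, the object stays START_FAILED.
                state = ServiceStateModel.START_FAILED;
                startupSteps.run();   // stands in for checkConfig, offsetStore.load, ...
                state = ServiceStateModel.RUNNING;
                break;
            default:
                throw new IllegalStateException("service state not OK, maybe started once, " + state);
        }
    }

    synchronized ServiceStateModel state() {
        return state;
    }

    public static void main(String[] args) {
        StartGuard guard = new StartGuard();
        guard.start(() -> System.out.println("starting"));
        System.out.println(guard.state());      // RUNNING
        try {
            guard.start(() -> { });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // service state not OK, maybe started once, RUNNING
        }
    }
}
```

Setting START_FAILED first means that an exception halfway through start-up leaves the object in a state that refuses a second start() instead of silently half-running.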


Inside it, mQClientFactory.start() calls MQClientInstance.start():


public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}


This method starts several services. We focus on pullMessageService.start(), which shows that RocketMQ's Push mode is actually built on pull underneath. Let's look at the logic PullMessageService handles:


private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}


So PullMessageService still delegates the actual message handling to the pullMessage method of DefaultMQPushConsumerImpl.
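The dispatch pattern around this method (a single thread taking PullRequest objects from a queue, plus "pull now" and "pull later" re-queueing) can be sketched as below. PullDispatcher and PullTask are hypothetical stand-ins for PullMessageService and PullRequest; the real service blocks on take(), while this sketch polls with a timeout so that it can stop cleanly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of a PullMessageService-style dispatcher; not RocketMQ code.
final class PullDispatcher implements Runnable {
    /** Stand-in for PullRequest. */
    static final class PullTask {
        final String queue;
        final long nextOffset;
        PullTask(String queue, long nextOffset) {
            this.queue = queue;
            this.nextOffset = nextOffset;
        }
    }

    private final LinkedBlockingQueue<PullTask> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<PullTask> handler;   // stands in for DefaultMQPushConsumerImpl.pullMessage
    private volatile boolean stopped;

    PullDispatcher(Consumer<PullTask> handler) {
        this.handler = handler;
    }

    void executePullRequestImmediately(PullTask task) {
        pullRequestQueue.add(task);
    }

    /** Re-queue after a delay, e.g. 50 ms under flow control or 3000 ms after an error. */
    void executePullRequestLater(PullTask task, long delayMillis) {
        scheduler.schedule(() -> executePullRequestImmediately(task), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Takes one request (waiting up to timeoutMillis) and hands it to the handler. */
    boolean runOnce(long timeoutMillis) {
        PullTask task;
        try {
            task = pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (task == null) {
            return false;
        }
        handler.accept(task);
        return true;
    }

    @Override
    public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            runOnce(100);
        }
    }

    void shutdown() {
        stopped = true;
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PullDispatcher d = new PullDispatcher(t -> System.out.println("pull " + t.queue + " from " + t.nextOffset));
        Thread worker = new Thread(d, "PullMessageService");
        worker.start();
        d.executePullRequestImmediately(new PullTask("q0", 0));
        d.executePullRequestLater(new PullTask("q1", 42), 50);
        Thread.sleep(300);
        d.shutdown();
        worker.join();
    }
}
```

Because a single thread drains the queue, each handler call must return quickly; that is why the real pullMessage issues the network call asynchronously and re-queues the request from its callback.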

How PullRequest Pulls Messages

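A note on PullRequest: as mentioned above, RocketMQ's push mode is built on top of pull, and PullRequest achieves a push-like effect through long polling.


Long polling keeps the advantages of pull while gaining the real-time delivery of push.


  • In push mode, the server pushes a message to the client as soon as it receives it, so latency is low. The drawbacks are that the server does a lot of work, which hurts performance, and that clients differ in processing capacity while their state is outside the server's control; if a client cannot process messages in time, messages easily pile up and affect normal business.

  • In pull mode, the client repeatedly pulls messages from the server, so the client is in control: it finishes one message before pulling the next. The drawback is that the polling interval is hard to choose: too short and the client busy-waits and wastes CPU; too long and the client's throughput drops and some messages are not processed in time.

Long polling combines the strengths of both. DefaultMQPushConsumerImpl.pullMessage processes a PullRequest in the following steps: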
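  1. Check whether the dropped flag of the ProcessQueue in the PullRequest is true. (When the RebalanceService thread creates a pull request for a MessageQueue of a topic, it also maintains a corresponding ProcessQueue; if the consumer no longer subscribes to that topic, or the queue is reassigned during rebalancing, the ProcessQueue's dropped flag is set to true.) If it is true, the request is considered cancelled and the method returns immediately;

  2. Update the ProcessQueue's timestamp (ProcessQueue.lastPullTimestamp) to the current time;

  3. Check whether the consumer is running, i.e., whether DefaultMQPushConsumerImpl.serviceState is RUNNING. If it is not running, call PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay) with timeDelay = 3000 (PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); if it is paused (DefaultMQPushConsumerImpl.pause = true), the delay is 1000 ms (PULL_TIME_DELAY_MILLS_WHEN_SUSPEND). executePullRequestLater puts the PullRequest back into PullMessageService.pullRequestQueue after the delay; the method then returns;

  4. Apply flow control. If the ProcessQueue's msgCount exceeds the consumer's threshold (DefaultMQPushConsumer.pullThresholdForQueue, default 1000), call PullMessageService.executePullRequestLater to put the PullRequest back into PullMessageService.pullRequestQueue after 50 ms, and return. The same happens when the cached message size exceeds DefaultMQPushConsumer.pullThresholdSizeForQueue (default 100 MiB);

  5. For concurrent (non-orderly) consumption (DefaultMQPushConsumerImpl.consumeOrderly == false), check the difference between the first and last keys of the ProcessQueue's msgTreeMap: TreeMap<Long, MessageExt>, whose keys are queue offsets (queueOffset). If the difference exceeds the threshold (DefaultMQPushConsumer.consumeConcurrentlyMaxSpan, default 2000), call PullMessageService.executePullRequestLater to put the PullRequest back into PullMessageService.pullRequestQueue after 50 ms, and return. For orderly consumption, the ProcessQueue must hold the broker-side lock; otherwise the request is retried after 3 seconds;

  6. Using the topic of PullRequest.messageQueue, look up the SubscriptionData in RebalanceImpl.subscriptionInner: ConcurrentHashMap<String, SubscriptionData>. If it is null (possible because of concurrent updates), call executePullRequestLater to retry later, and return;

  7. In clustering mode (RebalanceImpl.messageModel == CLUSTERING), call readOffset(MessageQueue mq, ReadOffsetType type) on DefaultMQPushConsumerImpl.offsetStore (initialized as a RemoteBrokerOffsetStore), passing the PullRequest's MessageQueue and type = READ_FROM_MEMORY, to read the consumption offset from local memory. If the offset is greater than 0, the local variable commitOffsetEnable is set to true, otherwise false. The offset is passed as the commitOffset argument of pullKernelImpl; after serving the pull, the broker uses commitOffsetEnable to decide whether to update the consumption progress with this offset. readOffset looks up the offset for the given MessageQueue in RemoteBrokerOffsetStore.offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong>, returning it if present and -1 otherwise;

  8. If the subscription should be sent with every pull (DefaultMQPushConsumer.postSubscriptionWhenPull, default false) and the classFilterMode of the SubscriptionData obtained from RebalanceImpl.subscriptionInner for the topic is false (the default), set the 3rd bit of sysFlag (subscription) to 1; otherwise set it to 0;

  9. Set the 1st bit of sysFlag to commitOffsetEnable, the 2nd bit (the suspend flag) to 1, and the 4th bit to classFilterMode;

  10. Create an anonymous PullCallback that implements onSuccess/onException; these callbacks are invoked only for asynchronous requests;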

  11. Call the underlying pull API, PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, String expressionType, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback), to perform the pull. The PullCallback is passed in, and when the pull is asynchronous its methods are called back once the response arrives.
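The pre-checks in steps 1 to 5 boil down to one decision per request: drop it, pull now, or re-queue it after a delay. The hypothetical PullGate below condenses them, using the delays and default thresholds quoted in the steps; it leaves out the orderly-consumption lock check and the subscription lookup.

```java
// Hypothetical condensation of the pre-checks in steps 1-5; not RocketMQ code.
final class PullGate {
    static final long DROP = -2;       // request cancelled: do not re-queue
    static final long PULL_NOW = -1;   // all checks passed: send the pull request
    static final long DELAY_WHEN_EXCEPTION = 3000;
    static final long DELAY_WHEN_SUSPEND = 1000;
    static final long DELAY_WHEN_FLOW_CONTROL = 50;

    // Defaults of the corresponding DefaultMQPushConsumer settings.
    int pullThresholdForQueue = 1000;
    int pullThresholdSizeForQueue = 100;   // MiB
    int consumeConcurrentlyMaxSpan = 2000;

    /** Returns DROP, PULL_NOW, or the delay in ms before the request is re-queued. */
    long decide(boolean dropped, boolean running, boolean paused, boolean orderly,
                long cachedCount, long cachedSizeMiB, long minOffset, long maxOffset) {
        if (dropped) {
            return DROP;
        }
        if (!running) {
            return DELAY_WHEN_EXCEPTION;
        }
        if (paused) {
            return DELAY_WHEN_SUSPEND;
        }
        if (cachedCount > pullThresholdForQueue || cachedSizeMiB > pullThresholdSizeForQueue) {
            return DELAY_WHEN_FLOW_CONTROL;
        }
        // Span between the lowest and highest cached queue offsets (0 when nothing is cached).
        long maxSpan = cachedCount > 0 ? maxOffset - minOffset : 0;
        if (!orderly && maxSpan > consumeConcurrentlyMaxSpan) {
            return DELAY_WHEN_FLOW_CONTROL;
        }
        return PULL_NOW;
    }

    public static void main(String[] args) {
        PullGate gate = new PullGate();
        System.out.println(gate.decide(false, true, false, false, 1500, 10, 0, 100)); // 50
        System.out.println(gate.decide(false, true, false, false, 10, 1, 0, 100));    // -1
    }
}
```

The span check exists because, in concurrent mode, one slow message keeps the committed offset from advancing; capping the span bounds how much would be re-consumed after a restart.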


public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }
                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    final long beginTimestamp = System.currentTimeMillis();
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }
                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);
                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }
        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }
    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Follow this call next: this is where the client actually pulls messages
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            // The pull is issued asynchronously
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
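The sysFlag passed to pullKernelImpl packs four booleans into one int, as described in steps 8 and 9. The sketch below assumes the bit constants of RocketMQ 4.x PullSysFlag (commitOffset = bit 0, suspend = bit 1, subscription = bit 2, class filter = bit 3); SysFlagDemo itself is a hypothetical stand-alone copy, not the library class.

```java
// Hypothetical copy of the PullSysFlag bit layout, assuming RocketMQ 4.x constants.
final class SysFlagDemo {
    static final int FLAG_COMMIT_OFFSET = 0x1;       // bit 0
    static final int FLAG_SUSPEND = 0x1 << 1;        // bit 1: broker may hold the request (long polling)
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;   // bit 2: subscription expression sent with the pull
    static final int FLAG_CLASS_FILTER = 0x1 << 3;   // bit 3: class-filter mode

    static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    static boolean hasSuspendFlag(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
    }

    public static void main(String[] args) {
        // Typical push-consumer pull in clustering mode: offset known, suspend on, no subscription resend.
        int flag = buildSysFlag(true, true, false, false);
        System.out.println(flag);                 // 3
        System.out.println(hasSuspendFlag(flag)); // true
    }
}
```

The suspend bit is always set by the push consumer, which is what allows the broker to hold the request as a long poll.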

Sending the Remote Pull Request

In MQClientAPIImpl.pullMessage, pulling is split into two modes, asynchronous and synchronous, depending on the communicationMode argument.


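Whether pulling asynchronously or synchronously, before sending the pull request the client builds a ResponseFuture object and stores it in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>, keyed by the request's sequence number (opaque). This table is handled in the following cases: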


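  1. If sending fails, the corresponding entry is removed from responseTable immediately;

  2. When a response arrives, the client uses the response's sequence number (which the server echoes back from the request) to look up the ResponseFuture in responseTable and sets its responseCommand. For a synchronous call this wakes up ResponseFuture.waitResponse; for an asynchronous call it invokes ResponseFuture.executeInvokeCallback() to run the callback logic;

  3. NettyRemotingClient.start() also schedules a task that scans responseTable every second, checks each ResponseFuture for a timeout, and for any that has timed out calls ResponseFuture.executeInvokeCallback() and removes it from responseTable;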
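The three cases can be modeled with a map from opaque to a pending-response object backed by a CountDownLatch. ResponseTableModel and PendingResponse are hypothetical simplifications of responseTable and ResponseFuture: the response is a String instead of a RemotingCommand, and the current time is passed in explicitly so the timeout scan is deterministic.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of responseTable / ResponseFuture; not RocketMQ code.
final class ResponseTableModel {
    static final class PendingResponse {
        final int opaque;
        final long beginTimestamp;
        final long timeoutMillis;
        final Consumer<String> callback;   // null for synchronous calls
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        PendingResponse(int opaque, long beginTimestamp, long timeoutMillis, Consumer<String> callback) {
            this.opaque = opaque;
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
            this.callback = callback;
        }

        void putResponse(String r) {
            this.response = r;
            latch.countDown();
        }

        /** Like waitResponse: returns on response, send failure, or timeout (then null). */
        String waitResponse(long waitMillis) {
            try {
                latch.await(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return response;
        }
    }

    final ConcurrentHashMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    PendingResponse register(int opaque, long now, long timeoutMillis, Consumer<String> callback) {
        PendingResponse p = new PendingResponse(opaque, now, timeoutMillis, callback);
        responseTable.put(opaque, p);
        return p;
    }

    /** Case 1: the send failed; drop the entry and wake any waiter with null. */
    void onSendFailed(int opaque) {
        PendingResponse p = responseTable.remove(opaque);
        if (p != null) {
            p.putResponse(null);
        }
    }

    /** Case 2: a response echoing the same opaque arrived. */
    void onResponse(int opaque, String response) {
        PendingResponse p = responseTable.remove(opaque);
        if (p == null) {
            return;   // already timed out and removed
        }
        p.putResponse(response);   // wakes a synchronous waiter
        if (p.callback != null) {
            p.callback.accept(response);   // asynchronous callback
        }
    }

    /** Case 3: periodic scan; expired entries are removed and their callbacks get null. */
    int scanTimeouts(long now) {
        int expired = 0;
        Iterator<Map.Entry<Integer, PendingResponse>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingResponse p = it.next().getValue();
            if (now - p.beginTimestamp > p.timeoutMillis) {
                it.remove();
                expired++;
                if (p.callback != null) {
                    p.callback.accept(null);
                }
            }
        }
        return expired;
    }
}
```

Removing the entry in every path matters: whichever of the response, the send failure, or the timeout scan comes first wins, and the others find nothing left to act on.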


public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }
    return null;
}

Synchronous Pull

For the synchronous mode, MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis) is called; the main steps are:


  1. Call RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis), which performs steps 2 to 5 below;

  2. Get the Channel for the broker address: look up the ChannelWrapper for the address in NettyRemotingClient.channelTables: ConcurrentHashMap<String, ChannelWrapper> and return its Channel. If there is no ChannelWrapper yet, open a new connection to the broker and store it in channelTables for later reuse;

  3. If NettyRemotingClient.rpcHook: RPCHook is not null (the application passes it in when creating the DefaultMQPushConsumer or DefaultMQPullConsumer), call RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request);

  4. Call NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis), which works as follows:

     A) Create a ResponseFuture from the request's sequence number (opaque) and the timeout, and store it in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>;

     B) Send the RemotingCommand to the broker with Channel.writeAndFlush(Object msg), then register, via addListener(GenericFutureListener<? extends Future<? super Void>> listener), an anonymous class implementing the operationComplete method of ChannelFutureListener, which is called back when the send completes. It first calls ChannelFuture.isSuccess(): on success it sets ResponseFuture.sendRequestOK to true and returns, leaving the caller waiting for the response; on failure it sets sendRequestOK to false, removes the entry for this opaque from NettyRemotingAbstract.responseTable, sets ResponseFuture.responseCommand to null, and wakes up ResponseFuture.waitResponse(long timeoutMillis);

     C) Call ResponseFuture.waitResponse(long timeoutMillis) to wait for the result; it wakes up and returns ResponseFuture.responseCommand when the send fails, a response arrives (see case 2 of the responseTable handling above), or the timeout elapses;

     D) If the returned responseCommand is null, throw an exception: RemotingTimeoutException if ResponseFuture.sendRequestOK is true, otherwise RemotingSendRequestException;

     E) If the returned responseCommand is not null, return it;

  5. If NettyRemotingClient.rpcHook is not null, call RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response);


Finally, the returned RemotingCommand is passed to MQClientAPIImpl.processPullResponse(RemotingCommand response), which parses it into a PullResultExt and returns it to the caller. The response code maps to the pull status as follows:

  • If the RemotingCommand's code is SUCCESS, PullResultExt.pullStatus = FOUND;

  • If the code is PULL_NOT_FOUND, PullResultExt.pullStatus = NO_NEW_MSG;

  • If the code is PULL_RETRY_IMMEDIATELY, PullResultExt.pullStatus = NO_MATCHED_MSG;

  • If the code is PULL_OFFSET_MOVED, PullResultExt.pullStatus = OFFSET_ILLEGAL;


@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            if (this.rpcHook != null) {
                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            }
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
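The response-code-to-PullStatus mapping performed by processPullResponse, listed before the invokeSync source, can be sketched as a plain switch. PullCode and PullOutcome are hypothetical local enums standing in for RocketMQ's ResponseCode constants and PullStatus; treating any other code as an error is an assumption of this sketch.

```java
// Hypothetical stand-ins for RocketMQ's ResponseCode constants and PullStatus enum.
enum PullCode { SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED, SYSTEM_ERROR }

enum PullOutcome { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

final class PullResponseMapper {
    /** Maps a broker response code to the pull status seen by PullCallback.onSuccess. */
    static PullOutcome toStatus(PullCode code) {
        switch (code) {
            case SUCCESS:
                return PullOutcome.FOUND;
            case PULL_NOT_FOUND:
                return PullOutcome.NO_NEW_MSG;
            case PULL_RETRY_IMMEDIATELY:
                return PullOutcome.NO_MATCHED_MSG;
            case PULL_OFFSET_MOVED:
                return PullOutcome.OFFSET_ILLEGAL;
            default:
                // Any other code is treated as a broker-side error in this sketch.
                throw new IllegalStateException("unexpected pull response code " + code);
        }
    }

    public static void main(String[] args) {
        for (PullCode code : new PullCode[] {PullCode.SUCCESS, PullCode.PULL_NOT_FOUND}) {
            System.out.println(code + " -> " + toStatus(code));
        }
    }
}
```

In the PullCallback shown earlier, FOUND hands messages to ConsumeMessageService, NO_NEW_MSG and NO_MATCHED_MSG advance the offset and pull again at once, and OFFSET_ILLEGAL drops the ProcessQueue and fixes the stored offset 10 seconds later.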


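getMQClientAPIImpl().pullMessage ultimately writes the request to the channel and flushes it. On the broker side, the processing roughly works as follows: when the broker receives a pull request and the queue has no new messages, it does not return right away. Instead it loops: each iteration waits (waitForRunning) for a while, 5 seconds by default, and then checks again. If no new message arrives, the held request is answered with an empty result once the wait exceeds SuspendMaxTimeMills (with the client's BROKER_SUSPEND_MAX_TIME_MILLIS of 15 seconds, this happens at about the third check). If a new message arrives during the wait, notifyMessageArriving is called and the request is answered immediately. The core of "long polling" is that the broker holds the client's request for a short time, and if a new message arrives within that time, it returns the message to the consumer immediately over the existing connection. Control stays with the consumer: even when the broker has a large backlog, it never pushes messages to the consumer on its own initiative.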
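The broker-side hold can be illustrated with a toy model. LongPollingBroker is hypothetical and heavily simplified (one queue, messages as strings, offsets as list indexes, the current time passed in explicitly); it only mirrors the behavior described above: answer at once if data exists, otherwise park the request, wake it when a message arrives, and expire it with an empty result after the suspend time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of broker-side long polling; not RocketMQ code.
final class LongPollingBroker {
    private static final class HeldRequest {
        final long offset;
        final long deadline;
        final CompletableFuture<List<String>> reply;
        HeldRequest(long offset, long deadline, CompletableFuture<List<String>> reply) {
            this.offset = offset;
            this.deadline = deadline;
            this.reply = reply;
        }
    }

    private final List<String> queue = new ArrayList<>();   // offset == list index
    private final List<HeldRequest> held = new ArrayList<>();

    /** Pull: answer now if messages exist at offset, otherwise hold the request. */
    synchronized CompletableFuture<List<String>> pull(long offset, long suspendMaxMillis, long now) {
        CompletableFuture<List<String>> reply = new CompletableFuture<>();
        if (offset < queue.size()) {
            reply.complete(messagesFrom(offset));
        } else {
            held.add(new HeldRequest(offset, now + suspendMaxMillis, reply));
        }
        return reply;
    }

    /** New message stored: like notifyMessageArriving, answer matching held requests now. */
    synchronized void append(String message) {
        queue.add(message);
        List<HeldRequest> ready = new ArrayList<>();
        for (Iterator<HeldRequest> it = held.iterator(); it.hasNext(); ) {
            HeldRequest r = it.next();
            if (r.offset < queue.size()) {
                it.remove();
                ready.add(r);
            }
        }
        for (HeldRequest r : ready) {
            r.reply.complete(messagesFrom(r.offset));
        }
    }

    /** Periodic check (about every 5 s on the real broker): expire requests past their deadline. */
    synchronized int checkHeldRequests(long now) {
        List<HeldRequest> expired = new ArrayList<>();
        for (Iterator<HeldRequest> it = held.iterator(); it.hasNext(); ) {
            HeldRequest r = it.next();
            if (now >= r.deadline) {
                it.remove();
                expired.add(r);
            }
        }
        for (HeldRequest r : expired) {
            r.reply.complete(Collections.emptyList());   // empty result; the client simply pulls again
        }
        return expired.size();
    }

    private List<String> messagesFrom(long offset) {
        return new ArrayList<>(queue.subList((int) offset, queue.size()));
    }

    public static void main(String[] args) {
        LongPollingBroker broker = new LongPollingBroker();
        CompletableFuture<List<String>> reply = broker.pull(0, 15_000, 0);
        System.out.println(reply.isDone());   // false: request is held
        broker.append("m0");
        System.out.println(reply.join());     // [m0]
    }
}
```

Completing the replies after each loop, rather than inside it, keeps a dependent callback that issues a new pull from modifying the held list mid-iteration.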


浩宇天尚


🏆 InfoQ Writing Platform signed author 🏆, joined 2020.03.25

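【About】A "geek maniac" who loves computer technology, is devoted to development and programming, enjoys fitness, and is keen on mystery fiction. 【Motto】Any sufficiently advanced technology is indistinguishable from magic. 【Areas】Java, the Spring ecosystem, MySQL, APM, and microservice/distributed systems.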
