写点什么

RocketMQ 之消费者启动与消费流程

  • 2022 年 7 月 12 日
  • 本文字数:13583 字

    阅读完需:约 45 分钟

vivo 互联网服务器团队 - Li Kui

一、简介

1.1 RocketMQ 简介

RocketMQ 是由阿里巴巴开源的分布式消息中间件,支持顺序消息、定时消息、自定义过滤器、负载均衡、pull/push 消息等功能。RocketMQ 主要由 Producer、Broker、Consumer 、NameServer 四部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。NameServer 充当名字路由服务,整体架构图如下所示:

  • Producer:负责生产消息,一般由业务系统生产消息,可通过集群方式部署。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

  • Consumer:负责消费消息,一般是后台系统负责异步消费,可通过集群方式部署。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。提供 pull/push 两者消费模式。

  • Broker Server:负责存储消息、转发消息。RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备,存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

  • Name Server:名字服务,充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 NameServer 实例组成集群,相互独立,没有信息交换。

本文基于 Apache RocketMQ 最新版本主要讲述 RocketMQ 的消费者机制,分析其启动流程、pull/push 机制,消息 ack 机制以及定时消息和顺序消息的不同。

1.2 工作流程

(1)启动 NameServer。

NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

(2)启动 Broker。

跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

(3)创建 Topic。

创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

(4)Producer 发送消息。

启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。

(5)Consumer 消费消息。

跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

二、消费者启动流程

官方给出的消费者实现代码如下所示:

public class Consumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        // 实例化消费者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");        // 设置NameServer的地址        consumer.setNamesrvAddr("localhost:9876");        // 订阅一个Topic,以及Tag来过滤需要消费的消息        consumer.subscribe("Test", "*");        // 注册回调实现类来处理从broker拉取回来的消息        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);                // 标记该消息已经被成功消费                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        // 启动消费者实例        consumer.start();        System.out.printf("Consumer Started.%n");    }}
复制代码

下面让我们来分析消费者在启动中每一阶段中做了什么吧,let’s go.

2.1 实例化消费者

第一步主要是实例化消费者,这里采取默认的 Push 消费者模式,构造器中参数为对应的消费者分组,指定同一分组可以消费同一类型的消息,如果没有指定,将会采取默认的分组模式,这里实例化了一个 DefaultMQPushConsumerImpl 对象,它是后面消费功能的主要实现类。

// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
复制代码

主要通过 DefaultMQPushConsumer 实例化 DefaultMQPushConsumerImpl,它是主要的消费功能实现类。

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,       AllocateMessageQueueStrategy allocateMessageQueueStrategy) {       this.consumerGroup = consumerGroup;       this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;       defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);   }
复制代码

2.2 设置 NameServer 和订阅 topic 过程

// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("Test", "*");
复制代码

2.2.1 添加 tag

设置 NameServer 地址后,这个地址为你名字服务集群的地址,类似于 zookeeper 集群地址,样例给出的是单机本地地址,搭建集群后,可以设置为集群地址,接下来我们需要订阅一个主题 topic 下的消息,设置对应的 topic,可以进行分类,通过设置不同的 tag 来实现,但目前只支持"||"进行连接,如:"tag1 || tag2 || tag3"。归根在于构造订阅数据时,源码通过"||"进行了字符串的分割,如下所示:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,    String subString) throws Exception {    SubscriptionData subscriptionData = new SubscriptionData();    subscriptionData.setTopic(topic);    subscriptionData.setSubString(subString);     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {        subscriptionData.setSubString(SubscriptionData.SUB_ALL);    } else {        String[] tags = subString.split("\\|\\|");        if (tags.length > 0) {            for (String tag : tags) {                if (tag.length() > 0) {                    String trimString = tag.trim();                    if (trimString.length() > 0) {                        subscriptionData.getTagsSet().add(trimString);                        subscriptionData.getCodeSet().add(trimString.hashCode());                    }                }            }        } else {            throw new Exception("subString split error");        }    }     return subscriptionData;}
复制代码

2.2.2 发送心跳至 Broker

前面构造好订阅主题和分类后,将其放入了一个 ConcurrentMap 中,并调用 sendHeartbeatToAllBrokerWithLock()方法,进行心跳检测和上传过滤器类至 broker 集群(生产者启动过程也会进行此步骤)。如下所示:

public void sendHeartbeatToAllBrokerWithLock() {    if (this.lockHeartbeat.tryLock()) {        try {            this.sendHeartbeatToAllBroker();            this.uploadFilterClassSource();        } catch (final Exception e) {            log.error("sendHeartbeatToAllBroker exception", e);        } finally {            this.lockHeartbeat.unlock();        }    } else {        log.warn("lock heartBeat, but failed.");    }}
复制代码

首先会对 broker 集群进行心跳检测,在此过程中会施加锁,它会执行 sendHeartbeatToAllBroker 方法,构建心跳数据 heartbeatData,然后遍历消费和生产者 table,将消费者和生产者信息加入到 heartbeatData 中,当都存在消费者和生产者的情况下,会遍历 brokerAddrTable,往每个 broker 地址发送心跳,相当于往对应地址发送一次 http 请求,用于探测当前 broker 是否存活。

this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
复制代码

2.2.3 上传过滤器类至 FilterServer

之后会执行 uploadFilterClassSource()方法,只有 push 模式才会有此过程,在此模式下,它会循环遍历订阅数据 SubscriptionData,如果此订阅数据使用了类模式过滤,会调 uploadFilterClassToAllFilterServer()方法:上传用户自定义的过滤消息实现类至过滤器服务器。

private void uploadFilterClassSource() {    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();    while (it.hasNext()) {        Entry<String, MQConsumerInner> next = it.next();        MQConsumerInner consumer = next.getValue();        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {            Set<SubscriptionData> subscriptions = consumer.subscriptions();            for (SubscriptionData sub : subscriptions) {                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {                    final String consumerGroup = consumer.groupName();                    final String className = sub.getSubString();                    final String topic = sub.getTopic();                    final String filterClassSource = sub.getFilterClassSource();                    try {                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);                    } catch (Exception e) {                        log.error("uploadFilterClassToAllFilterServer Exception", e);                    }                }            }        }    }}
复制代码

过滤器类的作用:消费端可以上传一个 Class 类文件到 FilterServer,Consumer 从 FilterServer 拉取消息时,FilterServer 会把请求转发给 Broker,FilterServer 收取到 Broker 消息后,根据上传的过滤类中的逻辑做过滤操作,过滤完成后再把消息给到 Consumer,用户可以自定义过滤消息的实现类。

2.3 注册回调实现类

接下来就是代码中的注册回调实现类了,当然,如果你是 pull 模式的话就不需要实现它了,push 模式需要定义,两者区别后面会讲到,它主要用于从 broker 实时获取消息,这里有两种消费上下文类型,用于不同的消费类型。

ConsumeConcurrentlyContext:延时类消息上下文,用于延时消息,即定时消息,默认不延迟,可以设置延迟等级,每个等级对应固定时间刻度,RocketMQ 中不能自定义延迟时间,延迟等级从 1 开始,对应的时间间隔如下所示:

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

ConsumeOrderlyContext :顺序类消息上下文,控制发送消息的顺序,生产者设置分片路由规则后,相同 key 只落到指定 queue 上,消费过程中会对顺序消息所在的 queue 加锁,保证消息的有序性。

2.4 消费者启动

我们先来看下消费者启动的过程,如下所示:


(1)this.checkConfig():首先是检测消费配置项,包括消费分组 group、消息模型(集群、广播)、订阅数据、消息监听器等是否存在,如果不存在的话,会抛出异常。

(2)copySubscription():构建主题订阅信息 SubscriptionData 并加入到 RebalanceImpl 负载均衡方法的订阅信息中。

(3)getAndCreateMQClientInstance():初始化 MQ 客户端实例。

(4)offsetStore.load():根据不同消息模式创建消费进度 offsetStore 并加载:BROADCASTING-广播模式,同一个消费 group 中的 consumer 都消费一次,CLUSTERING-集群模式,默认方式,只被消费一次。

switch (this.defaultMQPushConsumer.getMessageModel()) {    case BROADCASTING:        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    case CLUSTERING:        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    default:        break;}
复制代码

可以通过 setMessageModel 方式设置不同模式;广播模式下同消费组的消费者相互独立,消费进度在本地单独进行存储;集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,消费进度是共享在整个消费组中的。

(5)consumeMessageService.start():根据不同消息监听类型实例化并启动。这里有延时消息和顺序消息。

这里主要讲下顺序消息,RocketMQ 也帮我们实现了,在启动时,如果是集群模式并是顺序类型,它会启动定时任务,定时向 broker 发送批量锁,锁住当前顺序消费发往的消息队列,顺序消息因为生产者生产消息时指定了分片策略和消息上下文,只会发往一个消费队列。

定时任务发送批量锁,锁住当前顺序消息队列。

public void start() {        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {                @Override                public void run() {                    ConsumeMessageOrderlyService.this.lockMQPeriodically();                }            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);        }    }
复制代码

发送锁住队列的消息至 broker,broker 端返回锁住成功的队列集合 lockOKMQSet,顺序消息具体实现可查看后面第四节。

(6)mQClientFactory.registerConsumer():MQClientInstance 注册消费者,并启动 MQClientInstance,没有注册成功会结束消费服务。

(7)mQClientFactory.start():最后会启动如下服务:远程客户端、定时任务、pull 消息服务、负载均衡服务、push 消息服务,然后将状态改为运行中。

switch (this.serviceState) {    case CREATE_JUST:        this.serviceState = ServiceState.START_FAILED;        // If not specified,looking address from name server        if (null == this.clientConfig.getNamesrvAddr()) {            this.mQClientAPIImpl.fetchNameServerAddr();        }        // Start request-response channel        this.mQClientAPIImpl.start();        // Start various schedule tasks        this.startScheduledTask();        // Start pull service        this.pullMessageService.start();        // Start rebalance service        this.rebalanceService.start();        // Start push service        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);        log.info("the client factory [{}] start OK", this.clientId);        this.serviceState = ServiceState.RUNNING;        break;    case RUNNING:        break;    case SHUTDOWN_ALREADY:        break;    case START_FAILED:        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);    default:        break;}
复制代码

全部启动完毕后,整个消费者也就启动好了,接下来就可以对生产者发送过来的消息进行消费了,那么是如何进行消息消费的呢?不同的消息模式有何区别呢?

三、pull/push 模式消费

3.1 pull 模式-DefaultMQPullConsumer


pull 拉取式消费:应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用程序控制,可以指定消费的位移,【伪代码】如下所示:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 启动消费者实例consumer.start();//获取主题下所有的消息队列,这里根据主题从nameserver获取的Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");for (MessageQueue queue : mqs) {    //获取当前队列的消费位移,指定消费进度offset,fromstore:从broker中获取还是本地获取,true-broker    long offset = consumer.fetchConsumeOffset(queue, true);    PullResult pullResult = null;    while (offset < pullResult.getMaxOffset()) {        //第二个参数为tag,获取指定topic下的tag        //第三个参数表示从哪个位移下开始消费消息        //第四个参数表示一次最大拉取多少个消息        try {            pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);        } catch (Exception e) {            e.printStackTrace();            System.out.println("pull拉取消息失败");        }        //代码省略,记录消息位移        offset = pullResult.getNextBeginOffset();        //代码省略,这里为消费消息    }}
复制代码

可以看到我们是主动拉取 topic 对应下的消息队列,然后遍历它们,获取当前消费进度并进行消费。

3.2 push 模式-DefaultMQPushConsumer

该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高,现在一般推荐使用该方式,具体示例可以观看第一章开头的官方 demo。

它也是通过实现 pull 方式来实现的,首先,前面 2.4 消费者启动之后,最后会启动拉取消息服务 pullMessageService 和负载均衡 rebalanceService 服务,它们启动后会一直有线程进行消费。

case CREATE_JUST:               //......                // Start pull service                this.pullMessageService.start();                // Start rebalance service                this.rebalanceService.start();                //.......                this.serviceState = ServiceState.RUNNING;                break;  case RUNNING:
复制代码

这里面调用 doRebalance()方法,进行负载均衡,默认每 20s 做一次,会轮询所有订阅该实例的 topic。

public class RebalanceService extends ServiceThread {    //初始化,省略....     @Override    public void run() {        log.info(this.getServiceName() + " service started");         while (!this.isStopped()) {            this.waitForRunning(waitInterval);            //做负载均衡            this.mqClientFactory.doRebalance();        }         log.info(this.getServiceName() + " service end");    }     @Override    public String getServiceName() {        return RebalanceService.class.getSimpleName();    }}
复制代码

然后根据每个 topic,以及它是否顺序消息模式来做 rebalance。

具体做法就是先对 Topic 下的消息消费队列、消费者 Id 进行排序,然后用消息队列的平均分配算法,计算出待拉取的消息队列,将分配到的消息队列集合与 processQueueTable 做一个过滤比对,新队列不包含或已过期,则进行移除 。

public void doRebalance(final boolean isOrder) {      Map<String, SubscriptionData> subTable = this.getSubscriptionInner();      if (subTable != null) {          for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {              final String topic = entry.getKey();              try {                  /根据 /每个topic,以及它是否顺序消息模式来做rebalance                  this.rebalanceByTopic(topic, isOrder);              } catch (Throwable e) {                  if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                      log.warn("rebalanceByTopic Exception", e);                  }              }          }      }       this.truncateMessageQueueNotMyTopic();  }
复制代码

rebalanceByTopic 中广播和集群模式都会执行 updateProcessQueueTableInRebalance()方法,最后会分发请求 dispatchPullRequest,通过 executePullRequestImmediately()方法将 pull 请求放入 pull 请求队列 pullRequestQueue 中,注意,pull 模式下分发请求方法 dispatchPullRequest()实际实现是一个空方法,这里两者很大不同,push 模式实现如下

@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) {     for (PullRequest pullRequest : pullRequestList) {         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);     } }
复制代码

然后再 PullMessageService 中,因为前面 consumer 启动成功了,PullMessageService 线程会实时去取 pullRequestQueue 中的 pull 请求。

@Override  public void run() {      log.info(this.getServiceName() + " service started");       while (!this.isStopped()) {          try {              PullRequest pullRequest = this.pullRequestQueue.take();              if (pullRequest != null) {                  this.pullMessage(pullRequest);              }          } catch (InterruptedException e) {          } catch (Exception e) {              log.error("Pull Message Service Run Method exception", e);          }      }       log.info(this.getServiceName() + " service end");  }
复制代码

取出来的 pull 请求又会经由 DefaultMQPushConsumerImpl 的消息监听类,调用 pullMessage()方法。

private void pullMessage(final PullRequest pullRequest) {     final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());     if (consumer != null) {         DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;         impl.pullMessage(pullRequest);     } else {         log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);     } }
复制代码

pullMessage()中 pullKernelImpl()有一个 Pullback 方法用于执行消息的回调,它会通过 submitConsumeRequest()这个方法来处理消息,总而言之就是通过线程回调的方式让 push 模式下的监听器能够感知到。

//Pull回调PullCallback pullCallback = new PullCallback() {            @Override            public void onSuccess(PullResult pullResult) {                if (pullResult != null) {                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,                        subscriptionData);                     switch (pullResult.getPullStatus()) {                        case FOUND:                         //省略...消费位移更新                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                    pullResult.getMsgFoundList(),                                    processQueue,                                    pullRequest.getMessageQueue(),                                    dispathToConsume);
复制代码

这个方法对应的不同消费模式有着不同实现,但都是会构建一个消费请求 ConsumeRequest,里面有一个 run()方法,构建完毕后,会把它放入到 listener 监听器中。

//监听消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
复制代码

还记得前面我们样例给出的注册监听器回调处理方法吗?

我们可以点击上面的 consumeMessage 方法,查看它在源码中的实现位置,发现它就回到了我们前面的 2.3 注册回调实现类里面了,整个流程是不是通顺了呢?这个监听器中就会收到 push 的消息,拉取出来进行业务消费逻辑,下面是我们自己定义的消息回调处理方法。

// 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() {     @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {         System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);         // 标记该消息已经被成功消费         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } });
复制代码

3.3 小结

push 模式相比较于 pull 模式不同的是,做负载均衡时,pullRequest 请求会放入 pullRequestQueue,然后 PullMessageService 线程会实时去取出这个请求,将消息存入 ProcessQueue,通过线程回调的方式让 push 模式下的监听器能够感知到,这样消息从分发请求到接收都是实时的,而 pull 模式是消费端主动去拉取指定消息的,需要指定消费进度。

对于我们开发者来说,选取哪种模式实现我们的业务逻辑比较合适呢?别急,先让我们总结下他们的特点:

共同点:

  1. 两者底层实际一样,push 模式也是基于 pull 模式来实现的。

  2. pull 模式需要我们通过程序主动通过 consumer 向 broker 拉消息,而消息的 push 模式则只需要我们提供一个 listener 监听,实时获取消息。

优点:

push 模式采用长轮询阻塞的方式获取消息,实时性非常高;

push 模式 rocketMQ 处理了获取消息的细节,使用起来比较简单方便;

pull 模式可以指定消费进度,想消费多少就消费多少,灵活性大。

缺点:

  1. push 模式当消费者能力远远低于生产者能力的时候,会产生一定的消费者消息堆积;

  2. pull 模式实时性很低,频率不好设置;

  3. 拉取消息的间隔不好设置,太短则产生很多无效 Pull 请求的 RPC 开销,影响 MQ 整体的网络性能,太长则实时性差。

适用场景:

  1. 对于服务端生产消息数据比较大时,而消费端处理比较复杂,消费能力相对较低时,这种情况就适用 pull 模式;

  2. 对于数据实时性要求高的场景,就比较适用与 push 模式。

现在的你是否明确业务中该使用哪种模式了呢?

四、顺序消息

4.1 实现 MQ 顺序消息发送存在问题

(1)一般消息发送会采取轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue 上拉取消息,broker 之间是无感知的,这种情况发送和消费是不能保证顺序。

(2)异步方式发送消息时,发送的时候不是按着一条一条顺序发送的,保证不了消息到达 Broker 的时间也是按照发送的顺序来的。

消息发送到存储,最后到消费要经历这么多步骤,我们该如何在业务中使用顺序消息呢?让咱们来一步步拆解下吧。

4.2 实现 MQ 顺序消息关键点

既然分散到多个 broker 上无法追踪顺序,那么可以控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。在发送时设置分片路由规则,让相同 key 的消息只落到指定 queue 上,然后消费过程中对顺序消息所在的 queue 加锁,保证消息的有序性,让这个 queue 上的消息就按照 FIFO 顺序来进行消费。因此我们满足以下三个条件是否就可以呢?

1)消息顺序发送:多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到 mq 中,消息发送方法就得使用同步发送,异步发送无法保证顺序性。

//采用的同步发送方式,在一个线程内顺序发送,异步发送方式为:producer.send(msg, new SendCallback() {...})SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}
复制代码

2)消息顺序存储:MQ 的 topic 下会存在多个 queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个 queue 中。对应到 mq 中,需要使用 MessageQueueSelector 来选择要发送的 queue。即可以对业务编号设置路由规则,像根据队列数量对业务字段 hash 取余,将消息发送到一个 queue 中。

//使用"%"操作,使得订单id取余后相同的数据路由到同一个queue中,也可以自定义路由规则long index = id % mqs.size();  return mqs.get((int) index);
复制代码

3)消息顺序消费:要保证消息顺序消费,同一个 queue 就只能被一个消费者所消费,因此对 broker 中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。这里 RocketMQ 已经为我们实现好了

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();    for (MessageQueue mq : mqSet) {        if (!this.processQueueTable.containsKey(mq)) {            if (isOrder && !this.lock(mq)) {                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);                continue;            }          //....省略        }    }
复制代码

消费者重新负载,并且分配完消费队列后,需要向 mq 服务器发起消息拉取请求,代码实现在 RebalanceImpl#updateProcessQueueTableInRebalance()中,针对顺序消息的消息拉取,mq 做了以上判断,即消费客户端先向 broker 端发起对 messageQueue 的加锁请求,只有加锁成功时才创建 pullRequest 进行消息拉取,这里的 pullRequest 就是前面 pull 和 push 模式消息体,而 updateProcessQueueTableInRebalance 这个方法也是在前面消费者启动过程中有讲到过哦。

具体加锁逻辑如下:


public boolean lock(final MessageQueue mq) { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); requestBody.getMqSet().add(mq); try { Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); if (processQueue != null) { processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } boolean lockOK = lockedMq.contains(mq); log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq); return lockOK; } catch (Exception e) { log.error("lockBatchMQ exception, " + mq, e); } } return false; }
复制代码

可以看到,就是调用 lockBatchMQ 方法发送了一个加锁请求,成功获取到消息处理队列就设为获取到锁,返回锁定成功,如果加锁成功,同一时刻只有一个线程进行消息消费。加锁失败,会延迟 1000ms 重新尝试向 broker 端申请锁定 messageQueue,锁定成功后重新提交消费请求。

怎么样,这样的加锁方式是不是很像我们平时用到的分布式锁呢?由你来设计实现你会怎么做呢?

五、消息 ack 机制

5.1 消息消费失败处理

消息被消费者消费了,那么如何保证被消费成功呢?消息消费失败会出现什么情况呢?

消息被消费,那么如何保证被消费成功呢?这里只有使用方控制,只有使用方确认成功了,才会消费成功,否则会重新投递。

RocketMQ 其实是通过 ACK 机制来对失败消息进行重试和通知的,具体流程如下所示:




消息成功与否是由使用方控制,只有使用方确认成功了,才会消费成功,否则会重新投递,Consumer 会通过监听器监听回调过来的消息,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 表示消费成功,如果消费失败,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 状态(消费重试),RocketMQ 就会默认为这条消息失败了,延迟一定时间后(默认 10s,可配置),会再次投送到 ConsumerGroup,重试次数与间隔时间关系上图所示。如果持续这样,失败到一定次数(默认 16 次),就会进入到 DLQ 死信队列,不再投递,此时可以通过监控人工来干预。

5.2 消息重投带来问题

RocketMQ 消费消息因为消息重投很大一个问题就是无法保证消息只被消费一次,因此需要开发人员在业务里面自己去处理。

六、总结

本文主要介绍了 RocketMQ 的消费者启动流程,结合官方源码和示例,一步步讲述消费者在启动和消息消费中的的工作原理及内容,并结合平时业务工作中,对我们所熟悉的顺序、push/pull 模式等进行详细分析,以及对于消息消费失败和重投带来问题去进行分析。

对于自己而言,希望通过主动学习源码方式,能够明白其中启动的原理,学习里面优秀的方案,像对于 pull/push,顺序消息这些,学习之后能够了解到 push 模式是何如做到实时拉取消息的,顺序消息是如何保证的,再就是能够联想到平时遇到这种问题该如何处理,像顺序消息在消息被消费时保持和存储的顺序一致,这里自己施加分布式锁写能不能实现等,文中也有很多引导性问题,希望能引起读者自己的思考,能够对整个消费者启动和消息消费流程有着较为直观的认知,但还有着一些技术细节由于篇幅原因没做出详细说明,也欢迎大家一起探讨交流~


参考资料:

发布于: 刚刚阅读数: 3
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020.07.10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
RocketMQ之消费者启动与消费流程_RocketMQ_vivo互联网技术_InfoQ写作社区