前提介绍
在 RocketMQ 中一般有两种获取消息的方式,一个是拉(pull,消费者主动去 broker 拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的 Push 操作,接下来的章节会介绍 Pull 操作方式的消费机制体系。
DefaultMQPullConsumer
DefaultMQPullConsumer 与 DefaultMQPushConsumer 相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下 DefaultMQPullConsumer 的内部原理。
总体流程执行
DefaultMQPullConsumer 使用例子
public class MQPullConsumer {
private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
consumer.start();
// 从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for(MessageQueue mq:mqs){
try {
// 获取消息的offset,指定从store中获取
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}
}
复制代码
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
//遍历队列
for(MessageQueue mq:mqs){
try {
//获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
//第二个参数表示可以消费哪些tag的消息
//第三个参数表示从哪个位移开始消费消息
//第四个参数表示一次最大拉多少个消息
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
}
复制代码
DefaultMQPullConsumer 的总体流程
启动 DefaultMQPullConsumer 是通过调用 start()方法完成的
DefaultMQPullConsumer 拉取源码分析
分析下 DefaultMQPullConsumer 拉取消息的流程
consumer.fetchSubscribeMessageQueues("order-topic")
复制代码
从指定 topic 中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
复制代码
核心源码分析
fetchSubscribeMessageQueues()
通过调用 fetchSubscribeMessageQueues()方法可以获取指定 topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向 nameserver 发送 GetRouteInfoRequest 请求,请求内容为 GET_ROUTEINTO_BY_TOPIC,nameserver 将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的 MessageQueue 对象。
每个 MessageQueue 对象里面记录了 topic、broker 名和读队列号。最后 fetchSubscribeMessageQueues()将 MessageQueue 对象集合返回给调用者。
向 NameServer 发送请求获取 topic 参数对应的 Broker 信息和 topic 配置信息,即 TopicRouteData 对象。
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
if (topicRouteData != null) {
// 2、遍历topicRouteData
Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
if (!mqList.isEmpty()) {
return mqList;
} else {
throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
}
}
} catch (Exception e) {
throw new MQClientException(
"Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);
}
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
复制代码
遍历过程 TopicRouteData
遍历 TopicRouteData 对象的 QueueData 列表中每个 QueueData 对象,首先判断该 QueueData 对象是否具有读权限,若有则根据该 QueueData 对象的 readQueueNums 值,创建 readQueueNums 个 MessageQueue 对象,并构成 MessageQueue 集合;最后返回给 MessageQueue 集合
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
复制代码
consumer.fetchConsumeOffset
通过该方法获取该 MessageQueue 队列下面从 offset 位置开始的消息内容,其中 maxNums=32 即表示获取的最大消息个数,offset 为该 MessageQueue 对象的开始消费位置。
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
复制代码
fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从 broker 获取该队列的消费位移,true 表示从 broker 获取,false 表示从本地记录获取,如果本地获取不到再从 broker 获取。这里说的从本地获取是指从 RemoteBrokerOffsetStore.offsetTable 属性中获取,该属性记录了每个队列的消费位移。当从 broker 获取位移后会更新 offsetTable。
pullBlockIfNotFound 拉取信息
rocketmq 提供了多个拉取方法,可以使用 pullBlockIfNotFound()方法也可以使用 pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound 会等待 30s 返回一个空结果,pull 是等待 10s 返回空结果。
不过 pull 方法的入参可以调整超时时间,而 pullBlockIfNotFound 则需要修改 DefaultMQPullConsumer.consumerPullTimeoutMillis 参数。不过两个方法调用的底层逻辑都是一样的,都是调用 DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下 pullSyncImpl()方法。
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
复制代码
获取该 MessageQueue 队列的消费进度来设定参数 offset 值该方法最终调用 pullSyncImpl,可以获取相关的结果数据。
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
复制代码
DefaultMQPullConsumerImpl.pullSyncImpl 的实现过程
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.isRunning();
//检查入参是否合法
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
throw new MQClientException("offset < 0", null);
}
if (maxNums <= 0) {
throw new MQClientException("maxNums <= 0", null);
}
//更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果
this.subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
//计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
//调用PullAPIWrapper从broker拉取消息,
//pullKernelImpl方法里面构建PullMessageRequest请求对象
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,//队列
subscriptionData.getSubString(),//消息的过滤规则
subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset,//拉取消息的位移
maxNums,//建议broker一次性返回最大消息个数,默认是32个
sysFlag,
0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,//超时时间
CommunicationMode.SYNC,
null//回调逻辑为null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
//If namespace is not null , reset Topic without namespace.
this.resetTopic(pullResult.getMsgFoundList());
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
复制代码
检查 MessageQueue 对象的 topic 是否在 RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>变量中,若不在则以 consumerGroup、topic、subExpression 为参数调用 FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造 SubscriptionData 对象保存到 RebalanceImpl.subscriptionInner 变量中,其中 subExpression="*"this.subscriptionAutomatically(mq.getTopic());// 构建标志位,逻辑或运算|=int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData;
try {
//以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
// 从broker中拉取消息
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
);
// 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
复制代码
Push 和 Pull 的操作对比
使用 DefaultMQPullConsumer 拉取消息,发送到 broker 的提交位移永远都是 0,所以 broker 无法记录有效位移,需要程序自己记录和控制提交位移。
资料参考
评论