写点什么

RocketMQ msgId 与 offsetMsgId 释疑 (实战篇)

  • 2021 年 11 月 11 日
  • 本文字数:2773 字

    阅读完需:约 9 分钟

执行效果如图所示:



即消息发送会返回 msgId 与 offsetMsgId。

1.2 从消息消费看消息 ID

package org.apache.


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


rocketmq.example.quickstart;


import java.util.List;


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;


import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;


import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;


import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;


import org.apache.rocketmq.client.exception.MQClientException;


import org.apache.rocketmq.common.consumer.ConsumeFromWhere;


import org.apache.rocketmq.common.message.MessageExt;


public class Consumer {


public static void main(String[] args) throws InterruptedException, MQClientException {


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");


consumer.setNamesrvAddr("127.0.0.1:9876");


consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);


consumer.subscribe("TestTopic", "*");


consumer.registerMessageListener(new MessageListenerConcurrently() {


public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,


ConsumeConcurrentlyContext context) {


System.out.println("MessageExt msg.getMsgId():" + msgs.get(0).getMsgId());


System.out.println("-------------------分割线-----------------");


System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);


return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;


}


});


consumer.start();


System.out.printf("Consumer Started.%n");


}


}


执行效果如图所示:



不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的 msgId 与直接输出 msgs 中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。


2、消息 ID 释疑




从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回 msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?


  • msgId:该 ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。

  • offsetMsgId:消息偏移 ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在 commitlog 文件的物理偏移量。

2.1 msgId 即全局唯一 ID 构建规则


从这张图可以看出,msgId 确实是客户端生成的,接下来我们详细分析一下其生成算法。


MessageClientIDSetter#createUniqID


public static String createUniqID() {


StringBuilder sb = new StringBuilder(LEN * 2);


sb.append(FIX_STRING); // @1


sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // @2


return sb.toString();


}


一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。

2.1.1 FIX_STRING

MessageClientIDSetter 静态代码块


static {


byte[] ip;


try {


ip = UtilAll.getIP();


} catch (Exception e) {


ip = createFakeIP();


}


LEN = ip.length + 2 + 4 + 4 + 2;


ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);


tempBuffer.position(0);


tempBuffer.put(ip);


tempBuffer.position(ip.length);


tempBuffer.putInt(UtilAll.getPid());


tempBuffer.position(ip.length + 2);


tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());


FIX_STRING = UtilAll.bytes2string(tempBuffer.array());


setStartTime(System.currentTimeMillis());


COUNTER = new AtomicInteger(0);


}


从这里可以看出 FIX_STRING 的主要由:客户端的 IP、进程 ID、加载 MessageClientIDSetter 的类加载器的 hashcode。

2.1.2 唯一性算法

msgId 的唯一性算法由 MessageClientIDSetter 的 createUniqIDBuffer 方法实现。


private static byte[] createUniqIDBuffer() {


ByteBuffer buffer = ByteBuffer.allocate(4 + 2);


long current = System.currentTimeMillis();


if (current >= nextStartTime) {


setStartTime(current);


}


buffer.position(0);


buffer.putInt((int) (System.currentTimeMillis() - startTime));


buffer.putShort((short) COUNTER.getAndIncrement());


return buffer.array();


}


可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。

2.2 offsetMsgId 构建规则


在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个 id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:


MessageDecoder#createMessageId


public static String createMessageId(final ByteBuffer input ,


final ByteBuffer addr, final long offset) {


input.flip();


int msgIDLength = addr.limit() == 8 ? 16 : 28;


input.limit(msgIDLength);


input.put(addr);


input.putLong(offset);


return UtilAll.bytes2string(input.array());


}


首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:


  • ByteBuffer input


用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)


  • ByteBuffer addr


当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。


  • long offset


消息的物理偏移量。


即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。


温馨提示:即在 RocketMQ 中,只需要提供 offsetMsgId,可用不必知道该消息所属的 topic 信息即可查询该条消息的内容。

2.3 消息发送与消息消费返回的消息 ID 信息

消息发送时会在 SendSesult 中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。


在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性 ID),因为该全局性 ID 非常方便实现消费端的幂等。


在本文的 1.2 节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?



在客户端返回的 msg 信息,其最终返回的对象是 MessageClientExt ,继承自 MessageExt。


那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。


@Override


public String getMsgId() {


String uniqID = MessageClientIDSetter.getUniqID(this);


if (uniqID == null) {


return this.getOffsetMsgId();


} else {


return uniqID;


}


}

评论

发布
暂无评论
RocketMQ msgId与offsetMsgId释疑(实战篇)