写点什么

RocketMQ msgId 与 offsetMsgId 释疑 (实战篇),腾讯技术官发布的“神仙文档”火爆网络

  • 2022 年 4 月 22 日
  • 本文字数:2095 字

    阅读完需:约 7 分钟

System.out.println("-------------------分割线-----------------");


System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);


return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;


}


});


consumer.start();


System.out.printf("Consumer Started.%n");


}


}


执行效果如图所示:


![在这里插入图片描述](https://img-blog.csdnimg.cn/20200308202103968.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shado Java 开源项目【ali1024.coding.net/public/P7/Java/git】 w_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70)


不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的 msgId 与直接输出 msgs 中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。


[](()2、消息 ID 释疑




从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回 msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?


  • msgId:该 ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。

  • offsetMsgId:消息偏移 ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在 commitlog 文件的物理偏移量。

[](()2.1 msgId 即全局唯一 ID 构建规则


从这张图可以看出,msgId 确实是客户端生成的,接下来我们详细分析一下其生成算法。


MessageClientIDSetter#createUniqID


public static String createUniqID() {


StringBuilder sb = new StringBuilder(LEN * 2);


sb.append(FIX_STRING); // @1


sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // @2


return sb.toString();


}


一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。

[](()2.1.1 FIX_STRING

MessageClientIDSetter 静态代码块


static {


byte[] ip;


try {


ip = UtilAll.getIP();


} catch (Exception e) {


ip = createFakeIP();


}


LEN = ip.length + 2 + 4 + 4 + 2;


ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);


tempBuffer.position(0);


tempBuffer.put(ip);


tempBuffer.position(ip.length);


tempBuffer.putInt(UtilAll.getPid());


tempBuffer.position(ip.length + 2);


tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());


FIX_STRIN 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 G = UtilAll.bytes2string(tempBuffer.array());


setStartTime(System.currentTimeMillis());


COUNTER = new AtomicInteger(0);


}


从这里可以看出 FIX_STRING 的主要由:客户端的 IP、进程 ID、加载 MessageClientIDSetter 的类加载器的 hashcode。

[](()2.1.2 唯一性算法

msgId 的唯一性算法由 MessageClientIDSetter 的 createUniqIDBuffer 方法实现。


private static byte[] createUniqIDBuffer() {


ByteBuffer buffer = ByteBuffer.allocate(4 + 2);


long current = System.currentTimeMillis();


if (current >= nextStartTime) {


setStartTime(current);


}


buffer.position(0);


buffer.putInt((int) (System.currentTimeMillis() - startTime));


buffer.putShort((short) COUNTER.getAndIncrement());


return buffer.array();


}


可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。

[](()2.2 offsetMsgId 构建规则


在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个 id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:


MessageDecoder#createMessageId


public static String createMessageId(final ByteBuffer input ,


final ByteBuffer addr, final long offset) {


input.flip();


int msgIDLength = addr.limit() == 8 ? 16 : 28;


input.limit(msgIDLength);


input.put(addr);


input.putLong(offset);


return UtilAll.bytes2string(input.array());


}


首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:


  • ByteBuffer input


用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)


  • ByteBuffer addr


当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。


  • long offset


消息的物理偏移量。


即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。


温馨提示:即在 RocketMQ 中,只需要提供 offsetMsgId,可用不必知道该消息所属的 topic 信息即可查询该条消息的内容。

最后

经过日积月累, 以下是小编归纳整理的深入了解 Java 虚拟机文档,希望可以帮助大家过关斩将顺利通过面试。由于整个文档比较全面,内容比较多,篇幅不允许,下面以截图方式展示 。









由于篇幅限制,文档的详解资料太全面,细节内容太多,所以只把部分知识点截图出来粗略的介绍,每个小节点里面都有更细化的内容!

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ msgId与offsetMsgId释疑(实战篇),腾讯技术官发布的“神仙文档”火爆网络_Java_爱好编程进阶_InfoQ写作社区