写点什么

为什么 kafka 性能下降这么快,我用 RocketMQ 的时候不会这样子

作者:Java-fenn
  • 2022 年 9 月 25 日
    湖南
  • 本文字数:4584 字

    阅读完需:约 15 分钟

​Rocketmq 和 kafka 这两个消息队列大家应该都比较熟悉吧,哪怕不是很熟悉,应该也听说过的吧,你别告诉我,作为一个资深的程序员,你没听过这两门技术。

我之前使用这两个消息队列的时候就遇到一个很奇怪的问题,就是在 kafka 里面弄了比较多的 topic,性能下降的速度贼快,不知道大家遇到过没,而同样的场景切换到消息队列 rocketmq 中,下降速度却没有那么快。

不熟悉这俩消息队列结构的朋友,一听这个肯定还是不太清楚的,今天我来给大家分析分析这其中的原因,给大家解惑。

rocketmq 的结构



NameServer:主要是对元数据的管理,包括 Topic 和路由信息的管理,底层由 netty 实现,是一个提供路由管理、路由注册和发现的无状态节点,类似于 ZooKeeper

Broker:消息中转站,负责收发消息、持久化消息

Producer:消息的生产者,一般由业务系统来产生消息供消费者消费

Consumer:消息的消费者,一般由业务系统来异步消费消息

在 RocketMQ 中的每一条消息,都有一个 Topic,用来区分不同的消息。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。

在 Topic 中有分为了多个 Queue,这其实是我们发送/读取消息通道的最小单位,我们发送消息都需要指定某个写入某个 Queue,拉取消息的时候也需要指定拉取某个 Queue,所以我们的顺序消息可以基于我们的 Queue 维度保持队列有序,如果想做到全局有序那么需要将 Queue 大小设置为 1,这样所有的数据都会在 Queue 中有序。



我们同一组 Consumer 会根据一些策略来选 Queue,常见的比如平均分配或者一致性 Hash 分配。

要注意的是当 Consumer 出现下线或者上线的时候,这里需要做重平衡,也就是 Rebalance,RocketMQ 的重平衡机制如下:

定时拉取 broker,topic 的最新信息,每隔 20s 做重平衡,随机选取当前 Topic 的一个主 Broker,这里要注意的是不是每次重平衡所有主 Broker 都会被选中,因为会存在一个 Broker 再多个 Broker 的情况。

获取当前 Broker,当前 ConsumerGroup 的所有机器 ID。然后进行策略分配。

由于重平衡是定时做的,所以这里有可能会出现某个 Queue 同时被两个 Consumer 消费,所以会出现消息重复投递。

Queue 读写数量不一致

在 RocketMQ 中 Queue 被分为读和写两种,在最开始接触 RocketMQ 的时候一直以为读写队列数量配置不一致不会出现什么问题的,比如当消费者机器很多的时候我们配置很多读的队列,但是实际过程中发现会出现消息无法消费和根本没有消息消费的情况。

当写的队列数量大于读的队列的数量,当大于读队列这部分 ID 的写队列的数据会无法消费,因为不会将其分配给消费者。

当读的队列数量大于写的队列数量,那么多的队列数量就不会有消息被投递进来。



rocketmq 中的存储机制

RocketMQ 凭借其强大的存储能力和强大的消息索引能力,以及各种类型消息和消息的特性脱颖而出,于是乎,我们这些有梦想的程序员学习 RocketMQ 的存储原理也变得尤为重要

而要说起这个存储原理,则不得不说的就是 RocketMQ 的消息存储文件 commitLog 文件,消费方则是凭借着巧妙的设计 Consumerqueue 文件来进行高性能并且不混乱的消费,还有 RocketMQ 的强大的支持消息索引的特性,靠的就是 indexfile 索引文件

我们这篇文章就从这 commitLog、Consumerqueue、indexfile 这三个神秘的文件说起,搞懂这三个文件,RocketMQ 的核心就被你掏空了

先上个图,写入 commitLog 文件时 commitLog 和 Consumerqueue、indexfile 文件三者的关系



commitLog

RocketMQ 中的消息存储文件放在 ${ROCKET_HOME}/store 目录下,当生产者发送消息时,broker 会将消息存储到 Commit 文件夹下,文件夹下面会有一个 commitLog 文件,但是并不是意味着这个文件叫这个,文件命名是根据消息的偏移量来决定的。

文件有自己的生成规则,每个 commitLog 文件的大小是 1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为 1073741824 (1G = 1073741824byte)。

commitLog 文件的最大的一个特点就是消息的顺序写入,随机读写,所有的 topic 的消息都存储到 commitLog 文件中,顺序写入可以充分的利用磁盘顺序减少了 IO 争用数据存储的性能,kafka 也是通过硬盘顺序存盘的。

大家都常说硬盘的速度比内存慢,其实这句话也是有歧义的,当硬盘顺序写入和读取的时候,速度不比内存慢,甚至比内存速度快,这种存储方式就好比数组,我们如果知道数组的下标,则可以直接通过下标计算出位置,找到内存地址,众所周知,数组的读取是很快的,但是数组的缺点在于插入数据比较慢,因为如果在中间插入数据需要将后面的数据往后移动。

而对于数组来说,如果我们只会顺序的往后添加,数组的速度也是很快的,因为数组没有后续的数据的移动,这一操作很耗时。

回到 RocketMQ 中的 commitLog 文件,也是同样的道理,顺序的写入文件也就不需要太多的去考虑写入的位置,直接找到文件往后放就可以了,而取数据的时候,也是和数组一样,我们可以通过文件的大小去精准的定位到哪一个文件,然后再精准的定位到文件的位置。



consumerqueue 文件

RocketMQ 是分为多个 topic,消息所属主题,属于消息类型,每一个 topic 有多个 queue,每个 queue 放着不同的消息,在同一个消费者组下的消费者,可以同时消费同一个 topic 下的不同 queue 队列的消息。不同消费者下的消费者,可以同时消费同一个 topic 下的相同的队列的消息。而同一个消费者组下的消费者,不可以同时消费不同 topic 下的消息。

而每个 topic 下的 queue 队列都会对应一个 Consumerqueue 文件,这个存储的就是对应的 commitLog 文件中的索引位置,而不用存储真实的消息。

consumequeue 存放在 store 文件里面,里面的 consumequeue 文件里面按照 topic 排放,然后每个 topic 默认 4 个队列,里面存放的 consumequeue 文件。

ConsumeQueue 中并不需要存储消息的内容,而存储的是消息在 CommitLog 中的 offset。也就是说 ConsumeQueue 其实是 CommitLog 的一个索引文件。

consumequeue 是定长结构,每个记录固定大小 20 个字节,单个 consumequeue 文件默认包含 30w 个条目,所以单个文件大小大概 6M 左右。



很显然,Consumer 消费消息的时候,要读 2 次:先读 ConsumeQueue 得到 offset,再通过 offset 找到 CommitLog 对应的消息内容。

IndexFile

RocketMQ 还支持通过 MessageID 或者 MessageKey 来查询消息,使用 ID 查询时,因为 ID 就是用 broker+offset 生成的(这里 msgId 指的是服务端的),所以很容易就找到对应的 commitLog 文件来读取消息。

对于用 MessageKey 来查询消息,MessageStore 通过构建一个 index 来提高读取速度。



indexfile 文件存储在 store 目录下的 index 文件里面,里面存放的是消息的 hashcode 和 index 内容,文件由一个文件头组成:长 40 字节。500w 个 hashslot,每个 4 字节。2000w 个 index 条目,每个 20 字节。

所以这里我们可以估算每个 indexfile 的大小为:40+500w4+2000w20 个字节,大约 400M 左右。

每放入一个新消息的 index 进来,首先会取 MessageKey 的 HashCode,然后用 Hashcode 对 slot 的总数进行取模,决定该消息 key 的位置,slot 的总数默认是 500W 个。

只要取 hash 就必然面临着 hash 冲突的问题,indexfile 也是采用链表结构来解决 hash 冲突,这一点和 HashMap 一样的,不过这个不存在红黑树转换这一说,个人猜测这个的冲突数量也达不到很高的级别,所以进行这方面的设计也没啥必要,甚至变成了强行增加 indexfile 的文件结构难度。

还有,在 indexfile 中的 slot 中放的是最新的 index 的指针,因为一般查询的时候大概率是优先查询最近的消息。

每个 slot 中放的指针值是索引在 indexfile 中的偏移量,也就是后面 index 的位置,而 index 中存放的就是该消息在 commitlog 文件中的 offset,每个 index 的大小是 20 字节,所以根据当前索引是这个文件中的第几个偏移量,也就很容易定位到索引的位置,根据前面的固定大小可以很快把真实坐标算出来,以此类推,形成一个链表的结构。

kafka 的结构



Broker:消息中间件处理节点(服务器),一个节点就是一个 broker,一个 Kafka 集群由一个或多个 broker 组成。

Topic:Kafka 对消息进行归类,发送到集群的每一条消息都要指定一个 topic。

Partition:物理上的概念,每个 topic 包含一个或多个 partition,一个 partition 对应一个文件夹,这个文件夹下存储 partition 的数据和索引文件,每个 partition 内部是有序的。

Producer:生产者,负责发布消息到 broker。

Consumer:消费者,从 broker 读取消息。

ConsumerGroup:每个 consumer 属于一个特定的 consumer group,可为每个 consumer 指定 group name,若不指定,则属于默认的 group,一条消息可以发送到不同的 consumer group,但一个 consumer group 中只能有一个 consumer 能消费这条消息。

kafka 存储机制



我们的生产者会决定发送到哪个 Partition,如果没有 Key 值则进行轮询发送。

如果有 Key 值,对 Key 值进行 Hash,然后对分区数量取余,保证了同一个 Key 值的会被路由到同一个分区。(所有系统的 partition 都是同一个路数)。

总所周知,topic 在物理层面以 partition 为分组,一个 topic 可以分成若干个 partition,那么 topic 以及 partition 又是怎么存储的呢?

其实 partition 还可以细分为 logSegment,一个 partition 物理上由多个 logSegment 组成,那么这些 segment 又是什么呢?

LogSegment 文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为 Segment 索引文件和数据文件。

这两个文件的命令规则为:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充,如下:

第一个segment00000000000000000000.index 00000000000000000000.log    第二个segment,文件命名以第一个segment的最后一条消息的offset组成00000000000000170410.index 00000000000000170410.log 第三个segment,文件命名以上一个segment的最后一条消息的offset组成00000000000000239430.index 00000000000000239430.log
复制代码

“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

kafka 和 rocketmq 的比较

RocketMQ 和 Kafka 的存储核心设计有很大的不同,所以其在写入性能方面也有很大的差别,这是 16 年阿里中间件团队对 RocketMQ 和 Kafka 不同 Topic 下做的性能测试:



从图上可以看出:

Kafka 在 Topic 数量由 64 增长到 256 时,吞吐量下降了 98.37%。

RocketMQ 在 Topic 数量由 64 增长到 256 时,吞吐量只下降了 16%。

这是为什么呢?



kafka 一个 topic 下面的所有消息都是以 partition 的方式分布式的存储在多个节点上。同时在 kafka 的机器上,每个 Partition 其实都会对应一个日志目录,在目录下面会对应多个日志分段。

所以如果 Topic 很多的时候 Kafka 虽然写文件是顺序写,但实际上文件过多,会造成磁盘 IO 竞争非常激烈。

那 RocketMQ 为什么在多 Topic 的情况下,依然还能很好的保持较多的吞吐量呢?我们首先来看一下 RocketMQ 中比较关键的文件:



rocketmq 中的消息主体数据并没有像 Kafka 一样写入多个文件,而是写入一个文件,这样我们的写入 IO 竞争就非常小,可以在很多 Topic 的时候依然保持很高的吞吐量。

有人可能说这里的 ConsumeQueue 写是在不停的写入呢,并且 ConsumeQueue 是以 Queue 维度来创建文件,那么文件数量依然很多,在这里 ConsumeQueue 的写入的数据量很小,每条消息只有 20 个字节,30W 条数据也才 6M 左右,所以其实对我们的影响相对 Kafka 的 Topic 之间影响是要小很多的。

再顺便提一嘴,一个 topic 分了一万个 partition 和一万个 topic 每个 topic 都是单 partition 对于 kafka 的负载是一样的。

用户头像

Java-fenn

关注

需要Java资料或者咨询可加我v : Jimbye 2022.08.16 加入

还未添加个人简介

评论

发布
暂无评论
为什么kafka性能下降这么快,我用RocketMQ的时候不会这样子_Java_Java-fenn_InfoQ写作社区