Kafka008——浅谈 Broker 的存储架构
写在前面
聚焦于 Broker 存储,对 Kafka 存储相关的概念、文件结构、文件内容、文件管理与存储技术等方面知识。
存储架构
Kafka 存储架构的基本概念:Topic+Partition+Replica+LogSegment+Index:
Topic:Kafka 的逻辑主题概念。用于 Producer 与 Consumer 生成消费消息时的约定使用。
Partition:Topic 内消息存储的主要载体。Kafka 通过 Partition 将一个 Topic 内的所有消息分散到不同的 Partition 中,Partition 可以水平拓展,便可以间接提升对应 Topic 消息的承载能力。
Replica:Kafka 保证消息存储分布式高可用的重要保证。Replica 通过对 Partition 内存储的消息数据做冗余拷贝,并分散到不同 Broker 中,提升了 Kafka 存储消息数据的可靠性。Replica 分为 Leader 与 Follower,Leader 负责承载所有的读写请求,Follower 负责同步 Leader 消息数据。
LogSegment:实际消息存储在 Partition 中的最小单元。通过对消息数据进一步拆分,有点分而治之的感觉,避免了单一过大文件的维护与存储开销。拆分的小日志文件也能与消息的批提交更好的结合。通过通过维护稀疏的索引文件,也能在不维护过大索引的前提下,提升数据查询效率。
.log 与.index:Segment 以文件存储在 Broker 中,每个 Segemnt 内维护日志文件与索引文件。索引会包含:.index 与.timeindex,日志包含:.log。Segment 还会维护一些其他文件,如:.snapshot(快照索引文件)
存储文件
Kafka 日志文件存放路径由
log.dir
参数配置。日志文件按照
Topic-Partition
的方式组织日志文件。每个Topic-Partition
对应一个文件夹,文件夹名称就为:Topic-Partition
。每个
Topic-Partition
文件夹中包含多个日志分段文件(**.log
)与索引文件(**.index
与**.timeindex
)。日志分段文件与索引文件名称都相同,且以该分段范围内的第一条消息的 Offset 命名。
文件内容
了解了 Kafka 存储数据的目录格式,明白了数据是怎么存放的之后,再了解一下数据存放的内容是什么?
索引
索引主要有两种:偏移量索引(**.index
)与时间戳索引(**.timeindex
)。
偏移量索引文件:保存消息偏移量->物理地址的映射关系
时间戳索引文件:保存时间戳->偏移量的映射关系。
Kafka 以稀疏索引的方式索引文件。对于稀疏索引我的理解是:Kafka 没有维护全量可以索引到每条消息的索引数据结构。而是维护了部分索引,类似跳表的跳跃查询。在查询时,通过稀疏索引定位目标消息所在的大致块区,然后顺序扫描块区,最终获取到目标消息。通过稀疏索引,Kafka 提供了具有一定效率的索引查询能力,同时也避免了维护全量消息的索引计算成本。
偏移量索引
每个索引项大小 8 字节,构成如下:
RelativeOffset:相对偏移量,占用:0~3 字节。RelativeOffset 表示该消息距离 BaseOffset 的偏移量,BaseOffset 为索引、日志文件开头的名称对应的 Offset。
Position:物理地址,占用:4~7 字节。表示消息在分段日志文件的物理位置。
使用偏移量索引查找目标消息是方法是:二分查找+顺序查询,其过程如下:
查询场景给定为:查找到目标 Offset(设为 TargetOffset)的消息。
二分查找 Segment:
日志多个 Segment(记为:S0,S1,...,Sn)顺序存放。
每个 Segment 文件可根据文件名确定这个块区内消息的起始 Offset(记为:BaseOffset(Sn))、
可以通过二分查找定位到某个 SegmentI 文件,它满足:$$
二分查找 Position:
计算 TargetRelativeOffset:将 2 中查找的 Si 的 BaseOffset 记为 BaseOffset。则:$$
通过二分查找到索引中所有相对偏移量小于 RelativeOffset 的记录,而后取记录里的最大值,即为离 RelativeOffset 最近的消息,再根据索引映射条件,获取到对应的 Position。
基于 Position 顺序查找:从 Position 开始,顺序查找出对应偏移量的消息。
时间戳索引
每个索引项大小为 12 字节,也分为两部分:
TimeStamp:当前日志分段的最大时间戳。
RelativeOffset:时间戳对应消息的相对偏移量。
根据时间戳查找对应消息的过程如下:
查询场景给定为:查找到目标 TimeStamp(设为 TargetTimeStamp)的消息。
顺序查找 Segment:根据 TargetTimeStamp 到每个日志分段文件中最大的时间戳逐一比较,定位到时间戳索引文件。因为时间戳索引文件也是使用了 baseOffset 命名,所以没办法直接通过二分法快速定位到。
二分查找 Posiition:类似偏移量索引中二分查找 Position 的方法,只是查找目标由 RelativeOffset 改为 TimeStamp。也能通过二分查找定位到所有 TimeStamp 小于 TargetTimeStamp 的记录里,最大的那一条,取到对应的 Position。
基于 Position 顺序查找:从 Position 开始,顺序查找出对应偏移量的消息。
Log 日志格式
Kafka 的日志存储是按照预设的格式来完成的。日志存储格式也变更了多版本。这里做一下简单描述。
V0
Message
Header
offset:8B,Partition 分区中的偏移量。
message size:4B,消息的大小。
Record
crc32:4B,对 Record(magic~value)范围内做 CRC 计算的值。
magic:1B,消息格式版本号,v0 版本值为 0。
attributes:1B,消息类型。0、1、2 位表示压缩类别(0:None;1:GZip;2:Snappy;3:LZ4)。其余位保留
key length:4B,消息 key 长度;-1 表示没有设置 key。
key:没有设置 key 则没有该字段;有设置 key 则存放 key 的内容,大小等于 key length。
value length:消息体长度;-1 表示消息为空
value:消息体
V1
V1 在 V0 的格式基础之上,在 magic 和 attributes 之间新增了一个 timestamp 字段。同时 attributes 字段第四位用于表示 timestamp 字段类型。具体如下:
Message
Header
offset:8B,Partition 分区中的偏移量。
message size:4B,消息的大小。
Record
crc32:4B,对 Record(magic~value)范围内做 CRC 计算的值。
magic:1B,消息格式版本号,v1 版本值为 1。
timestamp:8B,时间戳,用于日志保存与切分的策略,以及计算消息端到端延迟等功能
attributes:1B,消息类型。0、1、2 位表示压缩类别(0:None;1:GZip;2:Snappy;3:LZ4),3 位表示 timestamp 类型(0:Creatime;1:LogAppendTime)。其余位保留
key length:4B,消息 key 长度;-1 表示没有设置 key。
key:没有设置 key 则没有该字段;有设置 key 则存放 key 的内容,大小等于 key length。
value length:消息体长度;-1 表示消息为空
value:消息体
因为 V0 与 V1 结构基本相同(只是 V1 新增了部分字段)。所以这里统一说明一下 V0 与 V1 日志格式的缺点(可以感受一下消息格式设计的奥妙):
冗余的 CRC 校验:即使是批次发送消息,每条消息也需要单独保存 CRC。
空间使用率低:无论 key 或 value 是否存在,都需要一个固定大小 4 字节去保存它们的长度信息,当消息足够多时,会浪费非常多的存储空间。
消息长度(Record 长度)没有保存:需要实时计算得出每条消息的总大小,效率低下
只保存最新消息位移。
V2
V2 版本针对 V0 与 V1 版本的缺点做了针对性的优化。主要改动点如下:
支持了批量消息
批量消息头部信息统一存放,批量各消息体信息单独存放。
使用增量形式维护时间戳与 offset 偏移量
使用可变长度提升存储空间利用率
具体格式如下(是按照 RecordBatch->Records->Headers 的层级结构组织):
RecordBatch:消息集,内部包含同一集合内的多条消息
first offset:8B,当前 RecordBatch 的起始偏移量
length:4B,计算从 partition leader epoch 到末尾的长度。
partition leader epoch:4B,分区 leader 的版本号
magic:1B,消息格式版本号,v2 版本是 2。
crc32:4B,crc32 校验值。
attributes:2B,消息属性,这里占用 2 个字节。0、1、2 位表示压缩格式,4 位表示时间戳类型,5 位表示此 RecordBatch 是否在事务中,6 位表示是否为控制消息。7 位保留。
last offset delta:4B,最大位移增量。RecordBatch 中最后一个 Record 的 offset 与 first offset 的差值。主要用于 broker 确保 RecordBatch 中 Recoord 组装的正确性。
first timestamp:8B,起始时间戳,RecordBatch 中第一条 Record 的时间戳。
max timestamp:8B,最大时间戳,RecordBatch 中最大的时间戳。一般情况是最后一条 Record 的时间戳。
producer id:用来支持事务和幂等。暂不解释。
producer epoch:用来支持事务和幂等。暂不解释。
first sequeue:用来支持事务和幂等。暂不解释。
records count:RecordBatch 中 record 的个数。
records:RecordBatch 中消息合集。
length:varint,消息总长度
attributes:1B,保留位,供未来扩展
timestamp delta:varlong,时间戳增量。
offset delta:varint,偏移量增量。保存与 RecordBatch 起始偏移量的差值。
key length:varint,消息 key 长度。
key value:消息 key 的值。
value length:varint,消息体的长度。
value:消息体的值。
header count:varint,消息头个数。
headers:消息头。发送消息时指定的 Header 信息,用来支持应用级别的扩展。
header key length:varint,消息头 key 的长度。
header key:消息头 key 的值。
header value length:varint,消息头值的长度。
header value:消息头的值。
可再从小到大反过来看一下 V2 版本的日志格式:
Header:记录发送消息的 Header 信息。
记录了 Header key 与 value 的长度与内容信息。
Header key 与 value 的长度信息都采用可变长度以提升空间利用率
Records:主要消息信息存储的内容,可以与 V0、V1 版本对比:
调整了 crc32 字段位置到外层 RecordBatch。避免了冗余的 crc 校验,提升计算效率
保存了 Record 的长度,避免了实时获取消息长度的计算开销,并提升空间利用率。
增加了增量数据,包括时间戳,偏移量。
使用可变长度保存
RecordBatch:新增的消息集结构。
留存 crc32,做消息集的统一校验
增加了 produceid、produce epoch,first sequeue 用于支持事务与幂等
变长字段
变长字段,其实是一种压缩整数的算法。在 kafka 中,消息 value 的长度是一个变化的动态值,只考虑变化范围的最小与最大,而后用固定长度的字节去表达实际的长度值,其实会导致浪费。因为长度的实际值分布是不均匀的。可能较小的长度出现的次数更多,较大的长度出现的次数更少。那这时如果能动态的针对较小的数用较小的字节存储,而较大的数用较大的字节存储,综合考虑二者出现的次数比例,实际会节省存储空间与传输网络带宽。
Kafka 的变长字段实现借鉴了 Protobuf 的实现,他们的变长字段是基于连续位标识算法的。即:使用每个字节的第一位来标识是否需要继续向后读。每个字节低 7 位用于实际的编码。
他有以下特点:
数值越小,占用的字节数量也越少。
变长字段可使用 1~10 个字节去标识无符号 64 位整型数字。
MSB(Most Significant Bit):表示解码时 MSB 后面的字是否需要继续读取,以共同表达一个数。
变长字段里每个字节的最高位都是 MSB 位。变长字段最后一个字节的 MSB 设置位 0,其余设置位 1.
一个字节里除了 MSB 外剩下 7 位用于存储数据本身。
变长字段采用小端字节序,低位字节放在最前面。
变长字段一个字节只能表示 128 个数,因为最高位固定位 MSB。
变长字段的编码与解码
这里不作详细编码解码的算法实现分析。主要通过例子来加深变长字段的理解。
编码的主要流程是:
对整数做 7bit 的 10 进制->2 进制的转换。整数小于 127 的会得到一个分组,而大于 127 的会得到多个分组
对于小于 127 的一个分组,就可以将 MSB 设置为 0,表示这一个字节就可以完成解码,后面的字节不需要(当然目前也只有一个字节)。
对于大于 127 的多个分组,需要按照小端字节序调整分组出现的顺序,然后对前面分组添加 MSB 为 1,最后一个分组添加 MSB 为 0。
举例:数字25
为例:
转换为 2 进制为:
00011001
按 7bit 划分为:
0011001
,只有一个分组最高位 MSB 设置为 0,则数字
25
的变长字段编码为:00011001
。
举例:数字225
为例:
转换为 2 进制为:
11100001
,按 7bit 划分为:
0000001
1100001
,按小端字节序调整顺序:
1100001``0000001
从前往后设置 MSB,注意除最后一个设置为 0 外,其他设置为 1,表示解码时需要读取到哪个字节:
11100001``00000001
。
解码的主要流程是:
先读取一个字节,如果字节的最高位为 1,则继续读;
重复 1,直到读取到某个字节的最高位为 0,则停止。
移除 1、2 读取的所有字节的最高位,并逆转 1、2 读取所有字节的顺序。
重新组合 3 中字节,而后按 2 进制解码即可获取到原始整型数。
举例:以变长码10010110 00000001
为例。
先读取
10010110
,继续读取00000001
,停止。移除 1 中所有字节的最高位,得到:
0010110
0000001
,逆转顺序,得到:
0000001
0010110
,重新组合,得到:
00000010010110
,即10010110
,按 2 进制解码,得到:150
。
文件的新增、清理
新增日志
多个日志分段文件因为是按照时间顺序从前往后写入的,所以当有新的消息需要写入日志分段文件中时,只能将消息写入最后一个日志分段文件,这个日志分段文件也称为:ActiveLogSegment,即活跃日志分段文件。
接下来考虑当需要将消息写入日志分段文件中时,Kafka Broker 会做什么:
将消息追加到当前 Topic-Partition 的 ActiveLogSegment 文件中,并为消息分配一个 Partition 全局唯一的 Offset。
当 ActiveLogSegment 满足一定条件时,Kafka 会关闭该分段文件,并新建一个新的分段文件,后续新增的消息便改为在新增的分段文件中追加消息。
触发新建 ActiveLogSegment 文件的情况一共有以下四种:
LogSegment 文件的大小达到
log.segment.bytes
指定的阈值大小(默认为 1G)LogSegment 文件的最大时间戳和最小时间戳的差值达到
log.roll.ms
或log.roll.hours
指定的阈值大小(默认 7 天)LogSegment 文件的最大偏移量到最小偏移量的差值达到
log.roll.bytes
指定的阈值大小(默认-1,不限制)LogSegment 文件的索引文件的大小达到
log.index.size.max.bytes
指定的阈值大小(默认 10MB)
清理日志
Kafka 控制日志占用磁盘空间大小的方式主要有两种:
日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段(LogSegment)。可通过设置
log.cleanup.policy
为delete
来开启日志删除的日志清理机制日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。可通过设置
log.cleanup.policy
为compact
,且设置log.cleaner.enable
为true
,来开启日志压缩的日志清理机制
如果想要同时支持两种清理策略, 可以直接将 log.cleanup.policy
设置为delete,compact
。
日志删除
kafka 会通过一个后台线程定期(默认 5 分钟,通过 Broker 参数log.retention.check.interval.ms
)检查是否有需要删除的日志分段文件,并将其后缀改为 delete。然后另一个后台线程会定期(默认 15 秒)执行实际的删除操作。
kafka 的日志删除策略有三种:
基于时间:指 kafka 会删除超过指定时间间隔(默认 7 天)的日志分段文件。可以通过
log.retention.hours
、log.retention.minutes
或log.retention.ms
来设置时间间隔,其中 ms 优先级最高,minutes 次之,hours 优先级最低.注意:这里用于计算间隔的时间取值逻辑为:首先要查询该 LogSegment 文件对应的时间戳索引文件,查找该时间戳索引文件的最后一条索引数据,如果时间戳值大于 0,则取值,否则会使用最近修改时间(lastModifiedTime)。
基于大小:指 kafka 会删除超过指定大小阈值(默认-1,表示不限制)的日志分段文件。可以通过
log.retention.bytes
来设置大小阈值。注意:
log.retention.bytes
设置的是 Log 中所有日志文件的大小,而不是单个日志段的大小。单个日志段可以通过参数log.segment.bytes
来设置,默认大小为 1G。基于偏移量:指 kafka 会删除超过指定偏移量范围(默认-1,表示不限制)的日志分段文件。可以通过
log.retention.bytes.per.partition
来设置偏移量范围,该参数是针对每个 partition 的起始偏移量,而不是单个日志分段文件的偏移量。
具体的删除操作这里不赘述,感兴趣的同学可查阅参考资料。
日志压缩
日志压缩是对于有相同 key 的不同 value 值,只保留最后一个版本。如果应用只关心 key 对应的最新 value 值,则可以开启 Kafka 相应的日志清理功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
日志压缩的目的是为了保证每个 key 都至少保存有最近的一条记录,以便于恢复消费者的最新状态。
日志压缩会排除 ActiveLogSegment,这样可以不考虑新增的追加写对压缩过程的影响。
日志压缩的过程如下:
kafka 会将旧的日志分段文件复制到新的日志分段上面,并删除相同 key 的旧记录,只保留最新的记录。
kafka 会按照“清理点”分为日志头部和尾部,清理点之前的日志分段是待删除日志(墓碑日志),清理点之后的日志分段是活跃日志。
kafka 会根据
log.cleaner.delete.retention.ms
参数(默认 24 小时)来决定是否保留墓碑日志分段,超过该时间的墓碑日志分段会被删除。kafka 会根据
log.cleaner.min.compaction.lag.ms
参数(默认 0)来决定是否压缩活跃日志分段,只有超过该时间的活跃日志分段才会被压缩。
存储技术
Kafka 消息存储在 Broker 本地磁盘中。而将消息数据放在磁盘中存在的瓶颈就是磁盘的写入与读取,Kafka 为了保证消息读写性能,在存储技术上使用了下面这些。
顺序追加
Kafka 通过设计存储架构,将消息的写入对应到日志分段文件写入,而且保证同一个 Topic-Partition 维度下,只有一个 ActiveLogSegment,即最新的日志分段文件,保证消息写入是以日志追加的方式完成,从而让磁盘以顺序写入为主,避免随机写入(顺序写入是随机写入性能的 6000 倍)。
PageCache
简单描述一下操作系统中进程如何从磁盘中获取数据的。
读:
操作系统首先查看 PageCache 中是否有待读取的数据页,如果有,则命中 PageCache,直接返回给进程,从而避免对磁盘的数据访问
如果没有操作系统则会产生缺页中断,向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程
写:
操作系统先检查数据页是否在 PageCache 中,如果存在,则将新数据直接写入 PageCache 中
如果不存在,则新建 PageCache,而后将数据写入新建的 PageCache 中
操作系统会异步将脏页(即写入了新数据的 PageCache)刷入磁盘中,从而保证数据的最终一致性。
同时操作系统还会利用局部性原理,即某一页数据被访问后,那么他关联的其他页数据也有较大可能性在未来被访问到。因此:
在读的时候,会通过预读,将目标数据页相邻的其他磁盘数据页也一并从磁盘加载到内存中。
在写的时候,也会将需要写入的脏页连同相邻其他数据页合并成一次大的写入,留存在磁盘中。
零拷贝
以上技术帮助提升 Kafka 在与操作系统磁盘交互时的数据读取与写入效率。Kafka 还注意到了消息在网络传输时,产生的额外开销,针对此,Kafka 采用了零拷贝技术来进一步提升效率。
考虑这样的场景:Consumer 订阅了 Topic,并向 Kafka 发起了消息 Pull 请求,假定 Kafka 已经在 Broker 中查找到对应的消息并返回给 Consumer,那么这个过程会有以下几个步骤:
操作系统内核读取目标数据
操作系统内核将数据跨越内核转移到应用层程序(即 Kafka)
应用层程序将收到的数据推回到网络发送的操作系统内核
操作系统内核接收到需要发送的数据,写会与 Consumer 链接的 Socket,完成网络发送。
在这个过程中,有这几个问题:
数据经过:内核->应用程序->内核,如果数据在应用程序没有额外加工,那么应用程序的这一步有点多余,最理想的情况是操作系统读取到目标数据之后直接转推到网络发送的 Socket。
数据存在多次拷贝,数据进出内核层都会需要拷贝,这会导致 CPU 资源的浪费。
Kafka 通过零拷贝技术:即要求内核直接将数据从磁盘文件拷贝到套接字,而无需通过应用程序。零拷贝不仅大大地提高了应用程序的性能,而且还减少了内核与用户模式间的上下文切换。
具体来说,Kafka 使用 Java NIO 中的库java.nio.channels.FileChannel
中的 transferTo()
方法来在 Linux 和 UNIX 系统上支持零拷贝。可以使用 transferTo() 方法直接将字节从它被调用的通道上传输到另外一个可写字节通道上,数据无需流经应用程序。
零拷贝与操作系统更详细的交互可查阅参考资料。
写在后面
深圳依然是下雨。不过雨没有那么大了。雨声小了,人间的声音大了,二者却达到了微妙的平衡。你能听到积攒了一会的水滴垂坠的落下,也能听到儿童突然的惊呼。声音的频率又间隔的拉长,是最适合睡眠的白噪声。
小猫就睡得很香,它们总是无忧无虑的。我写完了这篇,该收拾一下,悄悄的出门,给他们拉上窗帘,关掉灯,不破坏能让他们咂吧嘴的梦。也许它们的梦里有小鱼干,我的梦在出门后的路上。
参考资料
https://protobuf.dev/programming-guides/encoding/
https://www.jianshu.com/p/a52c16fca39e
版权声明: 本文为 InfoQ 作者【Codyida】的原创文章。
原文链接:【http://xie.infoq.cn/article/ba1350db6b3fba114f1076a2f】。文章转载请联系作者。
评论