本文作者为中国移动云能力中心大数据团队软件开发工程师冯永设,文章主要介绍了 Kafka 的分层存储架构,数据持久化存储的实现步骤以及底层实现原理。
前言
消息中间件可以用来实现不同平台之间的消息传递,客户端通常基于订阅机制获取最新数据,历史数据的价值较低,因此消息中间件中通过设置消息的留存时间进行数据的清理,同时为了提高消息的吞吐量,使用内存或者 SSD 来存储刚写入的热数据。
但是目前几乎所有的消息中间件都以打造分布式消息流平台为目标,其中一个关键能力是持久化存储消息流,这就需要将历史数据持久化存储而不是设置留存周期进行自动删除,综上分析,分层存储的能力对于消息中间件的架构演进至关重要。
分层存储是一种数据存储方法或系统,由两种或更多存储介质类型组成,如 Tape、硬盘、SSD(固态硬盘)、内存等,存储数据的介质类型的选用由成本、数据可用性及数据可恢复性等要求来决定。
使用分层存储是为了实现成本管理和存储空间效率,主要思想是让处理器需要的数据存储在更快的存储层中,而将不活跃的数据文件存放到速度慢、廉价且容量大的存储设备中。
Apache Kafka 开源的版本中分层存储架构如下图所示:
Page Cache 是操作系统实现的一种磁盘缓存,将磁盘中的数据缓存到内存中,当进程读取磁盘文件内容时会先读取 Page Cache,如果存在则直接返回数据,没有命中则操作系统向磁盘发起读取请示并将读取的数据页写入到缓存中,之后再将数据返回到进程。
SSD 相较于 HDD 在带宽上有数量级别的提升,其使用可以带来性能上的提升,有两种方式:使用 SSD 替代 HDD;将其作为 PageCache 与 HDD 直接的缓存,解决 PageCache 出现竞争后承接部分流量。以上存储介质上的数据到了留存时间后会自动清除,如果需要持久化存储可以 Offload 到第三方存储系统中,例如 AWS S3、Hadoop 等。
但是目前社区代码中还不支持第三方存储。当前业界比较常用的消息中间件,Pulsar 和 Pravega 底层使用 Bookkeeper,原生支持将数据 OffLoad 到云数据库、HDFS 等,RocketMQ 和 Kafka 数据的分层存储均在开发中,估计在今年都会发布开源实现。
快速入门
Kafka 的分层存储在 Confluent 平台中已经支持但是未开源,社区在持续开发中,功能进展情况可以参考:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405
Kafka 已经提供了 Remote Storage 的核心代码,用户可以扩展相应的接口来将数据移动到第三方存储系统中。下面介绍 github 上实现的将数据 offload 的 hdfs 的持久化存储的开源项目。
1、编译安装
$ git clone -b 2.8.x-tiered-storage https://github.com/satishd/kafka.git
$ cd kafka
$ ./gradlew releaseTarGz
复制代码
编译成功后可以在 core/build/distributions 目录下找到安装包,将安装包复制到安装主机中进行解压。
2、完成配置
分层存储依赖 HDFS,其安装过程在这里不再介绍。在 server.properties 中增加分层存储配置,配置示例如下所示:
remote.log.storage.system.enable=true
remote.log.storage.manager.class.name=org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager
remote.log.storage.manager.class.path=/opt/satish/kafka_2.13-2.8.1-SNAPSHOT/external/hdfs/libs/*:/opt/hadoop-3.3.1/etc/hadoop/
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
remote.log.metadata.manager.class.path=/opt/satish/kafka_2.13-2.8.1-SNAPSHOT/libs/kafka-storage-2.8.1-SNAPSHOT.jar
remote.log.storage.hdfs.keytab.path=/opt/satish/kafka_2.13-2.8.1-SNAPSHOT/config/hdfs.headless.keytab
remote.log.metadata.topic.num.partitions=1
rremote.log.metadata.topic.replication.factor=1
retention.ms=259200000
remote.log.storage.manager.impl.prefix=remote.log.storage.
remote.log.storage.hdfs.base.dir=kafka-remote-storage
remote.log.storage.hdfs.user=hdfs-hctest@BCHKDC
复制代码
3、启动命令
$cd /opt/satish/kafka_2.13-2.8.1-SNAPSHOT
$bin/kafka-server-start -daemon config/server.properties
复制代码
4、执行测试
•创建 topic
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic tieriedstorage --replication-factor 1
--partitions 5 --create --config local.retention.ms=60000
--config segment.bytes=1048576 --config remote.storage.enable=true
复制代码
核心参数为 remote.storage.enable 及 retention.ms,当日志文件超过保留时间会自动将数据复制到 HDFS 中。
•写入数据
sh kafka-producer-perf-test.sh --topic tieriedstorage --num-records 100000 --throughput 100
--record-size 10240 --producer.config test-producer.properties
复制代码
•查看 server 日志,可以看到数据移动过程,日志信息如下:
Created a new task: class kafka.log.remote.RemoteLogManager$RLMTask[TopicIdPartition{topicId=eRjo9HPJS92E7_RC5D5G0A, topicPartition=tieredstorage-0}] and getting scheduled (kafka.log.remote.RemoteLogManager)
.....
Copying /tmp/satish/kafka-logs/tieriedstorage/00000000000000022321.log.toPath to remote storage. (kafka.log.remote.RemoteLogManager$RLMTask)
Copying timeIndex: /tmp/satish/kafka-logs/tieriedstorage/00000000000000022321.timeindex to remote storage. (kafka.log.remote.RemoteLogManager$RLMTask)
.....
复制代码
这时可以查看到 HDFS 中的日志数据,其目录在
remote.log.storage.hdfs.base.dir=kafka-remote-storage
复制代码
如下图所示:
这里每一个文件都是 LogSegment,具体细节在下面章节进行介绍。
核心流程分析
持久化存储的设计核心逻辑是将冷热数据自动迁移到对应的存储设备上并提供透明的访问方式,流程的核心如下:
•元数据及数据的持久化存储•远程存储数据与原生存储之间的映射关系•提供统一的对外访问入口,对客户端透明•分层存储数据流自动迁移
Kafka 的分层架构设计,可见 KIP-405,系统架构图下所示:
在示例中的实现是在 ReplicaManager 中增加 RemoteLogManager(RLM)模块进行远端存储的管理,底层实现细节如下图所示:
1、核心组件,RLM 运行后根据分区的 Leader 变化、保留周期等要素触发分区数据的管理,RLM 的实现分为两部分:
•RemoteStorageManager(RSM),远程 LogSegment 和索引文件的周期管理•RemoteLogMetadataManager(RLMM),提供 LogSegment 的元数据的管理,用于支持数据的强一致性
2、任务机制,RLM 在启动后会启动以下任务:
•RLMTask,分区的 Leader 副本所在的节点会确定 RLMTask 进行日志的管理,当遇到阈值条件是将 LogSegment 移动到远端存储•AsyncReadTask,客户端发起读请求,基于 Offset 在本地 LogSegment 找不到数据时(OffsetOutOfRangeException),将向远程存储发起读请求,具体通过该 Task 来具体获取,核心逻辑是创建 CacheInputStream,读取后将封装成 MemoryRecords 后发送给客户端
3、底层文件的封装,在 HDFS 上 LogSegment 的所有文件,包括 log、index、leaderEpoch 等均存放在同一个文件中,结构如下:
在文件头(25 字节)中保存了各个文件在文件中的位置,具体长度根据下一个文件的起始位置进行计算,文件类型包括:
public enum FileType {
OFFSET_INDEX((byte) 0),
TIMESTAMP_INDEX((byte) 1),
LEADER_EPOCH_CHECKPOINT((byte) 2),
PRODUCER_SNAPSHOT((byte) 3),
TRANSACTION_INDEX((byte) 4),
SEGMENT((byte) 5);
}
复制代码
客户端在读取时首先可根据 RemoteLogSegment 的文件头获取 Index 文件的位置,然后获取 Offset 对应 Record 在文件中位置,最后再获取数据。
实现细节可见 HDFSRemoteStorageManager,其中获取数据的核心代码如下:
private InputStream fetchData(RemoteLogSegmentMetadata metadata,
LogSegmentDataHeader.FileType fileType,
int startPosition,
int endPosition) throws RemoteStorageException {
try {
Path dataFilePath =
new Path(getSegmentRemoteDir(metadata.remoteLogSegmentId()));
return new CachedInputStream(dataFilePath, fileType, startPosition,endPosition);
} .....
}
复制代码
4、消息读取流程,引入远程存储后读取流程基本上相同,当出现 OffsetOutOfRange 异常后再从远程读取,即创建 AsyncReadTask,核心代码如下:
case e: OffsetOutOfRangeException =>
if (remoteLogManager.isDefined && log != null && !log.config.compact &&
log.rlmEnabled && log.config.remoteStorageEnable) {
val leaderLogStartOffset = log.logStartOffset
val leaderLogEndOffset = log.logEndOffset
......
val lastStableOffset = Some(log.lastStableOffset)
val fetchDataInfo = {
FetchDataInfo(LogOffsetMetadata(fetchInfo.fetchOffset), MemoryRecords.EMPTY,
delayedRemoteStorageFetch = Some(
RemoteStorageFetchInfo(
adjustedMaxBytes,
minOneMessage,
tp,
fetchInfo,
fetchIsolation)))
} .......
复制代码
启动 AsyncReadTask 的逻辑,获取数据并调用 responseCallback,将其发送给客户端,调用逻辑在 ReplicaManager#fetchMessage 中实现。
logReadResults.foreach { case (topicPartition, logReadResult) =>
if (remoteFetchInfo.isEmpty &&
logReadResult.info.delayedRemoteStorageFetch.isDefined)
remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
......
}
val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
if (remoteFetchInfo.isDefined) {
val key = new TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), remoteFetchInfo.get.topicPartition.partition())
val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
var remoteFetchTask: RemoteLogManager#AsyncReadTask = null
try {
remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: RemoteLogReadResult) => {
remoteFetchResult.complete(result)
delayedRemoteFetchPurgatory.checkAndComplete(key)
})
}
复制代码
总结
持久化存储能力对于分布式流平台来说是不可缺少的一环,也许是由于 Confluent 公司在商业方面的考虑,目前社区对于这块功能的开发进度还是比较慢的。但对于该功能的发布还是值得期待,当具备持久化存储能力后,可以省去很多数据流转的工作,一些相对简单的实时处理能力也可以通过 Kafka Stream 来实现,简化大数据平台的总体框架。
评论