写点什么

大数据 -70 Kafka 日志清理:删除、压缩及混合模式最佳实践

作者:武子康
  • 2025-08-17
    山东
  • 本文字数:4302 字

    阅读完需:约 14 分钟

大数据-70 Kafka 日志清理:删除、压缩及混合模式最佳实践

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 08 月 11 日更新到:Java-94 深入浅出 MySQL EXPLAIN 详解:索引分析与查询优化详解 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下内容:


  • 日志索引文件

  • 查看物理存储、查看详细的索引文件

  • 消息偏移

  • 偏移量的存储


日志清理

Kafka 提供两种主要的日志清理策略,用于管理磁盘空间和提高存储效率:

1. 日志删除策略 (Delete)

日志删除策略会按照预定的删除规则,移除不再需要保留的数据。主要有以下几种删除方式:


  • 基于时间:删除早于指定时间阈值的消息(默认 168 小时/7 天)示例配置:log.retention.hours=168

  • 基于大小:当分区日志文件超过指定大小时触发删除示例配置:log.retention.bytes=1073741824 (1GB)

  • 基于起始偏移量:保留最新的 N 条消息


应用场景:适用于临时数据存储,如事件流、日志收集等不需要长期保存的数据。

2. 日志压缩策略 (Compact)

日志压缩策略针对消息的 Key 进行优化存储:


  • 只保留每个 Key 的最新 Value

  • 删除旧版本的同 Key 消息

  • 保留所有 Key 的最新快照


应用场景:适用于需要维护最新状态的数据,如:


  • 数据库变更捕获

  • 配置更新

  • 用户会话信息

配置参数

全局配置参数:


log.cleanup.policy=delete  # 默认值,可选delete/compact/both
复制代码


主题级别配置(优先级高于全局配置):


cleanup.policy=compact
复制代码


混合模式配置(同时使用两种策略):


cleanup.policy=delete,compact
复制代码

注意事项

  1. 压缩策略需要消息有明确的 Key

  2. 删除策略更节省 CPU 资源

  3. 压缩策略会保留墓碑消息(key=null)一段时间

  4. 两种策略可以同时使用,但需要谨慎配置


实际应用中,应根据业务需求和数据特性选择合适的清理策略。

日志删除

基于时间

日志删除任务会根据 log.retention.hours / log.retention.minutes / log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天,log.retention.ms 优先级最高。


Kafka 依据日志分段中最大的时间戳进行定位。首先要查询日志分段所对应的时间戳文件,查找时间索引文件中最后一个索引项,若最后一条索引项的时间戳字段大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间?

因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间消息。

删除过程

  • 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有现成对这些日志分段进行读取操作。

  • 这些日志分段上所有文件添加上 .delete 后缀。

  • 交由一个 delete-file 命名的延迟任务来删除这些 .delete 为后缀的文件,延迟执行时间可以通过 file.delete.delay.ms 进行设置。

如果活跃日志分段中存在需要删除的数据?

  • Kafka 会切分出一个新的日志分段作为活跃的日志分段,该日志分段不删除,删除原来的日志分段。

  • 先腾出地方,再删除。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定值,设定项为:log.retention.bytes。单个日志分段的大小由 log.segement.bytes 进行设定。

删除过程

  • 计算需要被删除的日志总大小(当前日志大小(所有分段)减去 retention 值)

  • 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合

  • 执行删除

基于偏移量

根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除日志分段。


删除过程

  • 从头开始遍历每个日志分段,日志分段 1 的下一个日志分段的起始偏移量为 21,小于 LogStartOffset,将日志分段 1 加入到删除队列中

  • 日志分段 2 的下一个日志分段的起始偏移量 35,小于 LogStartOffset,将日志分段 2 加入到删除队列中

  • 日志分段 3 的下一个日志分段的起始偏移量 57,小于 LogStartOffset,将日志分段 3 加入到删除队列中

  • 日志分段 4 的下一个日志分段的起始偏移量 71,大于 LogStartOffset,则不进行删除。

日志压缩

基础概念

日志压缩(Log Compaction)是 Apache Kafka 提供的一种核心数据保留机制,它通过对消息日志进行智能压缩,提供了比传统时间保留策略更细粒度的数据管理方式。

工作机制

  1. Key-Value 存储模型

  2. Kafka 将每条消息视为一个键值对(Key-Value Pair)

  3. 键(Key)用于标识消息的唯一性

  4. 值(Value)包含实际的消息内容

  5. 压缩过程

  6. Kafka 会定期扫描主题分区中的消息

  7. 对于具有相同 Key 的消息,只保留最新版本(offset 最大的那条)

  8. 旧版本的消息会被标记为可删除

  9. 在适当的时机(如后台压缩线程运行时),这些旧消息会被物理删除

  10. 压缩触发条件

  11. 当分区大小达到配置的阈值时

  12. 按照配置的时间间隔定期执行

  13. 通过管理 API 手动触发

与时间保留策略的区别

典型应用场景

  1. 变更数据捕获(CDC)

  2. 数据库表变更跟踪

  3. 每个表行的主键作为 Kafka 消息 Key

  4. 只保留每行的最新状态

  5. 配置管理

  6. 系统配置项的更新

  7. 配置 ID 作为 Key

  8. 确保始终获取最新配置

  9. 状态存储

  10. 用户会话状态维护

  11. 用户 ID 作为 Key

  12. 高效存储最新用户状态

配置示例

# 启用日志压缩log.cleanup.policy=compact
# 触发压缩的阈值(消息数量)log.cleaner.min.compaction.lag.ms=0
# 压缩检查频率log.cleaner.backoff.ms=15000
复制代码

注意事项

  1. 压缩后的日志仍然保持有序性

  2. 删除的消息(null 值)会被特殊处理

  3. 活跃段(active segment)不会被压缩

  4. 需要考虑压缩过程对系统性能的影响

应用场景

日志压缩特性在实时计算系统中,特别是在异常容灾方面具有重要的应用价值。以下是几个典型的使用场景:


  1. Spark/Flink 实时计算场景

  2. 在流处理作业中,系统通常需要在内存中维护状态数据,例如:

  3. 滑动窗口聚合结果(如过去 24 小时的交易总额)

  4. 用户会话状态(如在线用户的最近操作记录)

  5. 机器学习模型的中间参数

  6. 这些状态数据可能是通过持续处理数小时甚至数天的日志流计算得出的

  7. 容灾恢复机制

  8. 当系统遇到以下故障时:

  9. 节点内存溢出(OOM)

  10. 网络分区导致状态丢失

  11. 磁盘故障造成检查点损坏

  12. 传统恢复方式需要:

  13. 从源头重新消费所有数据

  14. 重新执行所有计算逻辑

  15. 重建完整状态

  16. 对于处理 TB 级数据的作业,这个过程可能需要数小时

  17. 日志压缩解决方案

  18. 实施步骤:

  19. 配置定期快照策略(如每 5 分钟)

  20. 将内存状态序列化后写入:

  21. 分布式文件系统(HDFS/S3)

  22. 高性能 KV 存储(RocksDB)

  23. 消息系统(Kafka 压缩日志)

  24. 故障恢复时:

  25. 加载最近的有效快照

  26. 只需处理快照时间点之后的增量数据

  27. 优势:

  28. 恢复时间从小时级降至分钟级

  29. 显著降低重复计算的开销

  30. 保证处理结果的精确一次性(exactly-once)

  31. 实际应用案例

  32. 电商实时大屏:

  33. 崩溃后 10 分钟内恢复实时 GMV 统计

  34. 金融风控系统:

  35. 确保可疑交易监控不出现漏判

  36. IoT 设备监控:

  37. 维持设备状态连续性


这种机制特别适合对计算连续性要求高、数据吞吐量大的实时处理场景,能有效平衡性能与可靠性。

使用日志压缩来替代这些外部存储有哪些优势和好处?

  • Kafka 即是数据源又是存储工具,可以简化技术栈,降低维护成本

  • 使用外部存储介质的话,需要将存储的 Key 记录下来,恢复的时候再使用这些 Key 将数据取回,实现起来有一定的工程难度和复杂度。使用 Kafka 的日志压缩特性,只需要把数据写入 Kafka,等异常出现恢复任务再读回内存就可以了

  • Kafka 对于磁盘的读写做了大量的优化工作,比如磁盘顺序读写。相对于外部存储介质没有索引查询等工作量负担,可以实现高性能。同时,Kafka 的日志压缩机制可以充分利用廉价的磁盘,不用依赖昂贵的内存来处理,在性能相似的情况下,实现非常高的性价比(仅针对异常处理和容灾的场景)。

日志压缩实现细节

主题的 cleanup.policy 需要设置为:compactKafka 后台线程会定时将 Topic 遍历两次:


  • 记录每个 Key 的 Hash 值最后一次出现的偏移量

  • 第二次检查每个 Offset 对应的 Key 是否在后面的日志中出现过,如果出现了就删除对应的日志。


日志压缩允许删除,除最后一个 key 外,删除先前出现的所有该 Key 对应的记录,在一段时间后从日志中清理以释放空间。注意:日志压缩与 Key 有关,确保每个消息的 Key 不为 Null。


压缩是在 Kafka 后台通过定时重新打开 Segment 来完成的,Segment 压缩细节如下图所示:



日志压缩可以确保:


  • 任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。

  • 可以使用 Topic 的 min.compation.lag.ms 属性来保证消息在被压缩之前必须经过的最短时间,也就是说,它为每个消息(未压缩)头部停留的时间提供下一个下限。可以使用 Topic 的 max.compactiton.lag.ms 属性来保证从收到消息符合压缩条件之间的最大延时

  • 消息始终保证顺序,压缩永远不会重新排序消息,只是删除一些而已

  • 消息的偏移量永远不会改变,它是日志中位置的永久标识

  • 从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。

  • 另外,如果使用者在比 Topic 的 log.cleaner.delete.retention.ms 短的时间内到达日志的头部,则会看到已删除的所有的 delete 标记, 保留时间默认是 24 小时。


默认情况下,启动日志清理器,若需要启动特定 Topic 的日志清理,请添加特定的属性。

日志清理器配置详解

日志清理是 Kafka 维护数据存储效率的重要功能,以下是详细的配置指南:

核心配置参数

  1. log.cleanup.policy

  2. 建议设置为compact(压缩策略),这是 Broker 级别的配置,会影响集群中所有 Topic 的行为。

  3. 应用场景:适用于需要保留每个 Key 最新值的场景,如配置信息、状态更新等

  4. 替代选项:delete(删除策略,基于时间/大小)

  5. log.cleaner.min.compaction.lag.ms

  6. 该参数用于防止对最近更新的消息进行压缩,确保消息在指定时间内不会被清理。

  7. 典型值:建议设置为消息处理所需的最长时间(如 1 小时)

  8. 默认行为:如果未设置,除最后一个 Segment 外的所有 Segment 都可被压缩

日志压缩工作原理

Kafka 的日志压缩采用"读取-处理-写入"机制:


  1. 读取阶段:扫描日志两遍

  2. 第一遍:建立 Key-Offset 映射表

  3. 第二遍:验证数据有效性

  4. 写入阶段:将有效数据写入新 Segment


性能考虑因素:


  • 现代 CPU 处理速度通常远高于磁盘 I/O

  • 压缩效率取决于:

  • 日志总量(建议控制在 TB 级别以下)

  • 磁盘类型(SSD 性能优于 HDD)

  • 消息平均大小(建议优化在 1-10KB)

最佳实践建议

  1. 监控指标:重点关注cleaner-buffer-utilizationcleaner-reaper-percent指标

  2. 资源分配:为清理线程分配足够内存(通过log.cleaner.dedupe.buffer.size配置)

  3. 时间窗口:设置合理的log.cleaner.backoff.ms(默认 15 秒)控制压缩频率


示例配置:


log.cleanup.policy=compactlog.cleaner.min.compaction.lag.ms=3600000  # 1小时log.cleaner.threads=4  # 根据CPU核心数调整log.cleaner.dedupe.buffer.size=134217728  # 128MB
复制代码


发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-70 Kafka 日志清理:删除、压缩及混合模式最佳实践_Java_武子康_InfoQ写作社区