写点什么

大数据 -103 Spark Streaming 消费 Kafka:Offset 获取、存储与恢复详解

作者:武子康
  • 2025-09-22
    山东
  • 本文字数:5344 字

    阅读完需:约 18 分钟

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 22 日更新到:Java-130 深入浅出 MySQL MyCat 深入解析 核心配置文件 server.xml 使用与优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成了如下的内容:


  • Spark Streaming 与 Kafka

  • 08 和 10 版本的接口对比

  • Producer、KafkaDStream 实例代码

Offset 管理

Spark Streaming 集成 Kafka,允许从 Kafka 中读取一个或者多个 Topic 的数据,一个 Kafka Topic 包含一个或者多个分区,每个分区中的消息顺序存储,并使用 offset 来标记消息位置,开发者可以在 Spark Streaming 应用中通过 offset 来控制数据的读取位置。Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的,如果在应用停止或者报错退出之前将 Offset 持久化保存,该消息就会丢失,那么 Spark Streaming 就没有办法从上次停止或保存的位置继续消费 Kafka 中的消息。

获取偏移量(Obtaining Offsets)

Spark Streaming 与 Kafka 整合时,允许获取其消费的 Offset,具体方法如下:


stream.foreachRDD { rdd =>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  rdd.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)  println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")  }}
复制代码


注意:对 HashOffsetRanges 的类型转换只有在对 createDirectStream 调用的第一个方法中完成时才会成功,而不是在随后的方法链中。RDD 分区和 Kafka 分区之间的对应关系在 Shuffle 或重分区后会丧失,如 reduceByKey 或 window。

存储偏移量(Storing Offsets)

在 Spark Streaming 程序失败的情况下,Kafka 交付语义取决于 如何、何时存储偏移量,Spark 输出操作的语义为 at-least-once。如果要实现 EOS 语义(Exactly Once Semantics),必须在幂等的输出之后存储偏移量或者将存储偏移量与输出放在一个事务中。可以按照增加可靠性(和代码复杂度)的顺序使用以下选项来存储偏移量。

CheckPoint

CheckPoint 是 Spark Streaming 运行过程中的一个重要机制,它主要用于保存以下两类关键信息:


  1. 元数据信息:

  2. 应用程序的配置信息

  3. 未完成的批处理操作

  4. DStream 操作图

  5. 生成的 RDDs 及其依赖关系

  6. 数据状态:

  7. 每个 RDD 的数据状态

  8. Kafka 消费的 Offset 信息

  9. 窗口操作的状态数据


这些信息会被周期性地(默认间隔 10 秒)持久化到可靠的存储系统中,常见的选择包括:


  • HDFS(最常用)

  • AWS S3

  • 其他兼容 Hadoop 的文件系统


当应用程序意外终止或集群出现故障时,Spark Streaming 可以利用这些 CheckPoint 信息快速恢复到最近的一致状态,确保数据处理的精确一次(exactly-once)语义。


关于 CheckPoint 的版本兼容性问题:


当 Spark Streaming 程序代码变更后重新部署时,常见的反序列化异常(如 SerializationException)产生的原因在于:


  1. 首次运行时,CheckPoint 机制会将整个应用程序的 Jar 包序列化保存

  2. 重启时,系统会尝试使用保存的 Jar 包进行反序列化

  3. 如果新版本的代码逻辑与 CheckPoint 中保存的版本不一致(如:

  4. 类定义变更

  5. 方法签名修改

  6. 序列化 ID 改变

  7. 业务逻辑调整),就会导致反序列化失败


解决方案及注意事项:


  1. 标准解决方法:

  2. 删除 HDFS 上的 CheckPoint 目录(如:hdfs dfs -rm -r /checkpoint/path

  3. 这会同时清空保存的 Offset 信息,导致程序从 Kafka 的 auto.offset.reset 配置决定的位置重新消费

  4. 替代方案(适用于需要保留 Offset 的场景):


   // 1. 手动记录Offset到外部存储(如Zookeeper/HBase)   // 2. 使用createDirectStream时指定起始Offset   val fromOffsets = getStoredOffsets() // 从外部存储读取   val messages = KafkaUtils.createDirectStream[String, String](     ssc,     PreferConsistent,     Assign[String, String](fromOffsets.keys.toList, kafkaParams)   )
复制代码


  1. 最佳实践建议:

  2. 对于生产环境,建议实现外部 Offset 管理

  3. 在开发阶段,可以设置较短的 CheckPoint 间隔便于调试

  4. 重大逻辑变更时,应考虑创建新的 CheckPoint 路径

  5. 影响范围:

  6. 删除 CheckPoint 会导致程序失去状态信息

  7. 对于有状态转换(如 updateStateByKey 或 window 操作)会丢失历史状态

  8. 对于精确一次处理语义的应用需要特别注意

Kafka

默认情况下,消费者定期自动提交偏移量,它将偏移量存储子啊一个特殊的 Kafka 主题中(_consumer_offsets),但在某些情况下,这将导致问题,因为消息可能已经被消费者从 Kafka 拉取了,但是还没有处理。


可以将 enable.auto.commit 设置为 false,在 Spark Streaming 程序输出结果后,手动提交偏移。


stream.foreachRDD { rdd =>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // 在输出操作完成之后,手工提交偏移量;此时将偏移量提交到 Kafka 的消息队列中  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
复制代码


与 HasOffsetRanges 一样,只有在 createDirectStream 的结果上调用时,转换到 CanCommitOffsets 才会成功,而不是在转换之后,commitAsync 调用是线程安全的,但必须在输出之后执行。

自定义存储

Offsets 可以通过多种方式来管理,但是一般来说遵循下面的步骤:


  • 在 DStream 初始化的时候,需要指定每个分区的 Offsets 用于从指定位置读取数据

  • 读取并处理消息

  • 处理完之后存储结果数据

  • 用虚线存储和提交 Offset,敲掉用户可能会执行一系列操作来满足他们更加严格的语义要求。这包括幂等操作和通过原子操作的方式存储 Offset

  • 将 Offsets 保存在外部持久化数据库,如 HBase、Kafka、HDFS、ZooKeeper、Redis、MySQL



  • 可以将 Offsets 存储到 HDFS 中,但这并不是一个好的方案,因为 HDFS 的延迟很高,此外将每批数据的 Offset 存储到 HDFS 中还会带来小文件过大的问题。

  • 可以将 Offsets 存储到 ZK 中,但是将 ZK 作为存储,也并不是一个明智的选择,同时 ZK 也不适合频繁的读写操作

Redis 管理 Offset

Spark Streaming Offset 外部存储管理实现方案

核心功能实现

1. 程序启动时加载 Offsets

  • 实现方式

  • 创建自定义的 OffsetManager 类,负责与外部存储的交互

  • 在 Spark Streaming 应用启动时调用 OffsetManager.getSavedOffsets() 方法

  • 该方法从外部存储(如 Kafka、ZooKeeper、Redis、MySQL 或 HBase)读取已保存的 Offsets

  • 示例代码


val savedOffsets = OffsetManager.getSavedOffsets(topic, partitionCount)val kafkaParams = Map[String, Object](  "bootstrap.servers" -> brokers,  "key.deserializer" -> classOf[StringDeserializer],  "value.deserializer" -> classOf[StringDeserializer],  "group.id" -> groupId,  "auto.offset.reset" -> "latest",  "enable.auto.commit" -> (false: java.lang.Boolean))
val stream = savedOffsets match { case Some(offsets) => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) ) case None => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) )}
复制代码

2. 批次处理后更新 Offsets

  • 实现方式

  • foreachRDD 转换中处理完数据后

  • 获取当前批次的 Offsets 信息

  • 调用 OffsetManager.saveOffsets() 方法将 Offsets 保存到外部存储

  • 示例代码


stream.foreachRDD { rdd =>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    // 处理RDD数据  processRDD(rdd)    // 保存Offsets到外部存储  OffsetManager.saveOffsets(offsetRanges)}
复制代码

外部存储选择与实现

1. Kafka 自身存储(最简单方案)

  • 实现方式

  • 使用 Kafka 内置的 __consumer_offsets topic

  • 通过 enable.auto.commit=true 自动提交

  • 缺点

  • 无法精确控制提交时机

  • 可能出现消费但未提交的情况

2. ZooKeeper

  • 实现方式

  • 创建 /consumers/[group_id]/offsets/[topic]/[partition] 节点

  • 使用 Curator 框架操作 ZK

3. Redis

  • 实现方式

  • 使用 Hash 结构存储:key = "offset:[topic]:[group_id]", field = "[partition]", value = offset

  • 支持原子操作和 TTL 设置

4. MySQL/HBase

  • 实现方式

  • 创建表结构:(topic, group_id, partition, offset, timestamp)

  • 使用事务保证数据一致性

注意事项

  1. 原子性保证:确保 Offset 保存和数据处理的原子性

  2. 幂等性设计:Offset 保存操作应支持重复执行

  3. 错误处理:实现重试机制应对存储系统暂时不可用

  4. 性能优化:批量写入 Offsets 减少 I/O 操作

  5. 监控报警:建立 Offset 滞后监控机制

完整实现示例

object OffsetManager {  // 从Redis获取已保存的Offsets  def getSavedOffsets(topic: String, partitionCount: Int): Option[Map[TopicPartition, Long]] = {    val jedis = RedisPool.getResource    try {      val offsets = (0 until partitionCount).map { partition =>        val offset = jedis.hget(s"offset:$topic:$groupId", partition.toString)        new TopicPartition(topic, partition) -> offset.toLong      }.toMap      Some(offsets)    } catch {      case e: Exception => None    } finally {      jedis.close()    }  }
// 保存Offsets到Redis def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = { val jedis = RedisPool.getResource try { val pipeline = jedis.pipelined() offsetRanges.foreach { offsetRange => pipeline.hset( s"offset:${offsetRange.topic}:$groupId", offsetRange.partition.toString, offsetRange.untilOffset.toString ) } pipeline.sync() } finally { jedis.close() } }}
复制代码

Redis 管理 Offsets:

  • 数据结构选择:Hash、Key、Field、Value

  • Key:kafka:topic:topicName:groupId

  • Value:offset

  • 从 Redis 中获取到保存的 Offsets

  • 消费数据后将 Offsets 保存到 Redis

自定义 Offsets:Kafka 读数据 处理完打印 Offsets

package icu.wzk
object kafkaDStream2 {
def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf() .setAppName("KafkaDStream2") .setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2)) val kafkaParams: Map[String, Object] = getKafkaConsumerParameters("wzkicu") val topics: Array[String] = Array("spark_streaming_test01")
// 从指定位置获取Kafka数据 val offsets: collection.Map[TopicPartition, Long] = Map( new TopicPartition("spark_streaming_test01",0) -> 100, // 我这里只有一个分区 你可以多创建几个 // new TopicPartition("spark_streaming_test01", 1) -> 200, // new TopicPartition("spark_streaming_test01", 2) -> 300, )
// 从Kafka中获取数据 val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) )
// DStream 输出 dstream.foreachRDD { (rdd, time) => { println(s"=========== rdd.count = ${rdd.count()}, time = $time ==============") } val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) println(s"${o.topic}, ${o.partition}, ${o.fromOffset}, ${o.untilOffset}") } }
ssc.start() ssc.awaitTermination()
}
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = { Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) ) }}
复制代码


运行结果如下图所示:



发布于: 刚刚阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-103 Spark Streaming 消费 Kafka:Offset 获取、存储与恢复详解_Java_武子康_InfoQ写作社区