写点什么

大数据 -104 Spark Streaming 与 Kafka 集成:Offset 管理机制详解与最佳实践 Scala 实现

作者:武子康
  • 2025-09-23
    山东
  • 本文字数:3625 字

    阅读完需:约 12 分钟

大数据-104 Spark Streaming 与 Kafka 集成:Offset 管理机制详解与最佳实践 Scala实现

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 22 日更新到:Java-130 深入浅出 MySQL MyCat 深入解析 核心配置文件 server.xml 使用与优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成了如下的内容:


  • Spark Streaming Kafka

  • 自定义管理 Offset Scala 代码实现

Offset 管理机制详解

基本概念

Spark Streaming 与 Kafka 集成时,通过 Kafka 的 offset 机制来管理数据消费位置。每个 Kafka Topic 被划分为一个或多个分区(Partition),每个分区内的消息都是有序存储的。Offset 是一个单调递增的 64 位整数,用于标识分区中消息的精确位置。

Offset 管理的重要性

  1. 数据一致性保证:正确的 offset 管理可以确保:

  2. 每条消息都被处理且只被处理一次(Exactly-once 语义)

  3. 避免数据丢失或重复处理

  4. 故障恢复机制:当应用发生故障时,持久化的 offset 能够:

  5. 从最后成功处理的位置恢复处理

  6. 避免重新处理已处理过的数据

  7. 确保不会遗漏未处理的数据

Offset 存储方式

Spark Streaming 提供了多种 offset 存储方案:

1. 检查点机制(Checkpoint)

ssc.checkpoint("hdfs://path/to/checkpoint")
复制代码


  • 将 offset 与应用程序状态一起保存到 HDFS

  • 简单易用但不够灵活

2. 外部存储系统

// 示例:将offset保存到ZookeeperkafkaParams.put("auto.offset.reset", "smallest")kafkaParams.put("offsets.storage", "zookeeper")
复制代码


常见的外部存储选择:


  • Zookeeper

  • HBase

  • Redis

  • Kafka 本身(0.10+版本)

3. 手动管理

// 获取当前批次offsetval offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 处理完成后提交offsetstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
复制代码


  • 灵活性最高

  • 需要开发者自行实现存储逻辑

最佳实践

  1. 提交时机:应在数据成功处理并持久化后提交 offset

  2. 原子性保证:offset 提交与数据处理应保持原子性

  3. 监控机制:实现 offset 滞后监控,及时发现处理瓶颈

  4. 测试方案:定期测试故障恢复流程

常见问题处理

  1. 重复消费

  2. 确保 offset 提交前数据已持久化

  3. 实现幂等处理逻辑

  4. 数据丢失

  5. 避免在处理前提交 offset

  6. 增加重试机制

  7. offset 重置

  8. 合理配置auto.offset.reset参数

  9. 首次启动时明确指定起始 offset


通过完善的 offset 管理机制,可以确保 Spark Streaming 应用具备高可靠性和数据一致性,满足关键业务场景的需求。

Spark Streaming 与 Kafka 的集成

Spark Streaming 可以通过 KafkaUtils.createDirectStream 直接与 Kafka 集成。这种方式不会依赖于 ZooKeeper,而是直接从 Kafka 分区中读取数据。在这种直接方式下,Spark Streaming 依赖 Kafka 的 API 来管理和存储消费者偏移量(Offsets),默认情况下偏移量保存在 Kafka 自身的 __consumer_offsets 主题中。

使用 Redis 管理 Offsets

Redis 作为一个高效的内存数据库,常用于存储 Spark Streaming 中的 Kafka 偏移量。通过手动管理偏移量,你可以在每批次数据处理后,将当前批次的 Kafka 偏移量存储到 Redis 中。这样,在应用程序重新启动时,可以从 Redis 中读取最后处理的偏移量,从而从正确的位置继续消费 Kafka 数据。

实现步骤

从 Redis 获取偏移量

应用启动时,从 Redis 中读取上次处理的偏移量,并从这些偏移量开始消费 Kafka 数据。

处理数据

通过 Spark Streaming 处理从 Kafka 消费到的数据。

保存偏移量到 Redis

每处理完一批数据后,将最新的偏移量存储到 Redis 中。这样,如果应用程序崩溃或重启,可以从这个位置继续消费。

自定义 Offsets:根据 Key 从 Redis 获取 Offsets 处理完更新 Redis

添加依赖

<!-- jedis --><dependency>  <groupId>redis.clients</groupId>  <artifactId>jedis</artifactId>  <version>2.9.0</version></dependency>
复制代码


服务器上我们需要有:Redis 服务启动



Kafka 服务启动



编写代码,实现的主要逻辑如下所示:


package icu.wzkobject KafkaDStream3 {  def main(args: Array[String]): Unit = {    Logger.getLogger("args").setLevel(Level.ERROR)    val conf = new SparkConf()      .setAppName("KafkaDStream3")      .setMaster("local[*]")    val ssc = new StreamingContext(conf, Seconds(5))    val groupId: String = "wzkicu"    val topics: Array[String] = Array("spark_streaming_test01")    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)
// 从 Kafka 获取 Offsets val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffsetsFromRedis(topics, groupId)
// 创建 DStream val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) )
// DStream 转换&输出 dstream.foreachRDD { (rdd, time) => if (!rdd.isEmpty()) { // 处理消息 println(s"====== rdd.count = ${rdd.count()}, time = $time =======") // 将 Offsets 保存到 Redis val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges OffsetsRedisUtils.saveOffsetsToRedis(offsetRanges, groupId) } }
ssc.start() ssc.awaitTermination()
}
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = { Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) ) }}
复制代码


代码中我们封装了一个工具类:


package icu.wzk
object OffsetsRedisUtils {
private val config = new JedisPoolConfig private val redisHost = "h121.wzk.icu" private val redisPort = 6379
config.setMaxTotal(30) config.setMaxIdle(10)
private val pool= new JedisPool(config, redisHost, redisPort, 10000) private val topicPrefix = "kafka:topic"
private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId" private def getRedisConnection: Jedis = pool.getResource
// 从Redis中获取Offsets def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = { val jedis: Jedis = getRedisConnection val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic => import scala.collection.JavaConverters._ jedis.hgetAll(getKey(topic, groupId)) .asScala .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong } } jedis.close() offsets.flatten.toMap }
// 将 Offsets 保存到 Redis def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = { val jedis: Jedis = getRedisConnection ranges .map(range => (range.topic, range.partition -> range.untilOffset)) .groupBy(_._1) .map { case (topic, buffer) => (topic, buffer.map(_._2)) } .foreach { case (topic, partitionAndOffset) => val offsets: Array[(String, String)] = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString)) import scala.collection.JavaConverters._ jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava) }
jedis.close() }
}
复制代码


我们启动后,如图所示:



这里我使用 Redis 查看当前的存储情况:



可以看到当前已经写入了,我们继续启动 KafkaProducer 工具,继续写入数据。可以看到,已经统计到数据了。



我们继续查看当前的 Redis 中的数据:



发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-104 Spark Streaming 与 Kafka 集成:Offset 管理机制详解与最佳实践 Scala实现_Java_武子康_InfoQ写作社区