点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 22 日更新到:Java-130 深入浅出 MySQL MyCat 深入解析 核心配置文件 server.xml 使用与优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解
章节内容
上节完成了如下的内容:
Spark Streaming Kafka
自定义管理 Offset Scala 代码实现
Offset 管理机制详解
基本概念
Spark Streaming 与 Kafka 集成时,通过 Kafka 的 offset 机制来管理数据消费位置。每个 Kafka Topic 被划分为一个或多个分区(Partition),每个分区内的消息都是有序存储的。Offset 是一个单调递增的 64 位整数,用于标识分区中消息的精确位置。
Offset 管理的重要性
数据一致性保证:正确的 offset 管理可以确保:
每条消息都被处理且只被处理一次(Exactly-once 语义)
避免数据丢失或重复处理
故障恢复机制:当应用发生故障时,持久化的 offset 能够:
从最后成功处理的位置恢复处理
避免重新处理已处理过的数据
确保不会遗漏未处理的数据
Offset 存储方式
Spark Streaming 提供了多种 offset 存储方案:
1. 检查点机制(Checkpoint)
ssc.checkpoint("hdfs://path/to/checkpoint")
复制代码
2. 外部存储系统
// 示例:将offset保存到Zookeeper
kafkaParams.put("auto.offset.reset", "smallest")
kafkaParams.put("offsets.storage", "zookeeper")
复制代码
常见的外部存储选择:
Zookeeper
HBase
Redis
Kafka 本身(0.10+版本)
3. 手动管理
// 获取当前批次offset
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 处理完成后提交offset
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
复制代码
最佳实践
提交时机:应在数据成功处理并持久化后提交 offset
原子性保证:offset 提交与数据处理应保持原子性
监控机制:实现 offset 滞后监控,及时发现处理瓶颈
测试方案:定期测试故障恢复流程
常见问题处理
重复消费:
确保 offset 提交前数据已持久化
实现幂等处理逻辑
数据丢失:
避免在处理前提交 offset
增加重试机制
offset 重置:
合理配置auto.offset.reset
参数
首次启动时明确指定起始 offset
通过完善的 offset 管理机制,可以确保 Spark Streaming 应用具备高可靠性和数据一致性,满足关键业务场景的需求。
Spark Streaming 与 Kafka 的集成
Spark Streaming 可以通过 KafkaUtils.createDirectStream 直接与 Kafka 集成。这种方式不会依赖于 ZooKeeper,而是直接从 Kafka 分区中读取数据。在这种直接方式下,Spark Streaming 依赖 Kafka 的 API 来管理和存储消费者偏移量(Offsets),默认情况下偏移量保存在 Kafka 自身的 __consumer_offsets 主题中。
使用 Redis 管理 Offsets
Redis 作为一个高效的内存数据库,常用于存储 Spark Streaming 中的 Kafka 偏移量。通过手动管理偏移量,你可以在每批次数据处理后,将当前批次的 Kafka 偏移量存储到 Redis 中。这样,在应用程序重新启动时,可以从 Redis 中读取最后处理的偏移量,从而从正确的位置继续消费 Kafka 数据。
实现步骤
从 Redis 获取偏移量
应用启动时,从 Redis 中读取上次处理的偏移量,并从这些偏移量开始消费 Kafka 数据。
处理数据
通过 Spark Streaming 处理从 Kafka 消费到的数据。
保存偏移量到 Redis
每处理完一批数据后,将最新的偏移量存储到 Redis 中。这样,如果应用程序崩溃或重启,可以从这个位置继续消费。
自定义 Offsets:根据 Key 从 Redis 获取 Offsets 处理完更新 Redis
添加依赖
<!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
复制代码
服务器上我们需要有:Redis 服务启动
Kafka 服务启动
编写代码,实现的主要逻辑如下所示:
package icu.wzk
object KafkaDStream3 {
def main(args: Array[String]): Unit = {
Logger.getLogger("args").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("KafkaDStream3")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
val groupId: String = "wzkicu"
val topics: Array[String] = Array("spark_streaming_test01")
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)
// 从 Kafka 获取 Offsets
val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffsetsFromRedis(topics, groupId)
// 创建 DStream
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
)
// DStream 转换&输出
dstream.foreachRDD {
(rdd, time) =>
if (!rdd.isEmpty()) {
// 处理消息
println(s"====== rdd.count = ${rdd.count()}, time = $time =======")
// 将 Offsets 保存到 Redis
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
OffsetsRedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
}
}
ssc.start()
ssc.awaitTermination()
}
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
复制代码
代码中我们封装了一个工具类:
package icu.wzk
object OffsetsRedisUtils {
private val config = new JedisPoolConfig
private val redisHost = "h121.wzk.icu"
private val redisPort = 6379
config.setMaxTotal(30)
config.setMaxIdle(10)
private val pool= new JedisPool(config, redisHost, redisPort, 10000)
private val topicPrefix = "kafka:topic"
private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId"
private def getRedisConnection: Jedis = pool.getResource
// 从Redis中获取Offsets
def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
val jedis: Jedis = getRedisConnection
val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map {
topic =>
import scala.collection.JavaConverters._
jedis.hgetAll(getKey(topic, groupId))
.asScala
.map {
case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
}
}
jedis.close()
offsets.flatten.toMap
}
// 将 Offsets 保存到 Redis
def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
val jedis: Jedis = getRedisConnection
ranges
.map(range => (range.topic, range.partition -> range.untilOffset))
.groupBy(_._1)
.map {
case (topic, buffer) => (topic, buffer.map(_._2))
}
.foreach {
case (topic, partitionAndOffset) =>
val offsets: Array[(String, String)] = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
import scala.collection.JavaConverters._
jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava)
}
jedis.close()
}
}
复制代码
我们启动后,如图所示:
这里我使用 Redis 查看当前的存储情况:
可以看到当前已经写入了,我们继续启动 KafkaProducer 工具,继续写入数据。可以看到,已经统计到数据了。
我们继续查看当前的 Redis 中的数据:
评论