写点什么

大数据 -102 Spark Streaming 与 Kafka 集成全解析:Receiver 与 Direct 两种方式详解 附代码案例

作者:武子康
  • 2025-09-20
    山东
  • 本文字数:4029 字

    阅读完需:约 13 分钟

大数据-102 Spark Streaming 与 Kafka 集成全解析:Receiver 与 Direct 两种方式详解 附代码案例

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 15 日更新到:Java-124 深入浅出 MySQL Seata 框架详解:分布式事务的四种模式与核心架构 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Spark Streaming DStream 有状态转换

  • DStream 有状态转换 案例


基础介绍

针对不同的 Spark、Kafka 版本,集成处理数据的方式有两种:


  • Receiver Approach

  • Direct Approach


对应的版本:



版本的发展:


Kafka-08 接口

Receiver based Approach

基于 Receiver 的 Kafka 消费方式采用 Kafka 旧版本(0.8.2.1 及之前版本)的高阶消费者 API 实现。这种方式的整体工作流程如下:


  1. 数据接收与存储

  2. 每个 Receiver 作为一个长期运行的 Task 被调度到 Executor 上执行

  3. 接收到的 Kafka 数据首先存储在 Spark Executor 的内存中,底层通过 BlockManager 进行管理

  4. 默认情况下,每 200ms(由 spark.streaming.blockInterval 参数控制)会将累积的数据生成一个 Block

  5. 这些 Block 会被复制到其他 Executor 以实现容错(默认复制因子为 2)

  6. 数据处理流程

  7. Spark Streaming 定期生成 Job 时,会根据这些 Block 构建 BlockRDD

  8. 最终这些 RDD 会被作为普通的 Spark 任务执行

  9. 每个 Block 对应 RDD 的一个分区,因此可以通过调整 blockInterval 来控制 RDD 的分区数量

  10. 关键特性与注意事项

  11. Receiver 部署特性

  12. 每个 Receiver 作为一个常驻线程运行在 Executor 上,会持续占用一个 CPU 核心

  13. Receiver 数量由调用 KafkaUtils.createStream() 的次数决定,每次调用创建一个 Receiver

  14. 可通过多个 createStream() 调用来实现多个 Receiver 并行消费

  15. 并行度限制

  16. Kafka Topic 的分区数与 Spark RDD 分区数没有直接关联

  17. 增加 Kafka Topic 的分区数只会增加单个 Receiver 内部的消费线程数

  18. 实际的 Spark 处理并行度仍由 Block 数量决定

  19. 示例:一个 Receiver 消费 4 分区 Topic,但仍只生成单个 RDD 分区

  20. 性能考量

  21. 数据本地性问题:包含 Receiver 的 Executor 会优先被调度执行 Task,可能导致集群负载不均衡

  22. 默认 blockInterval 为 200ms,可根据数据量调整:

  23. 小数据量:可适当增大间隔减少开销

  24. 大数据量:可减小间隔提高并行度

  25. 可靠性保障

  26. 默认配置下,Receiver 方式可能在故障时丢失已接收但未处理的数据

  27. 可通过启用预写日志(WAL)提高可靠性:

  28. 设置 spark.streaming.receiver.writeAheadLog.enable=true

  29. 数据会先写入 HDFS 等可靠存储

  30. 但会带来额外的磁盘 IO 开销,降低吞吐量约 10-20%

  31. 典型应用场景

  32. 适合对延迟不敏感、吞吐量适中的场景

  33. 当需要与 Kafka 0.8.x 旧版本兼容时

  34. 需要简单实现 Exactly-once 语义时可结合 WAL 使用

  35. 不适合需要高吞吐、低延迟或严格资源隔离的场景

Kafka-08 接口(Receiver 方式)


  • Offset 保存在 ZK 中,系统管理

  • 对应 Kafka 版本 0.8.2.1 +

  • 接口底层实现使用 Kafka 旧版消费者 高阶 API

  • DStream 底层实现为 BlockRDD

Kafka-08 接口(Receiver with WAL)


  • 增强了故障恢复的能力

  • 接收的数据与 Driver 的元数据保存到 HDFS

  • 增加了流式应用处理的延迟

Direct Approach


Direct Approach 是 Spark Streaming 不使用 Receiver 集成 Kafka 的方式,在企业生产环境中使用较多,相较于 Receiver,有以下特点:


  • 不使用 Receiver,减少不必要的 CPU 占用,减少了 Receiver 接收数据写入 BlockManager,然后运行时再通过 BlockId、网络传输、磁盘读取等来获取数据的整个过程,提升了效率,无需 WAL,进一步减少磁盘 IO

  • Direct 方式生的 RDD 是 KafkaRDD,它的分区数与 Kafka 分区数保持一致,便于把控并行度。注意:在 Shuffle 或 Repartition 操作后生成的 RDD,这种对应关系会失效

  • 可以手动维护 Offset,实现 Exactly Once 语义

Kafka-10 接口

Spark Streaming 与 Kafka 0.10 整合,和 0.8 版本的 Direct 方式很像,Kafka 的分区和 Spark 的 RDD 分区是一一对应的,可以获取 Offsets 和 元数据,API 使用起来没有显著的区别。

添加依赖

<dependency>  <groupId>org.apache.spark</groupId>  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>  <version>${spark.version}</version></dependency>
复制代码


不要手动添加 org.apache.kafka 相关的依赖,如 kafka-clients,spark-streaming-kafka-0-10 已经包含相关的依赖了,不同的版本会有不同程度的不兼容。


使用 kafka010 接口从 Kafka 中获取数据:


  • Kafka 集群

  • Kafka 生产者发送数据

  • Spark Streaming 程序接收数

KafkaProducer

编写代码

package icu.wzk
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.codehaus.jackson.map.ser.std.StringSerializer
import java.util.Properties
object KafkaProducerTest {
def main(args: Array[String]): Unit = { // 定义 Kafka 参数 val brokers = "h121.wzk.icu:9092" val topic = "topic_test" val prop = new Properties() prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// KafkaProducer val producer = new KafkaProducer[String, String](prop) for (i <- 1 to 1000) { val msg = new ProducerRecord[String, String](topic, i.toString, i.toString) // 发送消息 producer.send(msg) println(s"i = $i") Thread.sleep(100) } producer.close() }}
复制代码

运行测试

i = 493i = 494i = 495i = 496i = 497i = 498i = 499i = 500i = 501i = 502i = 503i = 504
复制代码


运行过程截图为:


查看 Kafka

我们在服务器上查看当前 Kafka 中的队列信息:


kafka-topics.sh --list --zookeeper h121.wzk.icu:2181
复制代码


可以看到队列已经加入了,spark_streaming_test01:


KafkaDStream

编写代码

package icu.wzk
object KafkaDStream1 {
def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf() .setAppName("KafkaDStream1") .setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2)) val kafkaParams: Map[String, Object] = getKafkaConsumerParameters("wzkicu") val topics: Array[String] = Array("spark_streaming_test01")
// 从 Kafka 中获取数据 val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils .createDirectStream( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)) // dstream输出 dstream.foreachRDD { (rdd, time) => if (!rdd.isEmpty()) { println(s"========== rdd.count = ${rdd.count()}, time = $time ============") } }
ssc.start() ssc.awaitTermination() }
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = { Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) ) }}
复制代码

运行结果

WARNING: An illegal reflective access operation has occurredWARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/wuzikang/.m2/repository/org/apache/spark/spark-unsafe_2.12/2.4.5/spark-unsafe_2.12-2.4.5.jar) to method java.nio.Bits.unaligned()WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.PlatformWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operationsWARNING: All illegal access operations will be denied in a future releaseUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.properties========== rdd.count = 1000, time = 1721721502000 ms ============
复制代码


运行截图如下:


生成数据

继续启动 KafkaProducer 的程序,让数据不断地写入我们会看到控制台输出内容如下:


========== rdd.count = 1000, time = 1721721502000 ms ====================== rdd.count = 9, time = 1721721710000 ms ====================== rdd.count = 19, time = 1721721712000 ms ====================== rdd.count = 19, time = 1721721714000 ms ====================== rdd.count = 19, time = 1721721716000 ms ====================== rdd.count = 20, time = 1721721718000 ms ====================== rdd.count = 19, time = 1721721720000 ms ====================== rdd.count = 19, time = 1721721722000 ms ====================== rdd.count = 19, time = 1721721724000 ms ============
复制代码


运行结果如下图所示:



发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-102 Spark Streaming 与 Kafka 集成全解析:Receiver 与 Direct 两种方式详解 附代码案例_Java_武子康_InfoQ写作社区