写点什么

大数据 -100 Spark DStream 转换操作全面总结:map、reduceByKey 到 transform 的实战案例

作者:武子康
  • 2025-09-18
    山东
  • 本文字数:4812 字

    阅读完需:约 16 分钟

大数据-100 Spark DStream 转换操作全面总结:map、reduceByKey 到 transform 的实战案例

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 15 日更新到:Java-124 深入浅出 MySQL Seata 框架详解:分布式事务的四种模式与核心架构 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Spark Streaming 基础数据源

  • 文件流、Socket 流、RDD 队列流

  • 引入依赖、Java 编写多种流进行测试


DStream 转换

DStream 上的操作与 RDD 类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如:


  • updateStateByKey

  • transform

  • window 相关操作




map(func)

对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。例如,将每个记录转换为其长度。示例:val lengths = lines.map(line => line.length)

flatMap(func)

对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。例如,将每一行文本拆分为单词。示例:val words = lines.flatMap(line => line.split(" "))

filter(func)

对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。例如,过滤掉长度小于 5 的单词。示例:val filteredWords = words.filter(word => word.length > 5)

reduceByKey(func)

对键值对 DStream 进行聚合操作,对具有相同键的元素应用 func 函数。例如,计算每个单词的总数。示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

groupByKey()

对键值对 DStream 中的每个键进行分组,并将具有相同键的值聚合到一个列表中。示例:val grouped = pairs.groupByKey()

count()

统计 DStream 中每个 RDD 的元素个数。示例:val count = words.count()

countByValue()

统计 DStream 中每个 RDD 中每个值的出现次数。示例:val valueCounts = words.countByValue()

union(otherDStream)

将两个 DStream 合并为一个新的 DStream,包含两个 DStream 中的所有元素。示例:val mergedStream = stream1.union(stream2)

join(otherDStream)

对两个键值对 DStream 进行连接操作,类似 SQL 中的 JOIN 操作。示例:val joinedStream = stream1.join(stream2)


备注:


  • 在 DStream 与 RDD 上的转换操作非常类似(无状态操作)

  • DStream 有自己特殊的操作(窗口操作、追踪状态变化操作)

  • 在 DStream 上的转换操作比 RDD 上的转换操作少


DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:


  • 无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的 RDD 转化操作,例如:map、Filter、reduceByKey 等

  • 有状态转换操作,需要使用之前批次的数据或者是中间结果来计算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作

无状态转换

无状态转换是 Spark Streaming 中最基本的操作类型,它针对每个批次的 RDD 独立执行转换操作,不会保留任何历史状态信息。这种转换方式的特点是对每个时间窗口的数据处理都是独立的,不会跨批次维护状态。

基本无状态转换操作

以下是一些常见的无状态转换操作及其应用场景:


  1. map:对 DStream 中的每个元素应用给定的函数

  2. 示例:将字符串转换为大写


   val upper = lines.map(_.toUpperCase())
复制代码


  1. flatMap:类似于 map,但每个输入元素可以映射到 0 或多个输出元素

  2. 示例:将每行文本拆分为单词


   val words = lines.flatMap(_.split(" "))
复制代码


  1. repartition:改变 DStream 的分区数量

  2. 示例:将分区数调整为 10 个


   val repartitioned = words.repartition(10)
复制代码


  1. reduceByKey:对具有相同键的值进行归约

  2. 示例:计算单词出现频率


   val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
复制代码


  1. groupByKey:将具有相同键的值分组

  2. 示例:按用户 ID 分组


   val userEvents = events.groupByKey()
复制代码

高级转换操作:transform

transform操作是 Spark Streaming 中最强大的无状态转换之一,它允许开发者直接操作 DStream 内部的 RDD。其特点包括:


  1. 工作原理:

  2. 对源 DStream 的每个 RDD 应用一个 RDD-to-RDD 函数

  3. 在数据流的每个批次中都会被调用

  4. 生成一个全新的 DStream

  5. 典型应用场景:

  6. 实现自定义的 RDD 操作

  7. 组合多个 RDD 操作

  8. 访问 RDD 特定的 API(如 join、cogroup 等)

  9. 示例代码:


   val filtered = words.transform { rdd =>     // 可以在这里使用任何RDD操作     rdd.filter(_.length > 3)       .map(_.toUpperCase())   }
复制代码


  1. 实际应用案例:

  2. 实时数据清洗:过滤掉不符合要求的数据

  3. 特征工程:对原始数据进行复杂的特征提取

  4. 数据关联:将流数据与静态数据集进行 join 操作


transform操作的优势在于它提供了最大的灵活性,开发者可以像操作普通 RDD 一样处理流数据,同时保持 Spark Streaming 的批处理特性。这使得 Spark Streaming 可以支持更复杂的业务逻辑,而不仅限于预定义的转换操作。

案例 1 黑名单过滤

假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据未生效val arr1 = Array(("spark", true), ("scala", false))假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操作。如"2 spark"要被过滤掉1 hadoop2 spark3 scala4 java5 hive结果:"2 spark" 被过滤
复制代码

方案 1 外连接实现

package icu.wzk
import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.dstream.ConstantInputDStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackListFilter1 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("BlackListFilter1") .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(10))
// 黑名单 val blackList = Array(("spark", true), ("scala", true)) val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// 测试数据 val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper" .split("\\s+") .zipWithIndex .map { case (word, index) => s"$index $word" } val rdd = ssc.sparkContext.makeRDD(strArray) val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理 val clickStreamFormatted = clickStream .map(value => (value.split(" ")(1), value)) clickStreamFormatted.transform(clickRDD => { val joinedBlockListRDD: RDD[(String, (String, Option[Boolean]))] = clickRDD.leftOuterJoin(blackListRDD) joinedBlockListRDD.filter { case (word, (streamingLine, flag)) => { if (flag.getOrElse(false)) { false } else { true } } }.map { case (word, (streamingLine, flag)) => streamingLine } }).print()
// 启动 ssc.start() ssc.awaitTermination() }}
复制代码

方案 1 运行结果

-------------------------------------------Time: 1721618670000 ms-------------------------------------------5 hive6 hbase1 java7 zookeeper3 hadoop4 kafka
... 下一批
复制代码


对应的结果如下图所示:


方案 2 SQL 实现

package icu.wzk
import org.apache.spark.SparkConfimport org.apache.spark.sql.{DataFrame, SparkSession}import org.apache.spark.streaming.dstream.ConstantInputDStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackListFilter2 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("BlackListFilter2") .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(10)) ssc.sparkContext.setLogLevel("WARN")
// 黑名单 val blackList = Array(("spark", true), ("scala", true)) val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// 生成测试 DStream val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper" .split("\\s+") .zipWithIndex .map { case (word, index) => s"$index $word" } val rdd = ssc.sparkContext.makeRDD(strArray) val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理 val clickStreamFormatted = clickStream .map(value => (value.split(" ")(1), value)) clickStreamFormatted.transform { clickRDD => val spark = SparkSession .builder() .config(rdd.sparkContext.getConf) .getOrCreate()
import spark.implicits._ val clickDF: DataFrame = clickRDD.toDF("word", "line") val blackDF: DataFrame = blackListRDD.toDF("word", "flag") clickDF.join(blackDF, Seq("word"), "left") .filter("flag is null or flag == false") .select("line") .rdd }.print()
ssc.start() ssc.awaitTermination() }}
复制代码

方案 2 SQL 运行结果

-------------------------------------------Time: 1721619900000 ms-------------------------------------------[6 hbase][4 kafka][7 zookeeper][1 java][3 hadoop][5 hive]
复制代码


运行结果截图如下图所示:


方案 3 直接过滤

package icu.wzk
import org.apache.spark.SparkConfimport org.apache.spark.broadcast.Broadcastimport org.apache.spark.streaming.dstream.ConstantInputDStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackListFilter3 {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("BlackListFilter3") .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(10)) ssc.sparkContext.setLogLevel("WARN")
// 黑名单 val blackList = Array(("spark", true), ("scala", true)) val blackListBC: Broadcast[Array[String]] = ssc .sparkContext .broadcast(blackList.filter(_._2).map(_._1))
// 生成测试DStream val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper" .split("\\s+") .zipWithIndex .map { case (word, index) => s"$index $word" }
val rdd = ssc.sparkContext.makeRDD(strArray) val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理 clickStream.map(value => (value.split(" ")(1), value)) .filter { case (word, _) => !blackListBC.value.contains(word) } .map(_._2) .print()
// 启动 ssc.start() ssc.awaitTermination() }}
复制代码

方案 3 直接过滤运行结果

-------------------------------------------Time: 1721627600000 ms-------------------------------------------1 java3 hadoop4 kafka5 hive6 hbase7 zookeeper
... 下一批
复制代码


运行结果如下图所示:



发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-100 Spark DStream 转换操作全面总结:map、reduceByKey 到 transform 的实战案例_Java_武子康_InfoQ写作社区