写点什么

大数据 -101 Spark Streaming 有状态转换详解:窗口操作与状态跟踪实战 附多案例代码

作者:武子康
  • 2025-09-19
    山东
  • 本文字数:4326 字

    阅读完需:约 14 分钟

大数据-101 Spark Streaming 有状态转换详解:窗口操作与状态跟踪实战 附多案例代码

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 15 日更新到:Java-124 深入浅出 MySQL Seata 框架详解:分布式事务的四种模式与核心架构 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Spark Streaming DStream 转换函数

  • DStream 无状态转换

  • DStream 无状态转换 案例

转换方式

有两个类型:


  • 无状态转换(已经完成)

  • 有状态转换


接下来开始有状态转换。

有状态转换

有状态转换主要有两种:


  • 窗口操作

  • 状态跟踪操作

窗口操作

Window Operations 可以设置窗口大小和滑动窗口间隔来动态获取当前 Streaming 的状态基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。



基于窗口的操作需要两个参数:


  • 窗口长度(Window Duration):控制每次计算最近的多少个批次的数据

  • 滑动间隔(Slide Duration):用来控制对新的 DStream 进行计算的间隔


两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍

准备编码

我们先编写一个每秒发送一个数字:


package icu.wzk
import java.io.PrintWriterimport java.net.{ServerSocket, Socket}
object SocketWithWindow {
def main(args: Array[String]): Unit = { val port = 9999 val ss = new ServerSocket(port) val socket: Socket = ss.accept() var i = 0 while (true) { i += 1 val out = new PrintWriter(socket.getOutputStream) out.println(i) out.flush() Thread.sleep(1000) } }}
复制代码

[窗口操作] 案例 2 观察窗口数据

  • 观察窗口的数据

  • 观察 batchDuration、windowDuration、slideDuration 三者之间的关系

  • 使用窗口相关的操作

编写代码

package icu.wzkobject WindowDemo {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("WindowDemo") .setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5)) ssc.sparkContext.setLogLevel("WARN")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) lines.foreachRDD { (rdd, time) => { println(s"rdd = ${rdd.id}; time = $time") } rdd.foreach(value => println(value)) }
// 20秒窗口长度(DS包含窗口长度范围内的数据) // 10秒滑动间隔(多次时间处理一次数据) val res1: DStream[String] = lines .reduceByWindow(_ + " " + _, Seconds(20), Seconds(10)) res1.print()
val res2: DStream[String] = lines .reduceByWindow(_ + _, Seconds(20), Seconds(10)) res2.print()
// 求窗口元素的和 val res3: DStream[Int] = lines .map(_.toInt) .reduceByWindow(_ + _, Seconds(20), Seconds(10)) res3.print()
// 请窗口元素和 val res4 = res2.map(_.toInt).reduce(_ + _) res4.print()
// 程序启动 ssc.start() ssc.awaitTermination()
}}
复制代码

运行结果

-------------------------------------------Time: 1721628860000 ms-------------------------------------------
rdd = 39; time = 1721628865000 msrdd = 40; time = 1721628870000 ms-------------------------------------------Time: 1721628870000 ms-------------------------------------------
-------------------------------------------Time: 1721628870000 ms-------------------------------------------
-------------------------------------------Time: 1721628870000 ms-------------------------------------------
复制代码


运行之后控制截图如下:


[窗口操作] 案例 3 热点搜索词实时统计

编写代码

package icu.wzkobject HotWordStats {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("HotWordStats") .setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2)) // 检查点设置 也可以设置到 HDFS ssc.sparkContext.setLogLevel("ERROR") ssc.checkpoint("checkpoint")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words: DStream[String] = lines.flatMap(_.split("\\s+")) val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
// 通过 reduceByKeyAndWindow算子 每隔10秒统计最近20秒的词出现的的次数 val wordCounts1: DStream[(String, Int)] = pairs .reduceByKeyAndWindow( (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2 ) wordCounts1.print()
// 需要CheckPoint的支持 val wordCounts2: DStream[(String, Int)] = pairs .reduceByKeyAndWindow( _ + _, _ - _, Seconds(20), Seconds(10), 2 ) wordCounts2.print()
// 运行程序 ssc.start() ssc.awaitTermination() }
}
复制代码

运行结果

-------------------------------------------Time: 1721629842000 ms-------------------------------------------(4,1)(8,1)(6,1)(2,1)(7,1)(5,1)(3,1)(1,1)
-------------------------------------------Time: 1721629842000 ms--------------------
复制代码


运行结果如下图:


[状态追踪操作] updateStateByKey

UpdateStateByKey 的主要功能:


  • 为 Streaming 中每一个 Key 维护一份 State 状态,state 类型可以是任意类型的,可以是自定义对象,更新函数也可以是自定义的

  • 通过更新函数对该 Key 的状态不断更新,对于每个新的 batch 而言,Spark Streaming 会在使用 updateStateByKey 的时候已经存在的 key 进行 state 状态更新

  • 使用 updateStateByKey 时要开启 CheckPoint 功能

编写代码 1

流式程序启动后计算 wordcount 的累计值,将每个批次的结果保存到文件


package icu.wzkobject StateTracker1 {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("StateTracker1") .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) ssc.sparkContext.setLogLevel("ERROR") ssc.checkpoint("checkpoint")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words: DStream[String] = lines.flatMap(_.split("\\s+")) val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
// 定义状态更新函数 // 函数常量定义 返回类型是 Some(Int),表示的含义是最新状态 // 函数的功能是将当前时间间隔内产生的Key的Value集合,加到上一个状态中,得到最新状态 val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => { // 通过Spark内部的reduceByKey按Key规约,然后这里传入某Key当前批次的Seq,再计算当前批次的总和 val currentCount = currValues.sum // 已累加的值 val previousCount = prevValueState.getOrElse(0) Some(currentCount + previousCount) }
val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc) stateDStream.print()
// 把DStream保存到文本文件中 会生成很多的小文件 一个批次生成一个目录 val outputDir = "output1" stateDStream .repartition(1) .saveAsTextFiles(outputDir)
// 开始运行 ssc.start() ssc.awaitTermination() }}
复制代码

运行结果 1

-------------------------------------------Time: 1721631080000 ms-------------------------------------------(1,1)(2,1)(3,1)
-------------------------------------------Time: 1721631085000 ms-------------------------------------------(8,1)(1,1)(2,1)(3,1)(4,1)(5,1)(6,1)(7,1)
复制代码


运行结果是:



统计全局的 Key 的状态,但是就算没有数据输入,也会在每一个批次的时候返回之前的 Key 的状态。


这样的缺点:


  • 如果数据量很大的话,CheckPoint 数据会占用较大存储,而且效率也不高

编写代码 2

mapWithState:也是用于全局统计 Key 的状态,如果没有数据输入,便不会返回之前的 Key 的状态,有一点增量的感觉。这样做的好处是,只关心那些已经发生的变化的 Key,对于没有数据输入,则不会返回那些没有变化的 Key 的数据,即使数据量很大,checkpoint 也不会像 updateStateByKey 那样,占用太多的存储。


package icu.wzkobject StateTracker2 {
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("StateTracker2") .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(2)) ssc.sparkContext.setLogLevel("ERROR") ssc.checkpoint("checkpoint")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words: DStream[String] = lines.flatMap(_.split("\\s+")) val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = { val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0) state.update(sum) (key, sum) }
val spec = StateSpec.function(mappingFunction _) val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)
resultDStream.cache()
// 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录 val outputDir = "output2" resultDStream.repartition(1).saveAsTextFiles(outputDir)
ssc.start() ssc.awaitTermination()
}}
复制代码

运行代码


发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-101 Spark Streaming 有状态转换详解:窗口操作与状态跟踪实战 附多案例代码_Java_武子康_InfoQ写作社区