写点什么

结构化流 -Structured Streaming(八 - 中)

发布于: 4 小时前
结构化流-Structured Streaming(八-中)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


流数据源和接收器

既然我们已经介绍了端到端结构化流查询所需的基本步骤,那么让我们研究如何使用内置的流数据源和接收器。提醒一下,你可以使用 SparkSession.readStream()从流数据源创建 DataFrame,并使用 DataFrame.writeStream()写入结果 DataFrame 到输出。在每种情况下,你都可以使用 format()方法指定源类型。稍后我们将看到一些具体示例。

文件

结构化流支持以与批处理中支持的格式相同的格式在文件中读写数据流:Text、CSV、JSON、Parquet、ORC 等。在这里,我们将讨论如何在文件上操作结构化流。

从文件读取

结构化流可以将磁盘上的文件视为数据流。这是一个例子:

In Python

from pyspark.sql.types import *

inputDirectoryOfJsonFiles =  ...

 

fileSchema = (StructType()

  .add(StructField("key", IntegerType()))

  .add(StructField("value", IntegerType())))

 

inputDF = (spark

  .readStream

  .format("json")

  .schema(fileSchema)

  .load(inputDirectoryOfJsonFiles))


// In Scala

import org.apache.spark.sql.types._

val inputDirectoryOfJsonFiles =  ...

 

val fileSchema = new StructType()

  .add("key", IntegerType)

  .add("value", IntegerType)

 

val inputDF = spark.readStream

  .format("json")

  .schema(fileSchema)

  .load(inputDirectoryOfJsonFiles)

返回的流式 DataFrame 将具有指定的数据结构。这是使用文件时要记住的一些关键点:

所有文件都必须具有相同的格式,并且应该具有相同的数据结构。例如,如果格式为"json",则所有文件必须为 JSON 格式,每行一条 JSON 记录。每个 JSON 记录的模式必须与 readStream()所指定的模式匹配。否则会出现解析错误的情况(例如,意外的 null 值)或查询失败。

每个文件都必须以原子方式显示在目录中,也就是说,整个文件必须立即可用于读取,并且一旦可用,就无法更新或修改该文件。这是因为结构化流将在引擎找到文件(使用目录列表)并将其内部标记为 processed。对该文件的任何更改将不做处理。

当有多个新文件要处理但它只能在下一个微批处理中选择其中的一些文件时(例如,由于速率限制),它将选择时间戳最早的文件。但是,在微批处理中,没有预定义的读取所选文件的顺序;所有这些文件都将被并行读取。

 

此流文件源支持许多常用选项,包括受 spark.read()支持的文件格式特定选项(请参阅第 4 章中的“DataFrame 和 SQL 表的数据源”)和一些与流相关的特定选项(例如,maxFilesPerTrigger 限制文件处理速率))。有关完整的详细信息,请参见编程指南。


写入文件

结构化流支持将流查询输出写入到与读取文件时相同格式的文件中。但是,它仅支持追加模式,因为尽管很容易在输出目录中写入新文件(即,将数据追加到目录中),但是很难修改现有数据文件(正如更新和完整模式所预期的那样)。它还支持分区。这是一个例子:

In Python

outputDir = ...

checkpointDir = ...

resultDF = ...

 

streamingQuery = (resultDF.writeStream

  .format("parquet")

  .option("path", outputDir)

  .option("checkpointLocation", checkpointDir)

  .start())

  

// In Scala

val outputDir = ...

val checkpointDir = ...

val resultDF = ...

 

val streamingQuery = resultDF

  .writeStream

  .format("parquet")

  .option("path", outputDir)

  .option("checkpointLocation", checkpointDir)

  .start()

你可以直接将输出目录指定为 start(outputDir),而不是使用"path"选项。

有几个关键点需要记住:

通过维护已写入目录的数据文件的日志,结构化流可以在写入文件时实现端到端的精确一次保证。该日志保存在子目录_spark_metadata 中。目录(而不是其子目录)上的任何 Spark 查询将自动使用日志读取正确的数据文件集,从而实现精确一次性的保证(即,不读取重复的数据或部分文件)。请注意,其他处理引擎可能不知道此日志,因此可能无法提供相同的保证。

如果在两次重新启动之间更改结果 DataFrame 的数据结构,则输出目录将具有多个数据结构中的数据。在查询目录时,必须对这些数据结构进行归档。

Apache Kafka

Apache Kafka 是一种流行的发布/订阅系统,已广泛用于存储数据流。结构化流内置支持 Apache Kafka 进行读取和写入。

从 Kafka 读取

要从 Kafka 执行分布式读取,你必须使用选项指定如何连接到源。假设你要订阅该"events"主题的数据。下面是如何创建流式 DataFrame 的方法:

 In Python

inputDF = (spark

  .readStream

  .format("kafka")

  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  .option("subscribe", "events")

  .load())

  

// In Scala

val inputDF = spark

  .readStream

  .format("kafka")

  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  .option("subscribe", "events")

  .load()

返回的 DataFrame 将具有表 8-1 中描述的数据结构。


你还可以选择订阅多个主题,主题模式,甚至主题的特定分区。此外,你可以选择是只读取已订阅主题中的新数据,还是处理这些主题中的所有可用数据。甚至你还可以从批处理查询中读取 Kafka 数据——也就是说,将 Kafka 主题视为表。有关更多详细信息,请参见《 Kafka Integration Guide》。

写入 Kafka

对于写入 Kafka,结构化流期望结果 DataFrame 具有几列特定名称和类型的列,如表 8-2 所示。

你可以在这三种输出模式下写入 Kafka,尽管不建议使用全量模式,因为它会重复输出相同的记录。下面是在更新模式下将我们较早的计数查询的输出写入 Kafka 的具体示例:

 In Python

counts = ... # DataFrame[word: string, count: long]

streamingQuery = (counts

  .selectExpr(

    "cast(word as string) as key",

    "cast(count as string) as value")

  .writeStream

  .format("kafka")

  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  .option("topic", "wordCounts")

  .outputMode("update")

  .option("checkpointLocation", checkpointDir)

  .start())

  

// In Scala

val counts = ... // DataFrame[word: string, count: long]

val streamingQuery = counts

  .selectExpr(

    "cast(word as string) as key",

    "cast(count as string) as value")

  .writeStream

  .format("kafka")

  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  .option("topic", "wordCounts")

  .outputMode("update")

  .option("checkpointLocation", checkpointDir)

  .start()

有关更多详细信息,请参见《Kafka Integration Guide》。

自定义流数据源和接收器

在本节中,我们将讨论如何读写到在结构化流中没有内置支持的存储系统。特别是,你将看到如何使用 foreachBatch()和 foreach()方法来实现自定义逻辑以写入存储。

写入任何存储系统

有两种操作可让你将流查询的输出写入任意存储系统:foreachBatch()和 foreach()。它们的用例略有不同:虽然 foreach()允许在每行上使用自定义写逻辑,但是 foreachBatch()在每个微批处理的输出上允许进行任意操作和自定义逻辑。让我们来更详细地探讨它们的用法。

使用 foreachBatch()

foreachBatch() 允许你指定在流查询的每个微批处理的输出上执行计算函数。它具有两个参数:一个是具有微批处理输出的 DataFrame 或数据集,以及微批处理的唯一标识符。例如,假设我们要将早期的计数查询的输出写入 Apache Cassandra。对于 Spark Cassandra Connector 2.4.2,不支持编写流式 DataFames。但是你可以使用连接器的批处理 DataFrame 支持将每个批处理的输出(即更新的字数)写入 Cassandra,如下所示:

 In Python

hostAddr = "<ip address>"

keyspaceName = "<keyspace>"

tableName = "<tableName>"

 

spark.conf.set("spark.cassandra.connection.host", hostAddr)

 

def writeCountsToCassandra(updatedCountsDF, batchId):

    # Use Cassandra batch data source to write the updated counts

    (updatedCountsDF

      .write

      .format("org.apache.spark.sql.cassandra")

      .mode("append")

      .options(table=tableName, keyspace=keyspaceName)

      .save())

      

streamingQuery = (counts

  .writeStream

  .foreachBatch(writeCountsToCassandra)

  .outputMode("update")

  .option("checkpointLocation", checkpointDir)

  .start())

  

// In Scala

import org.apache.spark.sql.DataFrame

 

val hostAddr = "<ip address>"

val keyspaceName = "<keyspace>"

val tableName = "<tableName>"

 

spark.conf.set("spark.cassandra.connection.host", hostAddr)

 

def writeCountsToCassandra(updatedCountsDF: DataFrame, batchId: Long) {

    // Use Cassandra batch data source to write the updated counts

    updatedCountsDF

      .write

      .format("org.apache.spark.sql.cassandra")

      .options(Map("table" -> tableName, "keyspace" -> keyspaceName))

      .mode("append")

      .save()

    }

 

val streamingQuery = counts

  .writeStream

  .foreachBatch(writeCountsToCassandra _)

  .outputMode("update")

  .option("checkpointLocation", checkpointDir)

  .start()

使用 foreachBatch(),你可以执行以下操作:


重用现有的批处理数据源

如前面的示例所示,通过 foreachBatch()你可以使用现有的批处理数据源(即支持编写批处理 DataFrame 的源)来编写流查询的输出。

写入多个位置

如果要将流查询的输出写入多个位置(例如,OLAP 数据仓库和 OLTP 数据库),则可以简单地多次将输出 DataFrame 或者 Dataset 多次写入。但是,每次尝试写入都可能导致重新计算输出数据(包括可能重新读取输入数据)。为避免重新计算,应缓存 batchOutputDataFrame,将其写入多个位置,最后释放缓存:

In Python

def writeCountsToMultipleLocations(updatedCountsDF, batchId):

  updatedCountsDF.persist()

  updatedCountsDF.write.format(...).save()  # Location 1

  updatedCountsDF.write.format(...).save()  # Location 2

  updatedCountsDF.unpersist()

  

// In Scala

def writeCountsToMultipleLocations(

  updatedCountsDF: DataFrame,

  batchId: Long) {

    updatedCountsDF.persist()

    updatedCountsDF.write.format(...).save()  // Location 1

    updatedCountsDF.write.format(...).save()  // Location 2

    updatedCountsDF.unpersist()

 }

应用其他 DataFrame 操作

在流式 DataFrame 上不支持许多 DataFrame API 操作,因为在这些情况下结构化流不支持生成增量计划。使用 foreachBatch(),你可以在每个微批处理输出上应用其中一些操作。但是,你需要清楚知道自己所执行的操作的端到端语义。

 

foreachBatch()仅提供至少一次写入保证。通过使用 batchId 将重复执行的微批中删除重复写入的数据,以此可以实现精确一次性保证。

 

使用 foreach()

如果 foreachBatch()不是一个选项(例如,如果不存在相应的批处理数据编写器),则可以使用 foreach()来表示自定义编写器逻辑。具体来说,可以通过将其划分为三种方法表达数据写入的逻辑:open()、process()和 close()。结构化流将使用这些方法来写入输出记录的每个分区。这是一个抽象的例子:

In Python

Variation 1: Using function

def process_row(row):

 Write row to storage

    pass

query = streamingDF.writeStream.foreach(process_row).start()

Variation 2: Using the ForeachWriter class

class ForeachWriter:

  def open(self, partitionId, epochId):

Open connection to data store

Return True if write should continue

This method is optional in Python

If not specified, the write will continue automatically

return True


 def process(self, row):

Write string to data store using opened connection

This method is NOT optional in Python

pass


def close(self, error):

Close the connection. This method is optional in Python

pass


resultDF.writeStream.foreach(ForeachWriter()).start()


// In Scala

import org.apache.spark.sql.ForeachWriter

val foreachWriter = new ForeachWriter[String] { // typed with Strings

 def open(partitionId: Long, epochId: Long): Boolean = {

      // Open connection to data store

      // Return true if write should continue

    }


def process(record: String): Unit = {

      // Write string to data store using opened connection

    }


def close(errorOrNull: Throwable): Unit = {

      // Close the connection

    }

 }


resultDSofStrings.writeStream.foreach(foreachWriter).start()

执行的这些方法的详细语义在《结构化流编程指南》中进行了讨论。

从任何存储系统读取

不幸的是,从 Spark 3.0 开始,用于构建自定义流数据源和接收器的 API 仍处于试验阶段。Spark 3.0 中的 DataSourceV2 计划引入了流 API,但尚未宣布它们是稳定的。因此,没有正式的方法可以从任意存储系统读取数据。

 

数据转换

在本节中,我们将更深入地研究结构化流中支持的数据转换。如前所述,结构化流中仅支持可以增量执行的 DataFrame 操作。这些操作大致分为无状态操作和有状态操作。我们将定义每种类型的操作,并说明如何识别哪些操作是有状态的。

增量执行和流状态

正如我们在“活跃(Active)流查询的底层原理”中讨论的那样,Spark SQL 中的 Catalyst 优化器将所有 DataFrame 操作转换为优化的逻辑计划。决定如何执行逻辑计划的 Spark SQL 规划器会识别到这是一个流逻辑计划,需要对连续数据流进行操作。因此,规划器不是将逻辑计划转换为一次性的物理执行计划,而是生成连续的执行计划序列。每个执行计划增量地更新最终结果 DataFrame——也就是说,该计划仅处理输入流中的一小部分新数据,并可能处理由先前执行计划计算出的某些中间部分结果。

每个执行都被视为微批处理,并且在执行之间传递的部分中间结果称为流“状态”。根据增量操作是否需要维持状态,DataFrame 操作可以大致分为无状态操作和有状态操作。在本节的其余部分,我们将探讨无状态操作和有状态操作之间的区别,以及它们在流查询中如何要求不同的 runtime 配置和资源管理。

 

某些逻辑运算从根本上讲是不切实际的,或者对于增量计算而言代价非常高,因此结构化流不支持它们。例如,任何尝试使用诸如 cube()或 rollup()的操作启动流查询的尝试都将引发 UnsupportedOperationException 异常。

无状态转换

所有投影操作(例如 select(),explode(),map(),flatMap())和选择操作(例如 filter(),where())分别处理每个输入记录,而不需要依赖以前行的任何信息。缺少对先前输入数据的依赖,使它们成为无状态操作。

只有无状态操作的流查询支持追加和更新输出模式,但不支持全量模式。这是有道理的:由于此类查询的任何处理后的输出行都无法由任何将来的数据修改,因此可以以追加模式(包括仅追加的,比如任何格式的文件)将其写出到所有流接收器。另一方面,此类查询天生不会合并输入记录中的信息,因此可能不会减少结果中的数据量。不支持全量模式,因为存储不断增长的结果数据通常成本很高。这与有状态转换形成了鲜明的对比,正如我们接下来将要讨论的。

有状态转换

有状态转换的最简单示例是 DataFrame.groupBy().count(),它生成自查询开始以来接收到的记录数的运行计数。在每个微批处理中,增量计划都会将新记录的计数添加到由先前的微批处理生成的计数中。计划之间传递的部分计数就是我们所说的状态。此状态在 SparkExecutor 的内存中维护,并被检查点指向已配置的位置,以容忍故障。尽管 Spark SQL 自动管理此状态的生命周期以确保获得正确的结果,但是通常你必须调整一些配置以控制维持状态的资源使用情况。在本节中,我们将探索不同的有状态算子如何在后台管理其状态。

分布式和状态容错管理

回顾第一章和第二章,在一个集群中运行的 Spark 应用程序具有一个 Driver 和一个或多个 Executor。驱动程序中运行的 Spark 调度程序将你的高级操作分解为较小的任务,并将其放入任务队列中,并且随着资源的可用,Executor 从队列中拉出任务以执行它们。流查询中的每个微批处理本质上都执行这样的一组任务,这些任务从流数据源读取新数据并将更新后的输出写入流接收器。对于有状态流处理查询,除了写入接收器之外,每个任务的微型批处理都会生成中间状态数据,这些状态数据将由下一个微批处理使用。状态数据的生成是进行分区和分布式的(因为所有读取,写入和处理都在 Spark 中),并且将其缓存在 Executor 内存中以进行有效利用。如图 8-6 所示,它显示了在原始流单词统计查询中如何管理状态。


每个微批处理读取一组新的单词,在 Executor 中将它们混洗并进行分组,计算微批处理中的计数,最后将它们添加到运行的计数中以产生新的计数。这些新计数既是下一个微批处理的输出,同时也是状态,因此它们被缓存在 Executor 的内存中。下一个微批数据以与以前完全相同的方式在 Executor 之间分组,因此每个单词始终由同一个 Executor 处理,因此可以在本地读取和更新其运行计数。

但是,仅将此状态保留在内存中是不够的,因为任何故障(Executor 故障或整个应用程序故障)都将导致内存中状态丢失。为避免丢失,我们将键/值状态更新作为更改日志(change log)同步保存在用户提供的检查点位置中。这些更改与在每个批次中处理的偏移量形成独立的版本,并且可以通过读取检查点日志来自动重建所需的状态版本。在发生任何故障的情况下,结构化流处理能够通过重新处理相同的输入数据以及重新拿到微批处理之前的状态来重新执行发生故障的微批处理,从而产生与没有故障时相同的输出数据。这对于确保端到端精确的一次保证至关重要。

总而言之,对于所有有状态操作,结构化流可以通过以分布式方式自动保存和恢复状态来确保操作的正确性。根据有状态操作,你需要做的就是调整状态清理策略,以便可以从缓存状态中自动删除旧键和值。这是我们接下来要讨论的。

有状态算子的类型

流状态的本质是保留过去数据的摘要。有时需要从状态中清理旧的摘要信息,以便为新摘要腾出空间。基于此操作,我们可以区分两种类型的有状态操作:

托管状态操作

它们根据特定于操作的“旧”定义自动识别并清理旧状态。你可以调整定义为旧的内容以控制资源使用情况(例如,用于存储状态的 Executor 内存)。属于这一类的操作包括:

 流式聚合

 流与流之间连接

 流重复数据删除

非托管状态操作

这些操作使你可以定义自己的自定义状态清理逻辑。这一类的操作是:

l MapGroupsWithState

l FlatMapGroupsWithState

这些操作允许你可以定义任意有状态操作(会话化等)。

接下来的内容将详细讨论以上的每一个操作。

 

有状态的流聚合

结构化流可以增量执行大多数 DataFrame 聚合操作。你可以按 key(例如,流单词计数)和/或按时间(例如,计数每小时接收到的记录)聚合数据。在本节中,我们将讨论优化这些不同类型的流聚合的语义和操作细节。我们还将简要讨论流式传输不支持的几种聚合类型。让我们从不涉及时间的聚合内容开始。

不涉及时间的聚合

不涉及时间的聚合可以大致分为两类:

全局聚合

流中所有数据的聚合。例如,假设你有一个由传感器读数流生成的名为 sensorReadings 的流式 DataFrame 。你可以使用以下查询来计算接收到的读数总数的运行计数:


In Python

runningCount = sensorReadings.groupBy().count()

 

// In Scala

val runningCount = sensorReadings.groupBy().count()

你不能在流式 DataFrame 上直接使用像 DataFrame.count()和 Dataset.reduce()这样的聚合操作。这是因为,对于静态 DataFrame,这些操作会立即返回最终计算出的聚合,而对于流式 DataFrame,则必须不断更新这些聚合值。因此,你必须始终在流式 DataFrame 上使用 DataFrame.groupBy()或 Dataset.groupByKey()进行聚合。

 

分组汇总

数据流中存在的每个组或键中的聚合。例如,如果 sensorReadings 包含来自多个传感器的数据,则可以使用以下方法计算每个传感器的运行平均值读数(例如,为每个传感器设置基线值):

In Python

baselineValues = sensorReadings.groupBy("sensorId").mean("value")

 

// In Scala

val baselineValues = sensorReadings.groupBy("sensorId").mean("value")

除了计数和平均值外,流式 DataFrame 还支持以下类型的聚合(类似于批处理 DataFrame):

 

所有内置的聚合函数

sum(),mean(),stddev(),countDistinct(),collect_set(),approx_count_distinct()等等。参见 API 文档(Python 的和 Scala 的)的更多细节。

 

同时应用多个聚合函数

你可以通过以下方式应用多个聚合函数来一起计算:

 In Python

from pyspark.sql.functions import *

multipleAggs = (sensorReadings

  .groupBy("sensorId")

  .agg(count("*"), mean("value").alias("baselineValue"),

    collect_set("errorCode").alias("allErrorCodes")))

    

// In Scala

import org.apache.spark.sql.functions.*

val multipleAggs = sensorReadings

  .groupBy("sensorId")

  .agg(count("*"), mean("value").alias("baselineValue"),

    collect_set("errorCode").alias("allErrorCodes"))

 

用户定义的聚合函数

支持所有用户定义的聚合功能。有关未类型化和已类型化用户定义的聚合函数的更多详细信息,请参见 Spark SQL 编程指南。

关于这种流式聚合的执行,我们在前面的章节中已经说明了如何将运行中的聚合作为分布式状态来维护。除此之外,对于不基于时间的聚合,要记住两个非常重要的要点:用于此类查询的输出模式以及按状态规划资源使用情况。这些将在本节末尾讨论。接下来,我们将讨论在时间窗口内合并数据的聚合。

带有事件时间窗口的聚合

在许多情况下,你不希望对整个流运行聚合,而是希望对按照时间窗口存储的数据进行聚合。继续我们的传感器示例,假设每个传感器每分钟最多发送一个读数,并且我们想检测是否有任何传感器报告异常高的次数。为了发现这种异常,我们可以计算每隔五分钟从每个传感器收到的读数。此外,出于鲁棒性考虑,我们应该根据传感器生成数据的时间来计算时间间隔,而不是根据接收数据的时间来计算时间间隔,因为任何传输延迟都会影响结果正确性。换句话说,我们要使用事件时间,即记录中代表生成读数的时间的时间戳。也就是说 sensorReadings DataFrame 将生成时间戳记作为名为 eventTime 的列。我们可以将这五分钟的时间表示如下:

In Python

from pyspark.sql.functions import *

(sensorReadings

  .groupBy("sensorId", window("eventTime", "5 minute"))

  .count())

  

// In Scala

import org.apache.spark.sql.functions.*

sensorReadings

  .groupBy("sensorId", window("eventTime", "5 minute"))

  .count()

这里要注意的关键是 window()函数,它使我们可以将五分钟的窗口表示为动态计算的分组列。启动后,此查询将对每个传感器读数有效地执行以下操作:

使用该 eventTime 值计算传感器读数进入的五分钟时间窗口。

根据复合分组对读数进行分组。(<computed window>, SensorId)

更新复合数据组的计数。

让我们通过一个说明性的例子来理解这一点。图 8-7 显示了如何根据事件时间将一些传感器读数映射到五分钟滚动(即不重叠)窗口的组。这两个时间线显示结构化流处理每个接收事件的时间,以及事件数据中的时间戳(通常是在传感器处生成事件的时间)。

在事件时间的每个五分钟窗口中,都将考虑进行分组,基于该分组将计算计数。请注意,事件可能会延迟到达,并且在事件时间方面可能会出现乱序。如图所示,事件时间为 12:07 的事件在事件时间为 12:11 的事件之后被接收并处理。但是,无论事件何时到达,都会根据事件时间将每个事件分配给适当的组。实际上,根据窗口规范,每个事件可以分配给多个组。例如,如果要计算与每 5 分钟滑动一次的 10 分钟窗口相对应的计数,则可以执行以下操作:

In Python

(sensorReadings

  .groupBy("sensorId", window("eventTime", "10 minute", "5 minute"))

  .count())

  

// In Scala

sensorReadings

  .groupBy("sensorId", window("eventTime", "10 minute", "5 minute"))

  .count()

在此查询中,每个事件都将分配给两个重叠的窗口,如图 8-8 所示。

每个唯一元组(<assigned time window>, sensorId)都被视为动态生成的组,将为其计算计数。例如,该事件[eventTime = 12:07, sensorId = id1]被映射到两个时间窗(12:00-12:10, id1)(12:05-12:15, id1),这两个窗口的计数每个都增加 1。图 8-9 展示了先前显示的事件的计数。

假设处理输入记录的触发间隔为 5 分钟,图 8-9 底部的表显示了每个微批的结果表的状态(即计数)。随着事件时间的推移,将自动创建新组并自动更新其聚合。延迟和乱序事件会自动得到处理,因为它们只需要更新较旧的组。

但是,从资源使用的角度来看,这带来了一个不同的问题——状态大小的无限增长。随着与最新时间窗口相对应的新组的创建,旧组继续占用状态内存,等待任何迟到的数据更新它们。即使实际的操作中,对输入数据的延迟时间有一定的限制(例如,数据延迟不能超过 7 天),查询也不知道这些信息。因此,它不知道何时将窗口视为“太旧而无法接收更新”并将其从状态中删除。要提供绑定到查询延迟(并防止无限制状态),你可以指定水位线,这将在下面讨论。

带水位线处理延迟数据

水位线是一个阈值,用于指定系统等待延迟事件的时间,如果到达事件位于水位线之内,它将用于更新查询。否则,如果它早于水位线,它将被丢弃,并且流引擎不会对其进行进一步处理。通过知道给定组不再有数据到达的点,引擎可以自动确定某些组的聚合并将其从状态中删除。这可以限制引擎为计算查询结果所必须保持的状态总量。

例如,假设你知道传感器数据的延迟不会超过 10 分钟。然后,你可以如下设置水位线:

 In Python

(sensorReadings

  .withWatermark("eventTime", "10 minutes")

  .groupBy("sensorId", window("eventTime", "10 minutes", "5 minutes"))

  .mean("value"))

  

// In Scala

sensorReadings

  .withWatermark("eventTime", "10 minutes")

  .groupBy("sensorId", window("eventTime", "10 minutes", "5 minute"))

  .mean("value")

 

请注意,你必须在 groupBy()和用于定义窗口的时间戳列之前调用 withWatermark()。执行此查询时,结构化流将连续跟踪 eventTime 列的最大观察值,并相应地更新水位线,过滤“延迟”的数据,并清理旧状态。也就是说,任何超过 10 分钟的数据都将被忽略,并且所有比最新(按事件时间)输入数据早 10 分钟以上的时间窗口都将从状态中清理。为了阐明如何执行此查询,请参考图 8-10 中的时间线,该时间线显示处理输入记录的方式。

该图显示了按照记录的处理时间(x 轴)和事件时间(y 轴)处理的记录的二维图。记录在五分钟的微批处理,并用圆圈标记。底部的表显示了每个微批处理完成后结果表的状态。

每条记录都在其左侧的所有记录之后被接收和处理。考虑两条记录[12:15, id1](在 12:17 左右处理)和[12:13, id3](在 12:18 左右处理)。记录 id3 被认为是延迟数据(因此以红色实线标记),因为它是在 id1 记录之前由传感器生成的,但是在记录之后进行处理的。但是,在处理时间范围为 12:15–12:20 的微批处理中,使用的水位线为 12:04 ,该水位线是根据直到前一个微批处理为止的最大事件时间(即 12:14)计算得出的减去 10 分钟的水位线延迟。因此,因此,迟到记录[12:13,id3]并不被认为为太晚,并计算成功。相反,在下一个微批处理中,记录[12:04, id1] 与新水位线 12:11 相比被认为迟到太久了,因此被丢弃。

你可以根据应用程序的要求设置水位线延迟——该参数的较大值允许数据稍后到达,但是会以增加状态大小(即内存使用)为代价,反之亦然。

带水位线的语义保证

在总结本节有关水位线的内容之前,让我们考虑一下水位线提供的精确语义保证。10 分钟的水位线可确保引擎永远不会丢弃与输入数据中看到的最新事件时间相比延迟少于 10 分钟的任何数据。但是,保证仅在一个方向上是严格的。延迟超过 10 分钟的数据并不能保证会被删除,也就是说,它们可能会被聚合。延迟超过 10 分钟的输入记录是否会被实际聚合,取决于接收记录的时间以及触发微批处理的确切时间。

支持的输出模式

与不涉及时间的流式聚合不同,具有时间窗口的聚合可以使用所有三种输出模式。但是,你还需要了解有关状态清理的其他含义,具体取决于模式:

更新模式(Update mode)

在这种模式下,每个微批处理将仅输出更新聚合的行。此模式可用于所有类型的聚合。特别是对于时间窗口聚合,水位线将确保定期清理状态。这是使用流式聚合运行查询的最有用和最有效的模式。但是,你不能使用此模式将聚合写入仅支持追加模式的流接收器,例如任何基于 Parquet 和 ORC 格式的文件(除非你使用 Delta Lake,我们将在下一章中讨论)。

全量模式(Complete mode)

在这种模式下,每个微批处理都会输出所有更新的聚合,而不论其时间长短或是否包含原有信息更改。尽管此模式可用于所有类型的聚合,但对于时间窗口聚合,使用全量模式意味着即使指定了水位线也不会清理状态。输出所有聚合都需要依赖所有过去的状态,因此即使定义了水位线,也必须保留聚合数据。在时间窗口聚合上谨慎使用此模式,因为这可能导致状态大小和内存使用无限增长。

追加模式(Append mode)

此模式只能与事件时间窗口上的聚合以及启用了水位线功能一起使用。回想一下,追加模式不允许以前的输出结果发生变化。对于任何没有水位线的聚合,每个聚合都可以使用任何将来的数据进行更新,因此这些数据无法以追加模式输出。只有在事件时间窗口上的聚合上启用了加水位线功能时,查询才知道聚合何时不再进行任何更新。因此,仅当水位线确保不会再次更新聚合时,追加模式才输出每个键及其最终的聚合值,而不是输出更新的行。此模式的优点是它允许你将聚合写入仅支持追加数据的流接收器(例如文件)。

 

下一篇文章我们将讲解流关联、任意的有状态计算以及性能调优相关的知识点。

发布于: 4 小时前阅读数: 10
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
结构化流-Structured Streaming(八-中)