写点什么

结构化流 -Structured Streaming(八 - 上)

发布于: 15 小时前
结构化流-Structured Streaming(八-上)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在前面的章节中,我们学习了如何使用结构化 API 来处理数据规模巨大的有界数据。但是,数据经常连续到达并且需要实时处理。在本章中,我们将讨论如何将相同的结构化 API 来处理数据流。

Apache Spark 流处理引擎的演变

流处理被定义为连续处理无穷无尽的数据流。随着大数据时代的到来,流处理系统已从单节点处理引擎过渡到多节点分布式处理引擎。传统的分布式流处理是用一个一次一记录的处理模型来实现的,如图 8-1 所示。

处理管道由节点有向图组成,如图 8-1 所示。每个节点每次连续接收一条记录,进行处理,然后将生成的记录转发到图中的下一个节点。此处理模型可以实现非常低的延迟,也就是说,输入记录可以由管道处理,并且可以在毫秒内生成结果输出。但是,该模型在从节点故障和散乱的节点(即比其他节点慢的节点)中恢复时效率不是很高。它可以使用大量额外的故障转移资源从故障中快速恢复,或者使用最少的额外资源,但是恢复速度相当缓慢。

 

微批流处理的出现

当 Apache Spark 引入 Spark Streaming(也称为 DStreams)时,这种传统方法受到了挑战。它引入了微批流处理的思想,其中流计算模型是基于一系列连续的微型 map/reduce 批处理作业实现的(因此称为“微批处理”)。这在示出的图 8-2。

如此处所示,Spark Streaming 将来自输入流的数据划分为大小为 1 秒的微批数据进行处理。每个批次在 Spark 集群中以分布式方式处理,形成一系列小的确定性任务,这些任务以微批次的形式生成输出。将流计算分解为诸多小任务,与传统的连续运算符模型相比,主要有以下两个优点:

通过在其他 Executor 上重新计划一个或多个任务副本,Spark 的敏捷任务调度可以非常快速,有效地从故障和 Executor 中恢复。

任务的确定性确保无论任务重新执行多少次,输出数据都是相同的。这一关键特性使 Spark Streaming 能够提供端到端的精确一次处理保证,即,生成的输出结果是由每个输入记录都能够被精确地处理一次而得到的。

这种有效的容错能力确实以时间延迟为代价的,微批模型无法达到毫秒级的时间延迟;通常可以达到几秒钟的延迟(在某些情况下,延迟仅为半秒)。但是,我们已经观察到,对于绝大多数流处理用例而言,微批处理的好处胜过二级延迟的缺点。这是因为大多数流传输管道至少具有以下特征之一:

管道不需要严格要求时间延迟低于几秒钟。例如,当按小时作业读取流进行输出时,生成具有亚秒级延迟的输出是没有什么意义的。

管道的其他部分存在较大的延迟。例如,如果对传感器写入 Apache Kafka(用于摄取数据流的系统)的数据进行批处理以实现更高的吞吐量,那么下游处理系统中的任何优化都无法使端到端延迟低于批处理延迟。

此外,DStream API 是在 Spark 的批处理 RDD API 的基础上构建的。因此,DStream 具有与 RDD 相同的功能语义和容错模型。因此,Spark Streaming 证明了一个统一的处理引擎可以为批处理,交互式和流式处理工作负载提供一致的 API 和语义。流处理中这种基本的范式转变促使 Spark Streaming 成为使用最广泛的开源流处理引擎之一。

 

从 Spark Streaming(DStreams)获得的经验教训

尽管 Spark Streaming 具备了很多的优点,但 DStream API 并非没有缺陷。以下是一些存在并且需要改进的关键领域:

缺少用于批处理和流处理的单一 API

即使 DStream 和 RDD 具有一致的 API(即相同的操作和相同的语义),开发人员在将批处理作业转换为流式作业时仍必须显式重写其代码以使用不同的类。

逻辑计划与物理计划之间缺乏隔离

Spark Streaming 按照开发人员指定的顺序执行 DStream 操作。由于开发人员可以有效地指定确切的物理计划,因此没有自动优化的空间,并且开发人员必须手动优化其代码才能获得最佳性能。

缺少对事件时间窗口的本地支持

DStream 仅根据 Spark Streaming 接收每个记录的时间(称为处理时间)来定义窗口操作。但是,许多用例需要根据生成记录的时间(称为事件时间)而不是接收或处理记录的时间来计算窗口聚合。缺少事件时间窗口的本机支持,使开发人员很难通过 Spark Streaming 构建此类管道。

以上这些缺点形成了结构化流(Structured Streaming)的设计理念,我们将在下面讨论。

 

结构化流(Structured Streaming)的设计哲学

基于 DStreams 的这些教训,结构化流从一开始就设计了一种核心理念——对于开发人员而言,编写流处理管道应该与编写批处理管道一样容易。简而言之,结构化流的指导原则是:

单一,统一的编程模型和接口,用于批处理和流处理

这个统一的模型为批处理和流式工作负载提供了一个简单的 API 接口。你可以像在批处理中一样在流上使用熟悉的 SQL 或类似批处理的 DataFrame 查询(就像你在上一章中学到的那样),从而将容错、优化和迟到数据的潜在复杂性留给引擎本身。在接下来的部分中,我们将研究你可能会编写的一些查询。

流处理的更广泛定义

大数据处理应用程序已经变得足够复杂,以至于实时处理和批处理之间的界线已大大模糊。结构化流的目的是将其适用性从传统的流处理扩展到更广泛的应用类别。从定时应用程序(例如,每隔几个小时)到连续(如传统流应用程序)处理数据的任何应用程序都应使用结构化流来处理数据。

接下来,我们将讨论结构化流使用的编程模型。

 

结构化流的编程模型

“表”是开发人员在构建批处理应用程序时所熟悉的一个众所周知的概念。通过将数据流视为一张无界的、连续追加的表,结构化流将该概念扩展到流应用程序,如图 8-3 所示。

数据流中接收到的每条新记录就像是追加到无界输入表中新的行。结构化流实际上并不会保留所有输入,但是结构化流产生的输出相当于将直到时间 T 的所有输入都存储在一个静态的、有界的表中,并基于该表上运行批处理作业。

如图 8-4 所示,开发人员接着在此概念性输入表上定义一个查询,就像它是静态表一样,以计算将要写入输出接收端的结果表。结构化流式处理会自动将这种类似批处理的查询转换为流式执行计划。这称为增量化:结构化流计算出每次记录到达时需要维护什么状态才能更新结果。最后,开发人员指定触发策略来控制何时更新结果。每次触发触发器时,结构化流都会检查新数据(即输入表中的新行)并增量更新结果。

模型的最后一部分是输出模式。每次更新结果表时,开发人员都希望将更新结果写入外部系统,例如文件系统(例如,HDFS,Amazon S3)或数据库(例如,MySQL,Cassandra)。我们通常希望以增量的方式进行输出。为此,结构化流提供了三种输出模式:

追加模式(Append mode)

当最后一个触发器触发后,只有新行才会追加到结果表继而写入外部存储。这仅适用于结果表中现有行无法更改的查询(例如,输入流上的 map)。

更新方式(Update mode)

顾名思义,自上次触发以来,只会在外部存储中更改结果表中更新的行。此模式适用于可以在适当位置更新的输出接收器,例如 MySQL 表。

全量模式(Complete mode)

整个更新的结果表将被写入外部存储。

 

除非指定全量模式,否则结构化流将不会完全实现结果表。仅维护足够的信息(称为“状态”)以确保可以计算结果表中的更改并可以输出更新。

 

将数据流看成表,不仅可以使概念化数据的逻辑计算变得更加容易,而且还可以使得在代码中表达它们更为容易。由于 Spark 的 DataFrame 是表的编程表示形式,因此你可以使用 DataFrame API 来表示对流数据的计算。你需要做的就是从流数据源定义一个输入 DataFrame(即输入表),然后像在批处理源上定义的 DataFrame 上应用相同的操作。

在下一节中,你将看到使用 DataFrames 编写结构化流查询是多么容易。

 

结构化流查询的基础

在本节中,我们将介绍开发结构化流查询时需要理解的一些高级概念。我们将首先介绍定义和启动流查询的关键步骤,然后讨论如何监控活动的查询并管理其生命周期。

定义流查询的五个步骤

如上一节所述,结构化流使用与批处理查询相同的 DataFrame API 来表示数据处理逻辑。但是,定义结构化流查询需要了解一些关键的区别。在本节中,我们将通过构建一个简单的查询来定义流查询的步骤,该查询通过读取 socket 上的文本数据流并计算字数。

 

步骤 1:定义输入源

与批处理查询一样,第一步是从流数据源定义 DataFrame。但是,在读取批处理数据源时,我们需要使用 spark.read 创建一个 DataFrameReader,而对于流数据源,我们需要通过 spark.readStream 创建一个 DataStreamReader。DataStreamReader 与 DataFrameReader 具有大部分相同的方法,因此你可以用类似的方式使用它。下面是一个通过 socket 连接接收文本数据流创建 DataFrame 的示例:

 In Python

spark = SparkSession...

lines = (spark

  .readStream.format("socket")

  .option("host", "localhost")

  .option("port", 9999)

  .load())

  

// In Scala

val spark = SparkSession...

val lines = spark

  .readStream.format("socket")

  .option("host", "localhost")

  .option("port", 9999)

  .load()

此代码从 localhost:9999 读取的以换行符分隔的文本数据生成 lines DataFrame 无界表。请注意,类似于使用 spark.read 的批处理源,它不会立即开始读取流数据。仅在显式启动流式查询后,它才设置读取数据所需的配置。

除了 socket 之外,Apache Spark 原生还支持从 Apache Kafka 和 DataFrameReader 支持的所有各种基于文件的格式(Parquet,ORC,JSON 等)读取数据流。本章后面介绍这些来源的详细信息及其支持的选项。此外,流查询可以定义多个输入源,包括流输入源和批输入源,可以使用 DataFrame 操作,如联合和连接(本章后面也讨论)。

 

步骤 2:转换数据

现在,我们可以应用一些常规的 DataFrame 操作,例如将行拆分为单个单词,然后对它们进行计数,如以下代码所示:

 In Python

from pyspark.sql.functions import *

words = lines.select(split(col("value"), "\\s").alias("word"))

counts = words.groupBy("word").count()

 

// In Scala

import org.apache.spark.sql.functions._

val words = lines.select(split(col("value"), "\\s").as("word"))

val counts = words.groupBy("word").count()

counts 是一个流式 DataFrame(即,无边界流数据上的一个 DataFrame),它表示一旦流查询开启则会对流输入数据进行连续处理,执行计数操作。

请注意,如果 lines 是批处理 DataFrame,则这些流式 DataFrame 的转换操作将以完全相同的方式工作。通常,大多数可以应用于批处理 DataFrame 的 DataFrame 操作也可以应用于流式 DataFrame。要了解结构化流支持哪些操作,你必须认识以下两类数据转换:

无状态转换

比如 select(),filter(),map()等操作不需要依赖以前的行来处理下一行的任何信息; 每一行都可以独立处理。这些操作缺少先前的“状态”,因此它们是“无状态”的。无状态操作可以同时应用于批处理和流式 DataFrame。

有状态的转换

相反,像 count()这样的聚合操作需要维护状态才能跨多行合并数据。更具体地说,任何涉及分组、连接或聚合的 DataFrame 操作都是有状态转换。虽然结构化流中支持许多这些操作,但不支持它们的一些组合,因为以增量方式计算它们在计算上很困难或不可行。

本章稍后将讨论结构化流支持的有状态操作以及如何管理 runtime 状态。

 

步骤 3:定义输出接收器和输出模式

转换数据后,我们可以通过定义 DataFrame.writeStream 决定(而不是 DataFrame.write,用于批处理数据)写入处理后的输出数据的模式。这会创建一个与 DataFrameWriter 类似的 DataStreamWriter,并具有其他方法来指定以下内容:

输出的详细信息(写入到输出的位置和方式)

处理细节(如何处理数据以及如何从故障中恢复)

让我们从输出的细节开始(我们将在下一步重点关注处理细节)。例如,下面的片段展示了如何将最终计数写入到控制台:

 In Python

writer = counts.writeStream.format("console").outputMode("complete")

 

// In Scala

val writer = counts.writeStream.format("console").outputMode("complete")

在这里,我们指定"console"作为输出流接收器并且指定了"complete"输出模式。流查询的输出模式指定在处理新的输入数据之后要写入更新后输出的哪一部分。在此示例中,随着大量新输入数据的处理和单词字数的更新,我们可以选择将到目前为止看到的所有单词(即 complete 模式)的计数打印到控制台,也可以仅将最后一个输入数据块中更新的内容打印到控制台中。这是由指定的输出模式决定的,可以是以下输出模式之一(正如我们在“结构化流的编程模型”中已经看到的那样:

追加模式(Append mode)

默认是追加模式,在该模式下,仅将自上次触发以来添加到结果表或者 DataFrame(例如,表)中的新行输出到接收器。从语义上讲,此模式可以保证将来查询不会更改或更新输出的任何行。因此,只有那些永远不会修改以前的输出数据的查询(例如,无状态查询)才支持追加模式。相反,我们的计数查询需要更新以前生成的字数;因此,它不支持追加模式。

全量模式(Complete mode)

在这种模式下,结果表或者 DataFrame 的所有行将在每个触发器的末尾输出。在结果表可能比输入数据小得多的查询中可以支持此操作,因此可以将其保留在内存中。例如,我们的计数查询支持全量模式,因为计数数据可能比输入数据小得多。

更新模式(Update mode)

在此模式下,将仅在每个触发器的末尾输出自上一个触发器以来已更新的结果表或者 DataFrame 的行。这与追加模式相反,因为输出行可能会被查询然后修改并在将来再次输出。大多数查询都支持更新模式。

 

在最新的《结构化流编程指南》中可以找到有关不同查询支持的输出模式的完整详细信息。

 

除了将输出写入控制台外,结构化流还原生支持对文件和 Apache Kafka 的流写入。此外,你可以使用 foreachBatch()和 foreach()API 方法写入任意位置。实际上,你可以使用 foreachBatch()方法基于现有的批处理数据源来写入流输出(但是无法保证精确一次性)。这些接收器及其支持的选项的详细信息将在本章后面讨论。

 

步骤 4:指定处理详细信息

开始查询之前的最后一步是指定如何处理数据的详细信息。继续我们的计数示例,我们将指定处理细节,如下所示:

In Python

checkpointDir = "..."

writer2 = (writer

  .trigger(processingTime="1 second")

  .option("checkpointLocation", checkpointDir))

  

// In Scala

import org.apache.spark.sql.streaming._

val checkpointDir = "..."

val writer2 = writer

  .trigger(Trigger.ProcessingTime("1 second"))

  .option("checkpointLocation", checkpointDir)

在这里,我们使用 DataFrame.writeStream 创建 DataStreamWriter 指定了两种类型的详细信息:

触发细节

这指示何时触发发现和处理新的可用的流数据。有四个选项:

默认

如果未显式指定触发器,则默认情况下,流查询将以微批处理执行数据计算,一旦前一个微批处理完成,就会触发下一个微批处理。

带触发间隔的处理时间

你可以使用 ProcessingTime 触发器指定时间间隔,查询将以该固定间隔触发微批。

只触发一次

在这种模式下,流查询将仅执行一个微批处理——它会在单个批处理中处理所有可用的新数据,然后自行停止。当你想要通过外部调度程序来控制触发和处理时,该调度程序将使用任何自定义调度来重新启动查询(例如,通过每天仅执行一次查询来控制成本),该功能非常有用。

连续触发

这是一种实验性模式(从 Spark 3.0 开始),在这种模式下,流查询将连续处理数据而不是微批处理。尽管只有一小部分的 DataFrame 操作允许使用此模式,但它可以提供比微批触发模式低得多的延迟(低至毫秒)。有关最新信息,请参阅最新的《结构化流编程指南》。

检查点位置触发

这是任何与 HDFS 兼容的文件系统中的目录,流式查询可以保存其进度信息,即已成功处理了哪些数据。失败时,此元数据将用于从失败查询的结束位置重新启动失败的查询。因此,设置此选项对于具有精确一次保证的故障恢复是必要的。

 

步骤 5:开始查询

指定所有内容后,最后一步是启动查询,你可以执行以下操作:

In Python

streamingQuery = writer2.start()

 

// In Scala

val streamingQuery = writer2.start()

返回 streamingQuery 类型的对象表示活跃的(active)查询,可用于管理查询,我们将在本章后面介绍。

请注意,start()是一种非阻塞方法,因此它将在后台启动查询后立即返回。如果你希望主线程在流查询终止之前一直阻塞,则可以使用 streamingQuery.awaitTermination()。如果查询在后台失败并显示错误,则 awaitTermination()也会因相同的异常而失败。

你可以使用 awaitTermination(timeoutMillis)来设置等待超时时间,并且可以使用 streamingQuery.stop()方法来停止查询。

 

将以上步骤组合起来使用

总结一下,这是完整的代码,用于通过 socket 读取文本数据流,对单词进行计数并将计数输出到控制台:

In Python

from pyspark.sql.functions import *

spark = SparkSession...

lines = (spark

  .readStream.format("socket")

  .option("host", "localhost")

  .option("port", 9999)

  .load())

 

words = lines.select(split(col("value"), "\\s").alias("word"))

counts = words.groupBy("word").count()

checkpointDir = "..."

streamingQuery = (counts

  .writeStream

  .format("console")

  .outputMode("complete")

  .trigger(processingTime="1 second")

  .option("checkpointLocation", checkpointDir)

  .start())

streamingQuery.awaitTermination()

 

// In Scala

import org.apache.spark.sql.functions._

import org.apache.spark.sql.streaming._

val spark = SparkSession...

val lines = spark

  .readStream.format("socket")

  .option("host", "localhost")

  .option("port", 9999)

  .load()

 

val words = lines.select(split(col("value"), "\\s").as("word"))

val counts = words.groupBy("word").count()

 

val checkpointDir = "..."

val streamingQuery = counts.writeStream

  .format("console")

  .outputMode("complete")

  .trigger(Trigger.ProcessingTime("1 second"))

  .option("checkpointLocation", checkpointDir)

  .start()

streamingQuery.awaitTermination()

查询开始后,后台线程不断从流数据源中读取新数据,对其进行处理,然后将其写入流接收器。接下来,让我们快速了解一下如何执行此操作。

活跃(Active)流查询的底层原理

查询开始后,引擎中将发生以下步骤序列,如图 8-5 所示。DataFrame 操作将转换为逻辑计划,这是 Spark SQL 用于查询计划计算的抽象表示:

1. Spark SQL 分析并优化此逻辑计划,以确保可以在流数据上增量高效地执行该逻辑计划。

2. Spark SQL 启动一个后台线程,该线程连续执行以下循环操作:

a. 基于配置的触发间隔,线程检查流数据源是否有新数据。

b. 如果可用,则通过运行微批处理来执行新数据。根据优化的逻辑计划,生成优化的 Spark 执行计划,该计划从源读取新数据,增量计算更新的结果,然后根据配置的输出模式将输出写入接收器。

c. 对于每个微批处理,已处理数据的精确范围(例如,文件集或 Apache Kafka 偏移量)和任何关联状态都保存在已配置的检查点位置中,以便查询可以在需要的时候确定地进行重放,重新处理所需的确切范围内的数据。

3. 该循环一直持续到查询终止为止,另一方面,以下几种原因也可以造成该查询终止:

a. 查询中发生了故障(处理错误或集群中的故障)。

b. 使用 streamingQuery.stop()显式停止查询。

c. 如果将触发器设置为 Once,则在执行包含所有可用数据的单个微批处理后,查询将自行停止。

关于结构化流,你应该记住的一个关键点是,在它的下面是使用 Spark SQL 来执行数据。因此,利用 SparkSQL 的超优化执行引擎来最大限度地提高流处理吞吐量,提供了关键的性能优势。

接下来,我们将讨论终止后如何重新启动流查询以及流查询的生命周期。

 

精确一次性从故障中恢复

要以全新的过程重新启动终止的查询,你必须创建一个新的 SparkSession,重新定义所有 DataFrame,然后使用与第一次启动查询时所使用的检查点相同的检查点位置对最终结果启动流式查询。对于我们的字数统计示例,你可以简单地重新执行前面显示的整个代码段,从 spark 第一行中的定义到最后一行中的 start()。

重新启动之间的检查点位置必须相同,因为此目录包含流查询的唯一标识并确定查询的生命周期。如果删除了检查点目录,或者使用不同的检查点目录启动了相同的查询,则就像从头开始新的查询一样。具体来说,检查点具有记录级别的信息(例如 Apache Kafka 偏移量)以跟踪上次未完成的微批处理所处理的数据范围。重新启动的查询将使用此信息来精确地在最后一次成功完成微批处理之后开始处理记录。如果先前的查询计划在完成微批处理之前终止,则重新启动的查询将在处理新数据之前重新处理相同范围的数据。结合 Spark 的确定性任务执行,重新生成的输出将与重新启动前的预期输出相同。

当满足以下条件时,结构化流可以确保端到端的精确一次保证(即,输出就像每个输入记录被精确地处理了一次一样):

可重放的流数据源

最后一个不完整的微批处理的数据范围可以从源代码中重读取。

确定性计算

给定相同的输入数据时,所有的数据转换都会确定性地产生相同的结果。

幂等性流接收器

接收器可以识别重新执行的微批处理,并忽略可能由重启引起的重复写入。

请注意,由于 socket 源不可重播且控制台接收器不是幂等的,因此我们的字数统计示例不提供一次准确的保证。

作为关于重新启动查询的最后注意事项,可以在重新启动之间对查询进行较小的修改。你可以通过以下几种方式修改查询:

DataFrame 转换

你可以对重新启动之间的转换进行较小的修改。例如,在我们的流字数示例中,如果你要忽略字节序列损坏的行,这些行可能使查询崩溃,则可以在转换中添加过滤器:

In Python

 isCorruptedUdf = udf to detect corruption in string

 

filteredLines = lines.filter("isCorruptedUdf(value) = false")

words = filteredLines.select(split(col("value"), "\\s").alias("word"))

 

// In Scala

// val isCorruptedUdf = udf to detect corruption in string

 

val filteredLines = lines.filter("isCorruptedUdf(value) = false")

val words = filteredLines.select(split(col("value"), "\\s").as("word"))

使用此修改后的 words DataFrame 重新启动后,重新启动的查询将对自重新启动以来处理的所有数据(包括最后一个不完整的微批处理)应用过滤器,以防止查询再次失败。

源和接收器选项

是否可以在重新启动之间更改 readStream 或 writeStreamoption,取决于特定源或接收器的语义。例如,如果要将数据发送到该主机和端口,则不应更改 socket 源的 host 和 port 选项。但是你可以在控制台接收器中添加一个选项,以便在每个触发器触发之后打印多达 100 个更改的计数:

writeStream.format("console").option("numRows", "100")...

处理细节

如前所述,在重新启动之间不得更改检查点位置。但是,可以在不破坏容错保证的情况下更改触发间隔等其他详细信息。

有关重启之间允许微调的更多信息,请参见最新的《结构化流编程指南》。

 

监控活跃查询

在生产中运行流传输管道的重要部分是跟踪其运行状况。结构化流提供了几种跟踪活动查询的状态和处理指标的方式。

使用 StreamingQuery 查询当前状态

你可以使用 StreamingQuery 实例来查询活动查询的当前运行状况。有以下两种方法:

 

使用 StreamingQuery 获取当前指标

当查询微批处理中的某些处理数据时,我们认为它已经取得了一些进展。lastProgress()方法返回有关最后完成的微批处理的信息。例如,打印返回的对象(在 Scala / Java 中或在 Python 中 StreamingQueryProgress 的字典中)将产生以下内容:

// In Scala/Python

{

 "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",

 "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",

 "name" : "MyQuery",

 "timestamp" : "2016-12-14T18:45:24.873Z",

 "numInputRows" : 10,

 "inputRowsPerSecond" : 120.0,

 "processedRowsPerSecond" : 200.0,

 "durationMs" : {

 "triggerExecution" : 3,

 "getOffset" : 2

 },

 "stateOperators" : [ ],

 "sources" : [ {

 "description" : "KafkaSource[Subscribe[topic-0]]",

 "startOffset" : {

 "topic-0" : {

 "2" : 0,

 "1" : 1,

 "0" : 1

 }

 },

 "endOffset" : {

 "topic-0" : {

 "2" : 0,

 "1" : 134,

 "0" : 534

 }

 },

 "numInputRows" : 10,

 "inputRowsPerSecond" : 120.0,

 "processedRowsPerSecond" : 200.0

 } ],

 "sink" : {

 "description" : "MemorySink"

 }

}

一些值得注意的列是:

id

绑定到检查点位置的唯一标识符。在查询的整个生命周期内(即在重新启动之间),保持不变。

runId

当前(重新)启动的查询实例的唯一标识符。每次重新启动都会改变。

numInputRows

在上一个微批处理中处理的输入行数。

inputRowsPerSecond

在源端生成输入行的当前速率(最后一个微批持续时间的平均值)。

processedRowsPerSecond

接收器正在处理和写入行的当前速率(最后一个微批处理持续时间的平均值)。如果此速率始终低于输入速率,则查询将无法像源生成数据一样快地处理数据。这是查询运行状况的关键指标。

sources 和 sink

提供上一批中处理的数据的特定于源和接收器的详细信息。

使用 StreamingQuery.status() 获取当前状态

这提供了有关后台查询线程当前正在执行的操作的信息。例如,打印返回的对象将产生如下内容:

// In Scala/Python

{

 "message" : "Waiting for data to arrive",

 "isDataAvailable" : false,

 "isTriggerActive" : false

}

使用 Dropwizard 度量发布指标

Spark 通过一个名为 Dropwizard Metrics 的通用库支持报告指标。该库允许将度量标准发布到许多流行的监控框架(Ganglia,Graphite 等)。默认情况下,结构化流查询不启用这些指标,因为报告的数据量很大。要启用它们,除了为 Spark 配置 Dropwizard Metrics 外,你必须在开始查询之前显式设置 SparkSession 的配置 spark.sql.streaming.metricsEnabled 为 true。

请注意,只有通过 StreamingQuery.lastProgress() 可获得 Dropwizard Metrics 发布的信息的一部分。如果要连续将更多进度信息发布到任意位置,则必须编写自定义监听器,如下所述。

使用自定义 StreamingQueryListeners 发布指标

StreamingQueryListener 是事件监听器接口,你可以使用它注入任意逻辑以连续发布指标。该 API 只支持开发人员在 Scala 或 Java 中使用。使用自定义监听器有两个步骤:

1.定义你的自定义监听器。该 StreamingQueryListener 接口提供了三种可以实现的方法,以获取与流查询相关的 3 种类型的事件:开始、进度(即,执行了触发器)和终止。这是一个例子:

// In Scala

import org.apache.spark.sql.streaming._

val myListener = new StreamingQueryListener() {

  override def onQueryStarted(event: QueryStartedEvent): Unit = {

    println("Query started: " + event.id)

  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {

    println("Query terminated: " + event.id)

  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {

    println("Query made progress: " + event.progress)

  }

}

2.在开始查询之前,将你的监听器添加到 SparkSession 中:

// In Scala

spark.streams.addListener(myListener)

添加监听器后,在 SparkSession 上运行的流查询的所有事件将开始调用监听器的方法。

 

我们在下一篇文章中会讲解流数据源和接收器、数据转换、有状态的流聚合等知识点。

发布于: 15 小时前阅读数: 17
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
结构化流-Structured Streaming(八-上)