写点什么

结构化流 -Structured Streaming(八 - 下)

发布于: 40 分钟前
结构化流-Structured Streaming(八-下)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


流关联

结构化流支持将流数据集与另一个静态或流数据集关联在一起。在本节中,我们将探讨支持哪些类型的连接(内部,外部等),以及如何使用水位线限制有状态连接的存储状态。我们将从连接数据流和静态数据集的简单案例开始。

流–静态数据集连接

许多用例需要将数据流与静态数据集结合在一起。例如,让我们考虑一下广告获利的情况。假设你是一家在网站上展示广告的广告公司,并且当用户点击广告时你就赚钱了。假设你有一个要显示的所有广告的静态数据集(称为展示次数),以及用户每次点击所显示的广告时的另一个事件流。要计算点击收入,你必须将事件流中的每次点击与表中相应的广告展示次数进行匹配。首先,将数据表示为两个 DataFrame,一个为静态,一个为流式,如下所示:

  In Python

  Static DataFrame [adId: String, impressionTime: Timestamp, ...]

  reading from your static data source

impressionsStatic = spark.read. ...

 

  Streaming DataFrame [adId: String, clickTime: Timestamp, ...]

  reading from your streaming source

clicksStream = spark.readStream. ...

 

// In Scala

// Static DataFrame [adId: String, impressionTime: Timestamp, ...]

// reading from your static data source

val impressionsStatic = spark.read. ...

 

// Streaming DataFrame [adId: String, clickTime: Timestamp, ...]

// reading from your streaming source

val clicksStream = spark.readStream. ...

要将点击次数与展示次数进行匹配,你可以使用公共的 adId 列在它们之间简单地应用内部等值连接:

In Python

matched = clicksStream.join(impressionsStatic, "adId")

 

// In Scala

val matched = clicksStream.join(impressionsStatic, "adId")

如果广告和点击都是静态 DataFrame,则此代码与你编写的代码相同-唯一的区别是你使用 spark.read()用于批处理,对流使用 spark.readStream()。执行此代码时,每个微批点击次数都将与静态广告表进行内部连接,以生成匹配事件的输出流。

除了内部连接之外,结构化流还支持两种类型的流–静态外部连接:

 当左侧是流式 DataFrame 时,左外部连接

 当右侧是流式 DataFrame 时,进行右外部连接

不支持其他类型的外部连接(例如,全外连接和右侧是流式 DataFrame 的左外连接),因为它们不容易以增量方式运行。在两种受支持的情况下,代码都与两个静态 DataFrame 之间的左或右外部连接的代码完全相同:

In Python

matched = clicksStream.join(impressionsStatic, "adId", "leftOuter")

 

// In Scala

val matched = clicksStream.join(impressionsStatic, Seq("adId"), "leftOuter")

关于流静态连接,有几点要注意:

流静态连接是无状态操作,因此不需要任何类型的水位线。

在与每个微批处理的流数据结合在一起时,将重复读取静态 DataFrame,因此你可以缓存静态 DataFrame 以加快读取速度。

如果在其上定义了静态 DataFrame 的数据源中的底层数据发生了更改,那么流查询是否查看这些变化取决于数据源的特定行为。例如,如果静态 DataFrame 是在文件上定义的,则在重新启动流查询之前,不会选择对这些文件的更改(例如,追加)。

在此流静态示例中,我们做了一个重要假设:展示表是静态表。实际上,随着新广告的曝光,将会产生新的展示流。虽然流静态连接非常适合通过追加的静态(或缓慢变化)信息来丰富一个流中的数据,但是当两种数据源都在快速变化时,这种方法是不够的。为此,你需要流-流连接,我们将在下面讨论。


流–流连接

在两个数据流之间生成连接的挑战在于,在任何时间点,任一数据集的视图都不完整,这使得在输入之间查找匹配项变得更加困难。来自两个流的匹配事件可以按任何顺序到达并且可以被任意延迟。例如,在我们的广告用例中,广告事件及其相应的点击事件可能会无序到达,它们之间会有任意延迟。结构化流处理通过将来自两侧的输入数据作为流处理状态进行缓冲,并在接收到新数据时连续检查是否匹配,从而解决了此类延迟问题。概念如图 8-11 所示。

让我们更详细地考虑这一点,首先是内部连接,然后是外部连接。

内连接+可选的水位线

假设我们已经将 impressions DataFrame 重新定义为流式 DataFrame。为了获得匹配的展示及其相应点击的流,我们可以使用之前用于静态连接和流静态连接的相同代码:

In Python

  Streaming DataFrame [adId: String, impressionTime: Timestamp, ...]

impressions = spark.readStream. ...

 

  Streaming DataFrame[adId: String, clickTime: Timestamp, ...]

clicks = spark.readStream. ...

matched = impressions.join(clicks, "adId")

 

// In Scala

// Streaming DataFrame [adId: String, impressionTime: Timestamp, ...]

val impressions = spark.readStream. ...

 

// Streaming DataFrame[adId: String, clickTime: Timestamp, ...]

val clicks = spark.readStream. ...

val matched = impressions.join(clicks, "adId")

即使代码相同,执行方式也完全不同。当执行此查询时,处理引擎将识别出它是流-流连接,而不是流-静态数据集连接。引擎会将所有点击和展示作为状态进行缓冲,并在收到的点击与缓冲的展示匹配后立即生成匹配的展示和点击(反之亦然,取决于首先收到的是哪个)。让我们使用图 8-12 中的示例事件时间线来直观地了解此内部连接的工作方式。

在图 8-12 中,蓝点表示在不同微批之间接收到的展示和点击事件的事件时间(用虚线虚线分隔)。为了便于说明,假设每个事件实际上都是在与事件时间相同的时间接收到的。注意连接相关事件的不同情况。带有 adId=⧮标记的两个事件都在同一微批中接收,因此它们的合并输出是由该微批生成的。但是,对于 adId= ⧉的事件是在 12:04 收到的,比其在 12:13 相应的点击事件要早得多。结构化流式传输将首先在 12:04 接收数据并将其缓冲在状态中。对于每个收到的点击事件,引擎将尝试将其与所有缓冲的展示事件合并(反之亦然)。最终,在随后 12:13 左右运行的微批处理中,引擎收到 adId = ⧉的点击并生成合并的输出。

但是,在此查询中,我们没有提供任何指示引擎应将事件缓冲多长时间来找到匹配项。因此,引擎可以一直缓冲事件,并累积无限制的流状态。为了限制由流-流连接维护的流状态,你需要了解有关用例的以下信息:

在两个事件在其各自来源处生成之间的最大时间范围是多少?在我们的用例上下文中,我们假设在相应的展示之后,你可以在零秒到一小时内发生点击。

 事件可以在源和处理引擎之间传递的最大持续时间是多少?例如,来自浏览器的广告点击可能会由于间歇性连接而延迟,并且到达的时间比预期的要晚得多,而且顺序混乱。假设展示次数和点击次数最多可以分别延迟两个小时和三个小时。

可以使用水位线和时间范围条件在 DataFrame 操作中对这些延迟限制和事件时间约束进行编码。换句话说,你必须在连接中执行以下的附加步骤以确保状态清理:

1. 在两个输入上定义水位线延迟,以便引擎知道输入的延迟程度(类似于流聚合)。

2. 定义两个输入之间的事件时间约束,以便引擎可以确定何时不需要与另一个输入的旧行匹配(即,将不满足时间约束)。可以通过以下方式之一定义此约束:

a. 时间范围加入条件(例如加入条件="leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR" )

b. 在事件时间窗口上加入(例如,加入条件= "leftTimeWindow = rightTimeWindow")

 

在我们的广告用例中,我们的内连接代码将变得更加复杂:

In Python

Define watermarks

impressionsWithWatermark = (impressions

  .selectExpr("adId AS impressionAdId", "impressionTime")

  .withWatermark("impressionTime", "2 hours"))

 

clicksWithWatermark = (clicks

  .selectExpr("adId AS clickAdId", "clickTime")

  .withWatermark("clickTime", "3 hours"))

Inner join with time range conditions

(impressionsWithWatermark.join(clicksWithWatermark,

  expr("""

    clickAdId = impressionAdId AND

    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour""")))

    

// In Scala

// Define watermarks

val impressionsWithWatermark = impressions

  .selectExpr("adId AS impressionAdId", "impressionTime")

  .withWatermark("impressionTime", "2 hours ")

 

val clicksWithWatermark = clicks

  .selectExpr("adId AS clickAdId", "clickTime")

  .withWatermark("clickTime", "3 hours")

 

// Inner join with time range conditions

impressionsWithWatermark.join(clicksWithWatermark,

  expr("""

    clickAdId = impressionAdId AND

    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""))

在每个事件具有这些时间限制的情况下,处理引擎可以自动计算需要缓冲事件多长时间才能生成正确的结果,以及何时可以从状态中删除事件。例如,它将评估以下内容(如图 8-13 所示):

展示次数最多需要缓冲四个小时(以事件时间为准),因为三小时后的点击可能会与四个小时前的展示次数相匹配(即,迟展示次数和点击之间迟到 3 小时至 4 小时)。

相反,点击需要最多缓冲两个小时(以事件时间为准),因为两小时后的展示可能与两个小时前收到的点击相匹配。

关于内部连接,要记住一些关键点:

对于内部连接,指定水位线和事件时间约束都是可选的。换句话说,如果有可能出现无限状态的风险,你可以选择不指定它们。只有同时指定了两者,你才能进行状态清理。

与加上水位线的聚合上所提供的保证类似,2 小时的水位线延迟可确保引擎永远不会丢弃或不匹配延迟少于 2 小时的任何数据,但是延迟超过 2 小时的数据可能会或可能不会得到处理。

外连接+水位线

先前的内部连接将仅输出已接收到两个事件的那些广告。换句话说,没有点击的广告将根本不会被报告。取而代之的是,你可能希望报告所有广告展示(带有或不带有关联的点击数据),以便以后进行其他分析(例如,点击率)。这时我们可以进行流式之间的外连接。要实现此目的,你所需要做的就是指定外部连接类型:

In Python

Left outer join with time range conditions

(impressionsWithWatermark.join(clicksWithWatermark,

  expr("""

    clickAdId = impressionAdId AND

    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""),

  "leftOuter"))  # only change: set the outer join type

  

// In Scala

// Left outer join with time range conditions

impressionsWithWatermark.join(clicksWithWatermark,

  expr("""

    clickAdId = impressionAdId AND

    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""),

  "leftOuter")  // Only change: set the outer join type

正如预期的外部连接一样,此查询将开始为每次展示生成输出,无论是否包含点击数据(即使用 NULL)。但是,还有一些关于外部连接的注意事项:

与内部连接不同,水位线延迟和事件时间约束对于外部连接不是可选的。这是因为为了生成 NULL 结果,引擎必须知道未来事件何时与其他事件不匹配。要获得正确的外部连接结果和状态清理,必须指定水印和事件时间约束。

 因此,外部 NULL 结果将产生延迟,因为引擎必须等待一段时间才能确保既没有存在,也不会没有任何匹配。如上一节所述,此延迟是引擎为每个事件计算的最大缓冲时间(相对于事件时间)(即,展示数为 4 个小时,点击数为 2 个小时)。

 

任意的有状态计算

与我们到目前为止讨论的 SQL 操作相比,许多用例需要更复杂的逻辑。例如,假设你要通过实时跟踪用户的活动(例如,点击)来跟踪用户的状态(例如,登录,忙碌,空闲)。要构建此流处理管道,你将必须使用任意数据结构将每个用户的活动历史记录跟踪为一种状态,并根据用户的操作对数据结构连续应用任意复杂的更改。mapGroupsWithState()操作及更灵活的 flatMapGroupsWithState()操作是针对此类复杂的分析用例而设计的。

从 Spark 3.0 开始,这两个操作仅在 Scala 和 Java 中可用。

在本节中,我们将以一个简单的 mapGroupsWithState()示例开始,以说明对自定义状态数据建模和对其进行定制化操作的四个关键步骤。然后,我们将讨论超时的概念以及如何使用它们来使一段时间没有更新的状态失效。我们将以 flatMapGroupsWithState()结尾,这为你提供了更大的灵活性。

使用 mapGroupsWithState()对任意有状态操作建模

具有任意数据结构和状态上任意转换的状态被建模为用户定义的函数,该函数将先前版本的状态值和新数据作为输入,并生成更新的状态和计算结果作为输出。在 Scala 语法中,你必须定义具有以下签名的函数(简要解释一下,K,V,S 和 U 是数据类型):

// In Scala

def arbitraryStateUpdateFunction(

    key: K,

    newDataForKey: Iterator[V],

    previousStateForKey: GroupState[S]

): U

使用 groupByKey()和将 mapGroupsWithState()操作提供给流查询此功能,如下所示:

// In Scala

val inputDataset: Dataset[V] =  // input streaming Dataset

 

inputDataset

  .groupByKey(keyFunction)   // keyFunction() generates key from input

  .mapGroupsWithState(arbitraryStateUpdateFunction)

启动此流查询时,在每个微批处理中,Spark 都会为微批处理数据中的每个唯一键调用此方法 arbitraryStateUpdateFunction()。让我们具体了解一下参数是什么,Spark 将使用以下的参数调用该函数:

key: K

K 是在状态和输入中定义的公共键的数据类型。Spark 将为数据中的每个唯一键调用此函数。

newDataForKey: Iterator[V]

V 是输入数据集的数据类型。当 Spark 为某个键调用此函数时,此参数将拥有与该键对应的所有新输入数据。请注意,未定义输入数据对象在迭代器中的顺序。

previousStateForKey: GroupState[S]

S 是要维护的任意状态的数据类型,并且 GroupState[S]是类型化的包装对象,提供了访问和管理状态值的方法。当 Spark 为某个键调用此函数时,此对象将提供上一次 Spark 为该键调用该函数设置的状态值(即,针对先前的一个微批次)。

U

U 是函数输出的数据类型。

你还必须提供几个额外的参数。所有类型(K,V,S,U)必须可由 Spark SQL 的编码器进行编码。因此,在 mapGroupsWithState()中,你必须在 Scala 隐式中或在 Java 中显式提供编码器。请参见第 6 章“数据集编码器”的更多细节。

 

让我们通过一个示例来研究如何以这种格式表示所需的状态更新功能。假设我们要根据用户的行为信息来了解他们的行为。从概念上讲,这很简单:在每个微型批处理中,对于每个活跃用户,我们将使用用户执行的新操作并更新用户的“状态”。从语法上来讲,我们可以通过以下步骤定义状态更新功能:

1. 定义数据类型。我们需要定义 K,V,S 和 U 的确切的类型。在这种情况下,我们将使用以下方法:

a. 输入 data(V)= case class UserAction(userId: String, action: String)

b. Key(K)= String(即 userId)

c. State(S)= case class UserStatus(userId: String, active: Boolean)

d.  Output(U)= UserStatus,因为我们要输出最新的用户状态

请注意,编码器支持所有这些数据类型。


2.定义功能。根据所选的类型,让我们将逻辑概念转换为代码。当使用新的用户操作调用此函数时,我们需要处理两种主要情况:该键(例如,userId)是否存在先前的状态(即,先前的用户状态)。因此,我们将初始化用户的状态,或使用新操作更新现有状态。我们将使用新的运行计数显式更新状态,最后返回已更新的 userId-userStatus 键值对:

// In Scala

import org.apache.spark.sql.streaming._

 

 def updateUserStatus(

    userId: String,

    newActions: Iterator[UserAction],

    state: GroupState[UserStatus]): UserStatus = {

 

  val userStatus = state.getOption.getOrElse {

    new UserStatus(userId, false)

  }

  newActions.foreach { action =>

    userStatus.updateWith(action)

  }

  state.update(userStatus)

  return userStatus

}

在这些操作上应用该函数。我们将使用 groupByKey()来对输入的操作数据集进行分组,然后使用 mapGroupsWithState()来应用 updateUserStatus 函数:

// In Scala

val userActions: Dataset[UserAction] = ...

val latestStatuses = userActions

  .groupByKey(userAction => userAction.userId)

  .mapGroupsWithState(updateUserStatus _)

一旦使用控制台输出启动此流查询,我们将看到更新的用户状态。

在我们转入更高级的主题之前,有几点需要记住:

调用函数时,新数据迭代器(例如 newActions)中的输入记录没有明确定义好顺序。如果你需要以特定顺序(例如,按照执行操作的顺序)使用输入记录更新状态,则必须显式地对它们进行重新排序(例如,基于事件时间戳或某些其他排序 ID)。实际上,如果从数据源读取时存在无序性,则你必须考虑未来的微批处理可能接收到当前批处理数据之前应该处理的数据。在这种情况下,你必须将记录作为状态的一部分进行缓冲。

在微批处理中,仅当微批处理具有该键的数据时,才对键调用一次该函数。例如,如果用户长时间不活动并且不提供任何新操作,则默认情况下,长时间不调用该函数。如果要根据用户长时间不活跃来更新或删除状态,则必须使用超时机制,我们将在下一节中讨论超时。

增量处理引擎的 mapGroupsWithState()输出被假定为不断更新的键/值记录,类似于聚合的输出。这就限制 mapGroupsWithState()之后查询中支持的操作和支持的接收器。例如,不支持将输出追加到文件中。如果要以更大的灵活性应用任意有状态操作,则必须使用 flatMapGroupsWithState()。我们将在“超时”后进行讨论。

使用超时机制来管理非活跃组

在前面的跟踪活跃用户会话的示例中,随着更多用户变为活跃状态,该状态中的键数量将不断增加,该状态所使用的内存也将不断增加。现在,在实际情况下,用户可能不会一直保持活跃状态。将不活跃用户的状态保持在该状态可能不是很有用,因为在这些用户再次变为活跃状态之前,它不会再次更改。因此,我们可能希望显式删除不活跃用户的所有信息。但是,用户可能未显示地采取任何行动以使其变为非活跃状态(例如,显示注销),并且我们可能必须将非活跃定义为在阈值区间内缺少任何操作。这使得在函数中编码变得很棘手,因为直到有来自该用户的新操作,该函数才被调用。

要对基于时间的不活动进行编码,mapGroupsWithState()支持以下定义的超时:

每次在按键上调用该功能时,都可以基于持续时间或阈值时间戳对键设置超时。

如果该键未接收到任何数据,从而满足了超时条件,则该键将标记为“超时”。下一个微批处理将在此超时键上调用该函数,即使该微批处理中没有该键的数据也是如此。在此特殊函数调用中,新的输入数据迭代器将为空(因为没有新数据),并且 GroupState.hasTimedOut()将返回 true。这是在函数内部识别是由于新数据还是超时引起函数调用的最佳方法。

基于我们的两种时间概念,有两种类型的超时时间:处理时间和事件时间。处理时间超时是两者中使用起来比较简单的时间,因此我们将从它开始。

处理时间超时

处理时间超时基于运行流查询的计算机的系统时间(也称为挂钟时间),其定义如下:如果 key 最后在系统时间戳接收数据 T,并且当前时间戳大于(T + <timeout duration>),那么将使用新的空数据迭代器再次调用该函数。

让我们研究如何通过更新我们的用户示例来删除用户的一小时不活跃状态来使用超时。我们将做出三处更改::

在 mapGroupsWithState()中,我们将超时指定为 GroupStateTimeout.ProcessingTimeTimeout。

在状态更新函数中,在使用新数据更新状态之前,我们必须检查状态是否超时。因此,我们将更新或删除状态。

此外,每次我们使用新数据更新状态时,我们都会设置超时时间。

这是更新的代码:

// In Scala

def updateUserStatus(

    userId: String,

    newActions: Iterator[UserAction],

    state: GroupState[UserStatus]): UserStatus = {

 

  if (!state.hasTimedOut) {       // Was not called due to timeout

    val userStatus = state.getOption.getOrElse {

      new UserStatus(userId, false)

    }

    newActions.foreach { action => userStatus.updateWith(action) }

    state.update(userStatus)

    state.setTimeoutDuration("1 hour") // Set timeout duration

    return userStatus

    

  } else {

    val userStatus = state.get()

    state.remove()                  // Remove state when timed out

    return userStatus.asInactive()  // Return inactive user's status

  }

}

 

val latestStatuses = userActions

  .groupByKey(userAction => userAction.userId)

  .mapGroupsWithState(

    GroupStateTimeout.ProcessingTimeTimeout)(

    updateUserStatus _)

此查询将自动清理已超过一个小时未处理任何相关数据的用户的状态。但是,关于超时,有几点要注意:

对于新接收的数据或超时,当再次调用该函数时,由上一次对该函数的调用所设置的超时将自动取消。因此,无论何时调用该函数,都需要显式设置超时时间或时间戳以启用超时。

由于超时是在微批处理期间处理的,因此其执行时间不精确,并且在很大程度上取决于触发间隔和微批处理时间。因此,建议不要使用超时来进行精确的时序控制。

尽管处理时间超时的原因很简单,但它们对降低速率和停机时间的作用不强。如果流查询的停机时间超过一小时,那么在重新启动后,该状态下的所有键都将超时,因为自从每个键接收数据以来已经过去了一个多小时。如果查询处理数据的速度比到达数据源的速度慢(例如,如果数据到达并在 Kafka 中进行缓冲),则可能会发生类似的大规模超时。例如,如果超时为五分钟,则导致五分钟的延迟的处理速率的突然下降(或数据到达速率的峰值)可能会导致假超时。为避免此类问题,我们可以使用事件超时时间,我们将在下面讨论。

事件时间超时

事件时间超时不是基于系统时钟时间,而是基于数据中的事件时间(类似于基于时间的聚合)和在事件时间上定义的水位线。如果某个键配置了特定的超时时间戳 T(即不是持续时间),则如果自上次调用该函数以来未收到该键的新数据,则当水位线超过 T 时该键将超时。回想一下,水位线是一个移动阈值,它落后于处理数据时看到的最大事件时间。因此,与系统时间不同,水位线以与处理数据相同的速率在时间上向前移动。这意味着(与处理时间超时不同)查询处理中的任何减慢或停机时间都不会导致假超时。

让我们修改示例以使用事件时间超时。除了我们已经为使用处理时间超时所做的更改之外,我们还将进行以下更改:

在输入数据集上定义水位线(假设该类 UserAction 具有一个 eventTimestamp 字段)。回想一下,水位线阈值代表数据可能延迟或无序到达的可接受的时间量。

更新 mapGroupsWithState()以使用 EventTimeTimeout。

 更新函数以设置将发生超时的阈值时间戳。请注意,事件时间超时不允许设置超时时间,例如处理时间超时。我们稍后将讨论其原因。在此示例中,我们将计算此超时作为当前水位线加上一小时。

这是更新的示例:

// In Scala

def updateUserStatus(

    userId: String,

    newActions: Iterator[UserAction],

state: GroupState[UserStatus]):UserStatus = {


if (!state.hasTimedOut) {  // Was not called due to timeout

    val userStatus = if (state.getOption.getOrElse {

      new UserStatus()

    }

    newActions.foreach { action => userStatus.updateWith(action) }

    state.update(userStatus)


 // Set the timeout timestamp to the current watermark + 1 hour

    state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")

    return userStatus

 } else {

    val userStatus = state.get()

    state.remove()

    return userStatus.asInactive() }

}

val latestStatuses = userActions

  .withWatermark("eventTimestamp", "10 minutes")

  .groupByKey(userAction => userAction.userId)

  .mapGroupsWithState(

GroupStateTimeout.EventTimeTimeout)(

    updateUserStatus _)


该查询对于因重新启动和处理延迟而导致的虚假超时将更加健壮。

以下是有关事件时间超时的几点注意事项:

与上一个处理时间超时的示例不同,我们使用 GroupState.setTimeoutTimestamp()代替 GroupState.setTimeoutDuration()。这是因为对于处理超时,持续时间足以计算发生超时的未来时间戳(即,当前系统时间 + 指定的持续时间),但是对于事件时间超时而言,情况并非如此。不同的应用程序可能希望使用不同的策略来计算阈值时间戳。在此示例中,我们仅基于当前水位线对其进行计算,但是另一个应用程序可以选择基于该键看到的最大事件时间时间戳(作为状态的一部分进行跟踪和保存)来计算键的超时时间戳。

超时时间戳必须设置为大于当前水位线的值。这是因为预计时间戳会在时间戳超过水位线时发生超时,因此将时间戳设置为已经大于当前水位线的值是不合逻辑的。

在我们讲完超时之前,要记住的最后一件事是,与固定持续时间的超时相比,你可以使用这些超时机制进行更具创意的处理。例如,你可以通过在状态中保存最后一个任务执行时间戳,然后使用该时间戳来设置处理时间的超时持续时间,从而在该状态上实施大致周期性的任务(例如每小时),如该代码段所示。:

// In Scala

timeoutDurationMs = lastTaskTimstampMs + periodIntervalMs -

groupState.getCurrentProcessingTimeMs()

使用 flatMapGroupsWithState()进行归纳

mapGroupsWithState()存在两个关键限制,这可能会限制我们想要实现更复杂的用例(例如,链式会话)的灵活性:

每次调用 mapGroupsWithState()时,你只需要返回一个记录。对于某些应用程序,在某些触发器中,你可能根本不想返回任何东西。

对于 mapGroupsWithState(),由于缺少有关不透明状态更新功能的更多信息,引擎假定生成的记录是更新的键/值数据对。因此,这也解释了下游的操作,并允许或不允许其中一些操作。例如,使用 mapGroupsWithState()生成的 DataFrame 不能以追加模式写到文件中。但是,某些应用程序可能希望生成可以看作追加的记录。

flatMapGroupsWithState()克服了这些限制,但以稍微复杂一些的语法为代价。它与 mapGroupsWithState()有两个区别:

返回类型是迭代器,而不是单个对象。这使函数可以返回任意数量的记录,或者,如果需要,可以不返回任何记录。

它需要另一个参数,称为运算符输出模式(不要与本章前面讨论的查询输出模式混淆),该参数定义输出记录是可以追加(OutputMode.Append)还是更新的键/值记录(OutputMode.Update)。

为了说明此函数的用法,让我们扩展我们的用户跟踪示例(我们删除了超时部分以使代码保持简洁)。例如,如果我们只想针对某些用户更改生成警报,并且想将输出警报写入文件,则可以执行以下操作:

// In Scala

def getUserAlerts(

    userId: String,

    newActions: Iterator[UserAction],

    state: GroupState[UserStatus]): Iterator[UserAlert] = {

 

  val userStatus = state.getOption.getOrElse {

    new UserStatus(userId, false)

  }

  newActions.foreach { action =>

    userStatus.updateWith(action)

  }

  state.update(userStatus)

 

  // Generate any number of alerts

  return userStatus.generateAlerts().toIterator  

}

 

val userAlerts = userActions

  .groupByKey(userAction => userAction.userId)

  .flatMapGroupsWithState(

    OutputMode.Append,

    GroupStateTimeout.NoTimeout)(

    getUserAlerts)

性能调优

结构化数据流使用 Spark SQL 引擎,因此可以使用与第 5 章和第 7 章中讨论的 Spark SQL 相同的参数进行调整。但是,与可能处理 GB 到 TB 级数据的批处理作业不同,微批处理作业通常处理的数据量要小得多。因此,运行流查询的 Spark 集群通常需要进行一些微调。请记住以下几点注意事项:

集群资源配置

由于运行流查询的 Spark 集群会 7*24 小时运行,因此适当地配置资源非常重要。资源配置不足会导致流查询落后(微批处理越来越耗时),而资源配置过多(例如已分配但未使用的内核)则会造成资源浪费,导致不必要的成本。此外,应根据流查询的性质进行分配:无状态查询通常需要更多的内核,而有状态查询通常需要更多的内存。

shuffle 的分区数

对于结构化流查询,通常需要将 shuffle 分区的数量设置得比大多数批处理查询要少得多——计算过多划分会增加开销并降低吞吐量。此外,由于有状态操作而导致的洗牌由于检查点而具有显著更高的任务开销。因此,对于具有状态操作且触发间隔为几秒到几分钟的流式查询,建议将 shuffle 分区的数量从默认值 200 调整为最多分配的内核数的两到三倍。

设置源速率限制以保持稳定性

在为查询的预期输入数据速率优化了分配的资源和配置之后,数据速率的突然激增可能会产生意外的大作业以及不稳定性。除了昂贵的超额配置方法外,你还可以使用源速率限制来防止不稳定。在受支持的源(例如,Kafka 和文件)中设置限制可防止在单个微批处理查询中消费过多的数据。大量的数据将保留在源中的缓冲区中,并且查询最终将赶上来。但是,请注意以下几点:

将限制设置得太低会导致查询未充分利用分配的资源并落后于输入速率。

限制不能有效地防止输入速率的持续增加。在保持稳定性的同时,缓冲的以及未处理的数据量将在源头无限增长,端到端延迟也将无限增长。

同一 Spark 应用程序中的多个流查询

在相同 SparkContext 或 SparkSession 的条件下运行多个流查询可以导致细粒度的资源共享。然而:

连续执行每个查询会使用 Spark 驱动程序(即运行它的 JVM)中的资源。这限制了驱动程序可以同时执行的查询数量。达到这些限制可能会成为任务调度的瓶颈(即,未充分利用 Executor)或超过内存限制。

通过将查询设置为在单独的调度程序池中运行,可以确保在相同上下文中的查询之间分配更公平的资源。将每个流的 SparkContext thread-local 属性 spark.scheduler.pool 设置为不同的字符串值:

// In Scala

// Run streaming query1 in scheduler pool1

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

df.writeStream.queryName("query1").format("parquet").start(path1)

 

// Run streaming query2 in scheduler pool2

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")

df.writeStream.queryName("query2").format("parquet").start(path2)


 In Python

  Run streaming query1 in scheduler pool1

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

df.writeStream.queryName("query1").format("parquet").start(path1)

 

 Run streaming query2 in scheduler pool2

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")

df.writeStream.queryName("query2").format("parquet").start(path2)


总结

本章探讨了使用 DataFrame API 编写结构化流查询。具体来说,我们讨论了:

结构化流的中心思想和将输入数据流视为无界表的处理模型

定义,启动,重新启动和监控流查询的关键步骤

如何使用各种内置流数据源和接收器以及如何编写自定义流接收器

如何使用和调整托管状态操作(例如流聚合和流-流连接)

表达自定义状态计算的技术

通过阅读本章中的代码片段以及该书的 GitHub 仓库中的笔记(notebook),你将了解如何有效使用结构化流。在下一章中,我们将探讨如何管理从批处理和流式工作负载同时读取和写入的结构化数据。

发布于: 40 分钟前阅读数: 6
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
结构化流-Structured Streaming(八-下)