写点什么

有状态算子和应用 (七)

发布于: 1 小时前
有状态算子和应用(七)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


有状态算子和用户函数是流处理应用程序的常见组成部分,实际上,大多数重要的操作都需要对数据和部分中间结果进行记录,因为数据是一直流动的状态,并且会随着时间的推移到达接收端。Flink 很多内置的 DataStream 算子、sources 和 sinks 都是有状态的,可以用作数据缓存或者维护部分中间计算结果以及记录元数据,例如,窗口算子使用 ProcessWindowFunction 去收集输入数据,或者使用 ReduceFunction 输出结果,ProcessFunction 还可以使用定时器和一些 sink 函数用于维护事务的状态,以保证精确的一致性功能。除了内置的算子和提供的 source 及 sink 之外,Flink 的 Datastream API 还提供了用户自定义函数(UDF)注册、维护和状态访问的接口。

有状态流处理涉及了流处理的方方面面,比如故障恢复和内存管理,以及流应用程序的维护等等。第 2 章和第 3 章中我们分别讨论有状态流处理的基础以及 Flink 架构方面的细节。第 9 章阐述了如何设置和配置 Flink 从而实现可靠地处理有状态应用程序,第 10 章给出了如何操作有状态应用的引导--从应用程序保存点获取和恢复、重新扩展应用程序和升级应用程序。

本章重点聚焦在有状态 UDF 的实现,并讨论有状态应用程序的性能和健壮性。具体而言,我们将介绍如何在 UDF 中进行不同类型的状态交互和定义,我们还会讨论性能方面的问题以及如何控制函数状态的大小,最后,我们将说明如何将 key 状态配置为可查询状态,以及如何通过外部应用程序进行访问。

实现有状态函数

在“状态管理”一节中,我们介绍了函数主要有两种状态:keyed state 和 operator state,Flink 提供了多个接口来定义有状态函数,在本节中,我们将阐述如何实现 keyed state 和 operator state 的函数。

在 RuntimeContext 中声明 Keyed State

用户函数可以使用 keyed state 在 key 属性上下文中存储和访问状态。对于 key 属性的每个不同值,Flink 维护一个状态实例。函数的 keyed state 实例分布在函数算子的所有并行任务中。这意味着函数的每个并行实例负责 key 的一个子集并维护相应的状态实例。因此,keyed sate 非常类似于分布式键值映射。有关 keyed state 的更多细节,请参见“状态管理”。

keyed state 只能通过应用在 KeyedStream 上的函数使用。KeyedStream 是通过调用 DataStream.keyBy()方法构造出来的,该方法定义了一个流上的 key。KeyedStream 在指定的 key 上分区并记住 key 定义。应用于 KeyedStream 上的算子同时也会应用于其 key 定义的上下文中。

Flink 为 key 状态提供了多个原语。状态原语定义了单个键的状态结构。正确的状态原语的选择取决于函数如何与状态交互。这种选择还会影响函数的性能,因为每个状态,底层都为这些原语提供了自己的实现。Flink 支持以下状态原语:

 ValueState[T]保存一个类型为 T 的值。可以使用 ValueState.value()获取值,并使用 ValueState.update(value: T)更新值。

 ListState[T]包含 T 类型元素的列表。可以通过调用 ListState.add(value: T) 或 ListState.addAll(values: java.util.List[T])将新元素追加到列表中。状态元素可以通过调用 ListState.get()来访问,它返回所有状态元素上的一个 Iterable[T]。不能从 ListState 中删除单个元素,但是可以通过调用 ListState.update(values: java.util.List[T])更新列表。对该方法的调用将使用给定的值列表替换现有的值。

 MapState[K, V]包含 key 和 value 的映射。state 原语提供了常规 Java Map 的方法,比如 get(key: K), put(key: K, value: V), contains(key: K), remove(key: K),以及遍历 entries, keys, 和 values 的迭代器。

 ReducingState[T]提供了与 ListState[T]相同的方法(addAll()和 update()方法除外),但是 ReducingState.add(value: T)不向列表添加值,而是使用 ReduceFunction 立即聚合值。get()方法返回的迭代器,带有单个 entry 的 Iterable,该 entry 是 reduce 后的值。

 AggregatingState[I, O]的行为类似于 ReducingState。但是,它使用更通用的 AggregateFunction 来聚合值。AggregatingState.get()计算最终结果,返回一个包含单个元素的 Iterable。

所有的状态原语都可以通过调用 State.clear()来清除。

示例 7-1 展示了如何将带有键值 ValueState 的 FlatMapFunction 应用于传感器测量流。如果传感器测量的温度自上次测量以来变化超过阈值,示例应用程序将发出报警事件。

val sensorData: DataStream[SensorReading] = ???

// partition and key the stream on the sensor ID

val keyedData: KeyedStream[SensorReading, String] = sensorData

.keyBy(_.id)

// apply a stateful FlatMapFunction on the keyed stream which

// compares the temperature readings and raises alerts

val alerts: DataStream[(String, Double, Double)] = keyedData.flatMap(new TemperatureAlertFunction(1.7))

具有 keyed state 的函数必须应用于 KeyedStream。在应用该函数之前,需要通过调用输入流上的 keyBy()方法来指定 key。当调用具有 key 输入的函数处理方法时,Flink 的 runtime 环境自动将该函数的所有 keyed state 对象放入 key 的上下文中。因此,一个函数只能访问它当前处理的记录的状态。

例 7-2 显示了带有 key-value ValueState 的 FlatMapFunction 的实现,该函数检查测量的温度变化是否大于配置的阈值。 

class TemperatureAlertFunction(val threshold: Double)

extends RichFlatMapFunction[SensorReading, (String, Double, Double)]

{

// the state handle object

private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {

// create state descriptor

val lastTempDescriptor =

new ValueStateDescriptor[Double]("lastTemp", classOf[Double])

// obtain the state handle

lastTempState = getRuntimeContext.getState[Double]

(lastTempDescriptor)

}

 

override def flatMap(

reading: SensorReading,

out: Collector[(String, Double, Double)]): Unit = {

// fetch the last temperature from state

val lastTemp = lastTempState.value()

// check if we need to emit an alert

val tempDiff = (reading.temperature - lastTemp).abs

if (tempDiff > threshold) {

// temperature changed by more than the threshold

out.collect((reading.id, reading.temperature, tempDiff))

}

// update lastTemp state

this.lastTempState.update(reading.temperature)

}

}

要创建一个状态对象,我们必须通过 RuntimeContext 向 Flink 的 runtime 注册一个 StateDescriptor,它是由 RichFunction 提供的(有关 RichFunction 接口的讨论,请参阅“实现函数”)。StateDescriptor 特定于状态原语,包括状态的名称和状态的数据类型。ReducingState 和 AggregatingState 的描述符也需要 ReduceFunction 或 AggregateFunction 对象来聚合添加的值。状态名限定在算子的作用域内,因此一个函数可以通过注册多个状态描述符来拥有多个状态对象。由状态处理的数据类型被指定为类或类型信息对象(有关 Flink 的类型处理的讨论,请参阅“types”)。必须指定数据类型,因为 Flink 需要创建合适的序列化器。另外,还可以显式地指定类型序列化器来控制如何将状态写入状态后端、检查点和保存点。

通常,状态句柄对象是在 RichFunction 的 open()方法中创建的。open()方法在调用任何处理方法(如 flatMap 函数中的 flatMap())之前被调用。状态句柄对象(例 7-2 中的 lastTempState)是函数类的常规成员变量。

状态句柄对象仅提供对状态的访问,该状态存储在状态后端维护中。句柄不包含状态本身。

当一个函数注册了一个 StateDescriptor 时,Flink 会检查状态后端是否有该函数的数据以及给定的名称和类型的状态。如果重新启动有状态函数以从故障中恢复,或者从保存点启动应用程序,可能会发生这种情况。在这两种情况下,Flink 都将新注册的状态句柄对象链接到现有状态。如果状态后端不包含给定描述符的状态,则链接到句柄的状态初始化为空。

Scala DataStream API 提供了一些语法快捷方式,可以使用单个 ValueState 定义 map 和 flatMap 函数。示例 7-3 展示了如何使用快捷方式实现前面的示例。

val alerts: DataStream[(String, Double, Double)] = keyedData

.flatMapWithState[(String, Double, Double), Double] {

case (in: SensorReading, None) =>

// no previous temperature defined; just update the last

temperature

(List.empty, Some(in.temperature))

case (r: SensorReading, lastTemp: Some[Double]) =>

// compare temperature difference with threshold

val tempDiff = (r.temperature - lastTemp.get).abs

if (tempDiff > 1.7) {

// threshold exceeded; emit an alert and update the last

temperature

(List((r.id, r.temperature, tempDiff)), Some(r.temperature))

} else {

// threshold not exceeded; just update the last temperature

(List.empty, Some(r.temperature))

}

}

flatMapWithState()方法需要一个接受 Tuple2 的函数。tuple 的第一个字段保存输入记录到 flatMap,第二个字段保存已处理记录的 key 的检索状态的 Option。如果状态尚未初始化,则不定义 Option。该函数还返回一个 Tuple2。第一个字段是 flatMap 结果的列表,第二个字段是状态的新值。

使用 ListCheckpointed 接口实现算子 List State

算子状态由每个算子的并行实例管理,在算子的同一并行任务中处理的所有时间都可以获取相同的状态。在“状态管理”中,我们讨论了 Flink 支持的三种算子状态:

 list state

 list union state

 broadcast state

函数可以通过实现 ListCheckpointed 接口来处理算子的列表状态(list state)。ListCheckpointed 接口不能处理状态句柄,比如在状态后端(state backend)注册的 ValueState 或 ListState。相反,函数将算子状态作为常规成员变量实现,并通过 ListCheckpointed 接口的回调函数与状态后端(state backend)交互。该接口提供了两种方法:

// returns a snapshot the state of the function as a list

snapshotState(checkpointId: Long, timestamp: Long): java.util.List[T]

// restores the state of the function from the provided list

restoreState(java.util.List[T] state): Unit 

当 Flink 触发有状态函数的检查点时,将调用 snapshotState()方法。该方法有两个参数,checkpointId 和 timestamp,前者是检查点唯一的、单调递增的标识符,后者是 master 初始化检查点的时间戳。该方法必须以可序列化的状态对象列表的形式返回算子状态。

restoreState()方法总是在需要初始化函数的状态时调用——在启动作业时(无论是否从保存点启动),或者在失败的情况下。使用状态对象列表调用该方法,并必须基于这些对象恢复算子的状态。

示例 7-4 展示了如何为一个函数实现 ListCheckpointed 接口,为该函数的每个并行实例计算每个分区超过阈值的温度测量值。

class HighTempCounter(val threshold: Double)

extends RichFlatMapFunction[SensorReading, (Int, Long)]

with ListCheckpointed[java.lang.Long] {

// index of the subtask

private lazy val subtaskIdx = getRuntimeContext

.getIndexOfThisSubtask

// local count variable

private var highTempCnt = 0L

override def flatMap(

in: SensorReading,

out: Collector[(Int, Long)]): Unit = {

if (in.temperature > threshold) {

// increment counter if threshold is exceeded

highTempCnt += 1

// emit update with subtask index and counter

out.collect((subtaskIdx, highTempCnt))

}

}

override def restoreState(

state: util.List[java.lang.Long]): Unit = {

highTempCnt = 0

// restore state by adding all longs of the list

for (cnt <- state.asScala) {

highTempCnt += cnt

}

}

override def snapshotState(

chkpntId: Long,

ts: Long): java.util.List[java.lang.Long] = {

 

// snapshot state as list with a single count

java.util.Collections.singletonList(highTempCnt)

}

}

上面示例中的函数为每个并行实例计算超过配置阈值的温度测量值。该函数使用算子状态,并为每个被检查点和使用 ListCheckpointed 接口的方法恢复的并行算子实例提供一个状态变量。注意,ListCheckpointed 接口是在 Java 中实现的,并且需要 java.util.List 而不是 Scala 原生 list。

查看这个示例,你可能想知道为什么算子 state 被处理为一个状态对象列表。正如在“Scaling Stateful Operators”中讨论的,列表结构支持使用算子状态来改变函数的并行性。为了增加或减少具有算子状态的函数的并行性,算子状态需能够被重新分布到更多或更少的任务实例中。这需要分离或合并状态对象。由于分离和合并状态的逻辑是为每个有状态函数定制的,因此不能为任意类型的状态自动执行此操作。

通过提供状态对象的列表,具有算子 state 的函数可以使用 snapshotState()和 restoreState()方法实现此逻辑。snapshotState()方法将算子状态拆分为多个部分,而 restoreState()方法将算子状态组装为多个部分。当函数的状态被恢复时,状态的各个部分分布在函数的所有并行实例中,并传递给 restoreState()方法。如果并行子任务比状态对象多,则一些子任务在没有状态的情况下启动,并使用空列表调用 restoreState()方法。

再次查看示例 7-4 中的 HighTempCounter 函数,我们可以看到算子的每个并行实例都将其状态公开为带有单个 entry 的列表。如果增加这个算子的并行度,一些新的子任务将以空状态初始化,并从 0 开始计数。为了在 HighTempCounter 函数重新计算时获得更好的状态分配行为,我们可以实现 snapshotState()方法,以便将其计数分割为多个部分计数,如示例 7-5 所示。

override def snapshotState(

   chkpntId: Long,

   ts: Long): java.util.List[java.lang.Long] = {

   // split count into ten partial counts

   val div = highTempCnt / 10

   val mod = (highTempCnt % 10).toInt

   // return count as ten parts

  (List.fill(mod)(new java.lang.Long(div + 1)) ++

   List.fill(10 - mod)(new java.lang.Long(div))).asJava

}

ListCheckpointed 接口使用 Java 序列化对状态对象列表进行序列化和反序列化。如果你需要更新应用程序,这可能是个问题,因为 Java 序列化不允许迁移或配置自定义序列化程序,如果需要确保一个函数的算子状态支持应用程序更新,可以实现 CheckpointedFunction 接口,而不是 ListCheckpointed 接口。

使用连接广播状态

流应用程序中的一个常见需求是将相同的信息分发给一个函数的所有并行实例,并将其维护为可恢复状态。例如,规则流和应用规则的事件流。应用规则的函数接收两个输入流,事件流和规则流。它用算子状态存储规则,以便将它们应用于事件流的所有事件。由于函数的每个并行实例必须将所有规则保存在其算子状态中,因此需要广播规则流,以确保函数的每个实例都接收到所有规则。

在 Flink 中,这种状态称为广播状态。广播状态可以与常规的 DataStream 或 KeyedStream 相结合。示例 7-6 展示了如何实现一个温度警报应用程序,它具有可以通过广播流,动态配置阈值。

val sensorData: DataStream[SensorReading] = ???

val thresholds: DataStream[ThresholdUpdate] = ???

val keyedSensorData: KeyedStream[SensorReading, String] =sensorData.keyBy(_.id)

// the descriptor of the broadcast state

val broadcastStateDescriptor =

new MapStateDescriptor[String, Double]("thresholds", classOf[String], classOf[Double])

val broadcastThresholds: BroadcastStream[ThresholdUpdate] =

thresholds.broadcast(broadcastStateDescriptor)

// connect keyed sensor stream and broadcasted rules stream

val alerts: DataStream[(String, Double, Double)] =

keyedSensorData.connect(broadcastThresholds)

   .process(new UpdatableTemperatureAlertFunction())

 一个带有广播状态的函数作用于两个流,分三步:

1. 可以通过调用 DataStream.broadcast()来创建 BroadcastStream,并提供一个或多个 MapStateDescriptor 对象。每个描述符定义函数的单独广播状态,稍后将其应用于 BroadcastStream。

2. 将 BroadcastStream 连接到 DataStream 或 KeyedStream。BroadcastStream 必须放在 connect()方法中作为参数。

3. 对连接的流应用一个函数。根据流是否 key 类型流,可以应用 KeyedBroadcastProcessFunction 或 BroadcastProcessFunction。

示例 7-7 展示了 KeyedBroadcastProcessFunction 的实现,该函数支持在 runtime 动态配置传感器阈值。

class UpdatableTemperatureAlertFunction() extends KeyedBroadcastProcessFunction

[String, SensorReading, ThresholdUpdate, (String, Double,Double)] {

// the descriptor of the broadcast state

private lazy val thresholdStateDescriptor =

new MapStateDescriptor[String, Double]("thresholds", classOf[String], classOf[Double])

// the keyed state handle

private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {

   // create keyed state descriptor

  val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])

   // obtain the keyed state handle

   lastTempState = getRuntimeContext.getState[Double]

  (lastTempDescriptor)

}

override def processBroadcastElement(

   update: ThresholdUpdate,

   ctx: KeyedBroadcastProcessFunction

  [String, SensorReading, ThresholdUpdate, (String,Double, Double)]#Context,

   out: Collector[(String, Double, Double)]): Unit = {

   // get broadcasted state handle

   val thresholds =ctx.getBroadcastState(thresholdStateDescriptor)

   if (update.threshold != 0.0d) {

       // configure a new threshold for the sensor

       thresholds.put(update.id, update.threshold)

  } else {

       // remove threshold for the sensor

       thresholds.remove(update.id)

  }

}

override def processElement(

   reading: SensorReading,

   readOnlyCtx: KeyedBroadcastProcessFunction

      [String, SensorReading, ThresholdUpdate,(String, Double, Double)]#ReadOnlyContext,

   out: Collector[(String, Double, Double)]): Unit = {

   // get read-only broadcast state

   val thresholds =readOnlyCtx.getBroadcastState(thresholdStateDescriptor)

   // check if we have a threshold

   if (thresholds.contains(reading.id)) {

       // get threshold for sensor

       val sensorThreshold: Double = thresholds.get(reading.id)

       // fetch the last temperature from state

       val lastTemp = lastTempState.value()

       // check if we need to emit an alert

       val tempDiff = (reading.temperature - lastTemp).abs

       if (tempDiff > sensorThreshold) {

           // temperature increased by more than the threshold

           out.collect((reading.id, reading.temperature,tempDiff))

      }

  }

   // update lastTemp state

   this.lastTempState.update(reading.temperature)

}

}

 BroadcastProcessFunction 和 KeyedBroadcastProcessFunction 与常规的 CoProcessFunction 不同,因为元素处理方法是不对称的。使用不同的上下文对象调用方法 processElement()和 processBroadcastElement()。这两个上下文对象都提供了 getBroadcastState(MapStateDescriptor)方法,该方法提供对广播状态句柄的访问。但是,processElement()方法中返回的广播状态句柄提供了对广播状态的只读访问。这是一种安全机制,用于确保广播状

态在所有并行实例中保持相同的信息。此外,这两个上下文对象还提供对事件时间戳、当前水印、当前处理时间和侧输出的访问,类似于其他流程函数的上下文对象。

注意

BroadcastProcessFunction 和 KeyedBroadcastProcessFunction 也有所不同。BroadcastProcessFunction 不公开定时器服务来注册定时器,因此不提供 onTimer()方法。注意,你不应该从 KeyedBroadcastProcessFunction 的 processBroadcastElement()方法中访问 key 状态。由于广播输入没有指定 key,状态后端无法访问键值并将抛出异常。相反,keyedBroadcastProcessFunction.processbroadcastelement()方法的上下文提供了一个方法 applyToKeyedState(StateDescriptor,KeyedStateFunction)来将 KeyedStateFunction 应用于状态描述符引用的键态中的每个 key 的值。

 广播事件可能无法以确定的顺序到达

如果发出广播消息的算子以大于 1 的并行度运行,广播事件到达广播状态算子的不同并行任务的顺序可能不同。因此,你应该确保广播状态的值不依赖于接收广播消息的顺序,或者确保广播算子的并行度设置为 1。

 

使用 CheckpointedFunction 接口

CheckpointedFunction 接口是指定有状态函数的最低级别接口。它提供了注册和维护 keyed state 和操作状态的挂钩,是唯一一个允许访问操作列表联合状态的接口——在恢复或保存点重新启动时完全复制的操作状态。

CheckpointedFunction 接口定义了两个方法,initializeState()和 snapshotState(),它们的工作方式类似于算子列表状态的 listcheckpoint 接口的方法。在创建 CheckpointedFunction 的并行实例时调用 initializeState()方法。当应用程序启动或任务由于失败而重新启动时,就会发生这种情况。使用 FunctionInitializationContext 对象调用该方法,该对象提供对 OperatorStateStore 和 KeyedStateStore 对象的访问。状态存储负责向 Flink 的 runtime 注册函数状态并返回状态对象,如 ValueState、ListState 或 BroadcastState。每个状态都用一个必须是唯一的函数名注册。当函数注册状态时,状态存储尝试通过检查状态后端是否保存在给定名称下注册的函数的状态来初始化状态。如果由于失败或从保存点重新启动任务,则将从保存的数据初始化状态。如果应用程序不是从检查点或保存点启动的,则状态最初将为空。

在采取检查点之前立即调用 snapshotState()方法,并接收 FunctionSnapshotContext 对象作为参数。FunctionSnapshotContext 允许访问检查点的唯一标识符和 JobManager 启动检查点时的时间戳。snapshotState()方法的目的是确保在完成检查点之前更新所有状态对象。此外,结合 CheckpointListener 接口,可以使用 snapshotState()方法通过与 Flink 的检查点同步来一致地将数据写入外部数据存储。

例 7-8 显示了如何使用 CheckpointedFunction 接口来创建一个带有键控和算子状态的函数,该函数计算每个键和算子实例中有多少传感器读数超过了指定的阈值。

class HighTempCounter(val threshold: Double)

extends FlatMapFunction[SensorReading, (String, Long,Long)] with CheckpointedFunction {

// local variable for the operator high temperature cnt

var opHighTempCnt: Long = 0

var keyedCntState: ValueState[Long] = _

var opCntState: ListState[Long] = _

override def flatMap(

   v: SensorReading,

   out: Collector[(String, Long, Long)]): Unit = {

   // check if temperature is high

   if (v.temperature > threshold) {

       // update local operator high temp counter

       opHighTempCnt += 1

       // update keyed high temp counter

       val keyHighTempCnt = keyedCntState.value() + 1

       keyedCntState.update(keyHighTempCnt)

       // emit new counters

       out.collect((v.id, keyHighTempCnt, opHighTempCnt))

  }

}

override def initializeState(initContext:FunctionInitializationContext): Unit = {

   // initialize keyed state

   val keyCntDescriptor = new ValueStateDescriptor[Long]("keyedCnt", classOf[Long])

   keyedCntState =initContext.getKeyedStateStore.getState(keyCntDescriptor)

   // initialize operator state

   val opCntDescriptor = new ListStateDescriptor[Long]("opCnt", classOf[Long])

   opCntState =initContext.getOperatorStateStore.getListState(opCntDescriptor)

   // initialize local variable with state

   opHighTempCnt = opCntState.get().asScala.sum

}

override def snapshotState(snapshotContext: FunctionSnapshotContext): Unit = {

   // update operator state with local state

   opCntState.clear()

   opCntState.add(opHighTempCnt)

}

}

接收检查点完成通知

频繁的同步是分布式系统性能受限的主要原因。Flink 的设计旨在减少同步点。检查点是基于与数据一起流动的 barriers 实现的,因此避免了应用程序中所有算子之间的全局同步。

基于检查点机制,Flink 可以实现非常好的性能。然而,另一个含义是,应用程序的状态永远不会处于一致的状态,除了在采取检查点时的逻辑时间点。对于一些算子来说,知道检查点是否完成是很重要的。例如,目标是精确地将数据写入外部系统的接收器函数(有且只有一次的保证)必须只输出在成功的检查点之前接收到的记录,以确保在发生故障时不会重新计算接收到的数据。

正如在“检查点、保存点和状态恢复”中讨论的,只有当所有算子任务都成功地将其状态检查点存储时,检查点才会成功。因此,只有 JobManager 才能确定检查点是否成功。需要通知完成检查点的算子可以实现 CheckpointListener 接口。这个接口提供了 notifyCheckpointComplete(long chkpntId)方法,当 JobManager 注册一个已完成的检查点时(当所有算子成功地将它们的状态复制到远程存储时)可以调用该方法。

注意,Flink 不保证为每个完成的检查点调用 notifyCheckpointComplete()方法。任务可能会错过通知。在实现接口时需要考虑这一点。

 

为有状态的应用开启故障恢复

流式应用程序应该持续运行,并且必须从故障(如故障机器或进程)中恢复。大多数流应用程序要求故障不影响计算结果的正确性。

在“检查点、保存点和状态恢复”章节中,我们解释了 Flink 创建有状态应用程序的一致检查点的机制,在某时间点,算子处理完应用程序输入流的一个特定的位置所有事件,所有内置状态和用户定义的状态函数的快照形成。为了为应用程序提供容错,JobManager 定期启动检查点。

应用程序需要通过 StreamExecutionEnvironment 显式地启用定期检查点机制,如示例 7-9 所示。

val env = StreamExecutionEnvironment.getExecutionEnvironment

// set checkpointing interval to 10 seconds (10000 milliseconds)

env.enableCheckpointing(10000L)

检查点间隔是一个重要的参数,它影响常规处理期间检查点机制的开销和从故障恢复所需的时间。更短的检查点间隔在常规处理期间会导致更高的开销,但可以实现更快的恢复,因为需要重新处理的数据更少。Flink 提供了更多的调优方式来配置检查点行为,比如一致性保证(有且一次或至少一次)的选择、并发检查点的数量、取消长时间运行的检查点的超时以及几个特定于状态后端的选项。我们将在“检查点调优和恢复”中更详细地讨论这些选项。

确保有状态应用的可维护性

运行了几周的应用程序的状态可能很庞大,甚至无法重新计算。同时,需要维护长时间运行的应用程序。bug 需要修复,功能需要调整、增加或删除,或者算子的并行性需要调整以适应更高或更低的数据速率。因此,重要的是可以将应用程序状态迁移到应用程序的新版本,或者将其重新分发到更多或更少的算子任务。

Flink 提供保存点来维护应用程序及其状态。但是,它要求应用程序初始版本的所有有状态算子指定两个参数,以确保将来可以适当地维护应用程序。这些参数是唯一的算子标识符和最大的并行度(对于具有 key 状态的算子)。下面我们将描述如何设置这些参数。

算子唯一标识符和最大并行度被写入保存点

算子的唯一标识符和最大并行度被放入一个保存点,并且不能更改。

一旦更改算子标识符或最大并行度,就不能从保存点启动应用程序,而必须在没有任何状态初始化的情况下从头开始。

 

指定算子唯一标识

应该为应用程序的每个算子指定唯一标识符。标识符被写入保存点,作为具有算子的实际状态数据的元数据。当从保存点启动应用程序时,将使用标识符将保存点中的状态映射到启动应用程序的相应算子。只有当启动的应用程序的算子的标识符相同时,才能将保存点状态恢复到该算子。

如果不显式地为有状态应用程序的算子设置唯一标识符,则在更新应用程序时将面临很多限制。我们将在“保存点”中更详细地讨论唯一算子标识符的重要性和保存点状态的映射。

我们强烈建议为应用程序的每个算子分配唯一标识符。可以使用 uid()方法设置标识符,如示例 7-10 所示。

val alerts: DataStream[(String, Double, Double)] =

keyedSensorData.flatMap(new

TemperatureAlertFunction(1.1)).uid("TempAlert")


定义 keyed state 算子的最大并行度

算子的最大并行度参数定义了算子的 key 被分割成的 key 组的数量。key 组的数量限制了可缩放 keyed state 的并行任务的最大数量。“有状态算子的缩放”讨论了 key 组以及如何缩放 key 状态。可以通过 StreamExecutionEnvironment 为应用程序的所有算子设置最大并行度,也可以使用 setMaxParallelism()方法为每个算子设置最大并行度,如示例 7-11 所示。

val env = StreamExecutionEnvironment.getExecutionEnvironment

// set the maximum parallelism for this application

env.setMaxParallelism(512)

val alerts: DataStream[(String, Double, Double)] =

keyedSensorData.flatMap(new TemperatureAlertFunction(1.1))

// set the maximum parallelism for this operator and

// override the application-wide value

.setMaxParallelism(1024)

算子的默认最大并行度取决于应用程序第一个版本中算子的并行度:

 如果并行度小于等于 128,那么最大并行度就是 128。

 如果算子的并行度大于 128,则计算最大并行度为 nextPowerOfTwo(parallelism + (parallelism / 2))和 2^15 的最小值。

有状态应用的性能及鲁棒性

算子与状态交互的方式对应用程序的健壮性和性能有影响。有几个方面会影响应用程序的行为,比如选择在本地维护状态并执行检查点的状态后端存储、检查点算法的配置以及应用程序状态的大小。在本节中,我们将讨论确保长时间运行的应用程序的健壮执行行为和一致性能所需考虑的方面。

选择状态后端

在“State Backends(状态后端)”中,我们解释了 Flink 在状态后端维护应用程序状态。状态后端负责存储每个任务实例的本地状态,并在采取检查点时将其持久化到远程存储。因为本地状态可以通过不同的方式进行维护和检查,所以状态后端是可插拔的——两个应用程序可以使用不同的状态后端实现来维护它们的状态。状态后端的选择对有状态应用程序的健壮性和性能有影响。每个状态后端为不同的状态原语(如 ValueState、ListState 和 MapState)提供实现。

目前,Flink 提供三个状态后端,MemoryStateBackend, FsStateBackend 和 RocksDBStateBackend:

 MemoryStateBackend 将状态存储为 TaskManager JVM 进程堆上的常规对象。例如,MapState 是由 Java HashMap 对象支持的。虽然这种方法提供了非常低的读写状态延迟,但是它对应用程序的健壮性有影响。如果任务实例的状态变得太大,JVM 和所有在其上运行的任务实例可能会由于 OutOfMemoryError 错误而被杀死。此外,这种方法可能会遇到垃圾收集暂停,因为它会将许多存活时间较长的对象放在堆上。当采取检查点时,MemoryStateBackend 将状态发送给 JobManager,后者将其存储在堆内存中。因此,应用程序的总体状态必须适合 JobManager 的内存。因为它的内存是易失的,所以在 JobManager 失败的情况下,状态会丢失。由于这些限制,仅建议将 MemoryStateBackend 用于开发和调试。


 FsStateBackend 将本地状态存储在 TaskManager 的 JVM 堆上,就像 MemoryStateBackend 一样。但是,FsStateBackend 将状态写入远程持久文件系统,而不是将状态检查点指向 JobManager 的易失性内存。因此,FsStateBackend 为本地访问提供内存中的速度,并在出现故障时提供容错能力。但是,它受到 TaskManager 内存大小的限制,可能会出现垃圾收集暂停。


 RocksDBStateBackend 将所有状态存储到本地 RocksDB 实例中。RocksDB 是一个将数据持久化到本地磁盘的嵌入式键值存储。为了向 RocksDB 读写数据,需要对它进行序列化、反序列化。RocksDBStateBackend 还将状态检查点存储到远程,持久化到文件系统。由于 RocksDBStateBackend 将数据写入磁盘并支持增量检查点(更多信息见“检查点、保存点和状态恢复”),所以对于状态非常大的应用程序来说,RocksDBStateBackend 是一个不错的选择。用户场景已经使用 RocksDBStateBackend 的状态大小为多个 TB 的应用程序。但是,与在堆上维护状态相比,将数据读写到磁盘和反序列化对象的开销会导致更低的读写性能。


因为 StateBackend 是一个公共接口,所以也可以实现自定义状态后端。示例 7-12 展示了如何为应用程序及其所有有状态函数配置状态后端(这里是 RocksDBStateBackend)。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val checkpointPath: String = ???

// configure path for checkpoints on the remote filesystem

val backend = new RocksDBStateBackend(checkpointPath)

// configure the state backend

env.setStateBackend(backend)

我们在“调优检查点和恢复”中讨论了如何在应用程序中使用和配置状态后端。

选择状态原语

有状态算子(内置的或用户定义的)的性能取决于几个方面,包括状态的数据类型、应用程序的状态后端和选择的状态原语。对于读写时反序列化状态对象的状态后端,例如 RocksDBStateBackend,状态原语(ValueState、ListState 或 MapState)的选择可能会对应用程序的性能产生重大影响。例如,ValueState 在访问时是完全反序列化的,在更新时是序列化的。RocksDBStateBackend 的 ListState 实现在构造 Iterable 读取值之前反序列化所有列表条目。但是,向 ListState 添加单个值——将其附加到列表的末尾——是一种廉价的操作,因为只有附加的值是序列化的。RocksDBStateBackend 的 MapState 允许对每个键读取和写入值——只有那些被读取或写入的键和值是反序列化的。在遍历 MapState 的条目集时,序列化的条目是预先从 RocksDB 获取的,只有在实际访问键或值时才反序列化。

例如,使用 RocksDBStateBackend,使用 MapState[X, Y] 比使用 ValueState[HashMap[X, Y]] 更有效。如果元素经常被附加到列表中,并且列表中的元素很少被访问,那么 ListState[X] 相对于 ValueState[List[X]] 有一个优势。

另一个好的实践是每个函数调用只更新一次状态。由于检查点与函数调用是同步的,所以多个状态更新不会带来任何好处,但是当在单个函数调用中多次更新状态时,可能会导致额外的序列化开销。

防止状态泄露

流应用程序通常设计为连续运行数月或数年。如果应用程序的状态一直在增加,那么在某些情况下,它会变得太大并杀死应用程序,除非采取措施给应用程序扩展到更多的资源。为了防止随着时间的推移而增加应用程序的资源消耗,必须控制算子状态的大小。由于状态的处理直接影响算子的语义,所以 Flink 不能自动清除状态并释放存储。相反,所有有状态算子都必须控制其状态的大小,并确保其不会无限增长。

状态增长的一个常见原因是 keyed state 在一个不断更新的 key 域上。在此场景中,有状态函数接收带有键的记录,这些键仅在一段时间内是活动的,之后就再也不会接收。一个典型的例子是单击事件流,其中单击具有一段时间后过期的会话 id 属性。在这种情况下,带有 keyed state 的函数会为越来越多的键累积状态。随着键空间的发展,过期键的状态将变得陈旧和无用。此问题的解决方案是删除过期 key 的状态。但是,具有 keyed state 的函数只有在接收到具有该键的记录时才能访问键的状态。在许多情况下,函数不知道一条记录是否是键的最后一条记录。因此,它将不能为 key 退出状态,因为它可能会收到另一个相同 key 记录。

这个问题不仅存在于自定义有状态函数中,也存在于 DataStream API 的一些内置算子中。例如,在 KeyedStream 上计算运行的聚合,可以使用内置的聚合函数(如 min、max、sum、minBy 或 maxBy),也可以使用自定义的 ReduceFunction 或 AggregateFunction)来保持每个键的状态,并且从不丢弃它。因此,只有当键值来自常量和有界域时,才应该使用这些函数。其他例子是带有基于计数器的触发器的窗口,当接收到一定数量的记录时,这些触发器将处理并清除它们的状态。具有基于时间的触发器(处理时间和事件时间)的窗口不受此影响,因为它们根据时间触发和清除它们的状态。

这意味着在设计和实现有状态算子时,应该考虑应用程序需求及其输入数据的属性,例如键域。如果你的应用程序需要移动 key 域的 keyed state,那么它应该确保在不再需要时清除键的状态。这可以通过注册将来某个时间点的定时器来实现。与 state 类似,定时器是在当前活动 key 的上下文中注册的。当定时器触发时,将调用回调方法并加载定时器 key 的上下文。因此,回调方法可以完全访问键的状态,并且可以清除它。提供注册定时器支持的函数是 windows 的触发器接口和处理函数。两者都在第六章中讨论过。

例 7-13 显示了一个 KeyedProcessFunction,它比较两个后续的温度测量值,并在差异大于某个阈值时发出警报。这与之前的 keyed state 示例是相同的用例,但是 KeyedProcessFunction 也清除了键的状态(即 sensors),在事件发生后一小时内没有提供任何新的温度测量。

class SelfCleaningTemperatureAlertFunction(val threshold:Double)

extends KeyedProcessFunction[String, SensorReading,(String, Double, Double)] {

   // the keyed state handle for the last temperature

  private var lastTempState: ValueState[Double] = _

   // the keyed state handle for the last registered timer

   private var lastTimerState: ValueState[Long] = _

   override def open(parameters: Configuration): Unit = {

       // register state for last temperature

val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])

lastTempState = getRuntimeContext.getState[Double]

 // register state for last timer

       val lastTimerDesc = new ValueStateDescriptor[Long]("lastTimer", classOf[Long])

astTimerState =getRuntimeContext.getState(timestampDescriptor)

  }

   override def processElement(

       reading: SensorReading,

       ctx: KeyedProcessFunction

      [String, SensorReading, (String, Double,Double)]#Context,

       out: Collector[(String, Double, Double)]): Unit = {

       // compute timestamp of new clean up timer as record timestamp + one hour

val newTimer = ctx.timestamp() + (3600 * 1000)

       // get timestamp of current timer

       val curTimer = lastTimerState.value()

       // delete previous timer and register new timer

       ctx.timerService().deleteEventTimeTimer(curTimer)

       ctx.timerService().registerEventTimeTimer(newTimer)

       // update timer timestamp state

       lastTimerState.update(newTimer)

       // fetch the last temperature from state

       val lastTemp = lastTempState.value()

       // check if we need to emit an alert

       val tempDiff = (reading.temperature - lastTemp).abs

if (tempDiff > threshold) {

           // temperature increased by more than the threshold

           out.collect((reading.id, reading.temperature, tempDiff))

      }

       // update lastTemp state

       this.lastTempState.update(reading.temperature)

  }

   override def onTimer(

       timestamp: Long,

       ctx: KeyedProcessFunction

      [String, SensorReading, (String, Double,

       Double)]#OnTimerContext,

       out: Collector[(String, Double, Double)]): Unit = {

       // clear all state for the key

       lastTempState.clear()

       lastTimerState.clear()

  }

}

上面的 KeyedProcessFunction 实现的状态清理机制如下所示。对于每个输入事件,调用 processElement()方法。在比较温度测量值并更新最后的温度之前,该方法通过删除前一个计时器并注册一个新计时器来更新清理定时器。清理时间是通过向当前记录的时间戳添加一个小时来计算的。为了能够删除当前注册的定时器,它的时间戳存储在一个名为 lastTimerState 的附加 ValueState[Long]中。之后,该方法比较温度,可能发出警报,并更新其状态。

因为我们的 KeyedProcessFunction 总是通过删除当前定时器并注册一个新定时器来更新注册的定时器,所以每个键只注册一个定时器。一旦定时器触发,就会调用 onTimer()方法。该方法清除与键关联的所有状态、最后的温度和最后的定时器状态。

更新有状态应用

我们经常需要修复一个 bug,或者更新一个长时间运行的有状态流应用程序的业务逻辑。因此,需要用更新的版本替换正在运行的应用程序,并且不能丢失应用程序的状态。

Flink 通过获取正在运行的应用程序的保存点、停止保存点并从保存点启动应用程序的新版本来支持此类更新。然而,在更新应用程序的同时,保持其状态仅在某些应用程序更改时才有可能做到——原始应用程序及其新版本需要与保存点兼容。我们将解释如何在保持保存点兼容性的同时迭代应用程序。

在“保存点”章节中,我们解释了保存点中的每个状态都可以由一个复合标识符来处理,该复合标识符由一个唯一的算子标识符和由状态描述符声明的状态名组成。

              在实现应用程序时要考虑到版本迭代

重要的是要理解应用程序的初始设计决定了以后是否以及如何以与保存点兼容的方式修改它。如果最初的版本没有在设计时考虑到更新,那么许多更改将是不可能的。对于大多数应用程序更改,必须将唯一标识符分配给算子。

当从保存点启动应用程序时,通过使用算子标识符和状态名从保存点查找相应的状态,初始化已启动应用程序的算子。从保存点兼容性的角度来看,这意味着一个应用程序可以通过以下三种方式迭代演进:

1. 在不更改或删除现有状态的情况下更新或扩展应用程序的逻辑。这包括向应用程序添加有状态或无状态算子。

2. 从应用程序中删除状态。

3. 通过更改状态原语或状态的数据类型来修改现有算子的状态。

在下面的小节中,我们将讨论这三种情况。

保持现有状态更新应用

如果更新应用程序而不删除或更改现有状态,则应用程序始终与保存点兼容,并且可以从较早版本的保存点启动。

如果你将一个新的有状态算子添加到应用程序中,或者将一个新的状态添加到一个现有的算子中,当应用程序从一个保存点启动时,该状态将初始化为空。

注意,更改内置有状态算子(如窗口聚合、基于时间的 join 或异步函数)的输入数据类型,通常会修改其内部状态的类型。因此,这样的更改并不是安全点兼容的,即使它们看起来并不明显。

从应用中删除状态

与向应用程序添加新状态不同,你可能还希望通过删除状态来调整应用程序——可以通过删除完整的有状态算子,也可以仅从函数中删除状态。当从上一版本的保存点启动应用程序的新版本时,保存点包含不能映射到重新启动的应用程序的状态。如果算子的唯一标识符或状态名被更改,也会出现这种情况。

默认情况下,Flink 不会启动不还原保存点中包含的所有状态的应用程序,以避免丢失保存点中的状态。但是,可以禁用这个安全检查,如“运行和管理流应用程序”章节中所述。因此,通过从现有算子中删除有状态算子或状态来更新应用程序并不困难。

修改算子的状态

虽然从应用程序中添加或删除状态相当容易,而且不影响保存点兼容性,但是修改现有算子的状态则更加复杂。有两种方法可以修改状态:

 通过更改状态的数据类型,例如将 ValueState[Int]更改为 ValueState[Double]

 通过更改状态原语的类型,例如将 ValueState[List[String]]更改为 ListState[String]

在一些特定的情况下,可以更改状态的数据类型。但是,Flink 目前不支持更改状态的原语(或结构)。通过提供转换保存点的离线工具,有一些方法法可以支持这种情况。然而,截至 Flink 1.7,还没有这样的工具存在。下面我们将重点讨论如何更改状态的数据类型。

为了理解修改状态数据类型的问题,我们必须理解在保存点中状态数据是如何表示的。保存点主要由序列化状态数据组成。将状态 JVM 对象转换为字节的序列化器,由 Flink 的类型系统生成和配置。这种转换基于状态的数据类型。例如,如果你有一个 ValueState[String],那么 Flink 的类型系统将生成一个 StringSerializer 来将 String 对象转换为字节。序列化器还用于将原始字节转换回 JVM 对象。根据状态后端是存储序列化的数据(如 RocksDBStateBackend)还是作为堆上的对象(如 FSStateBackend),这将在函数读取状态或从保存点重新启动应用程序时发生。

由于 Flink 的类型系统根据状态的数据类型生成序列化器,所以当状态的数据类型改变时,序列化器可能会改变。例如,如果你将 ValueState[String]更改为 ValueState[Double],那么 Flink 将创建一个 DoubleSerializer 来访问状态。使用 DoubleSerializer 反序列化用 StringSerializer 序列化字符串生成的二进制数据会失败,这并不奇怪。因此,仅在非常特定的情况下支持更改状态的数据类型。

在 Flink 1.7 中,如果数据类型定义为 Apache Avro 类型,并且新数据类型也是根据 Avro 模式演化规则从原始类型演化而来的 Avro 类型,则支持更改状态的数据类型。Flink 的类型系统将自动生成能够读取数据类型以前版本的序列化器。

状态演化和状态迁移是 Flink 社区的一个重要课题,受到了广泛的关注。你可以期望在 Apache Flink 的未来版本中改进对这些场景的支持。尽管做了这么多工作,我们还是建议在将应用程序投入生产之前,一定要仔细检查它是否能够按照计划进行改进。

可查询式状态

许多流处理应用程序需要与其他应用程序共享结果。一种常见的模式是将结果写入数据库或键值存储中,并让其他应用程序从该数据存储中检索结果。这样的体系结构意味着需要建立和维护一个单独的系统,这可能是一项重大的工作,特别是如果它还需要是一个分布式系统。

Apache Flink 提供了可查询的状态来处理通常需要外部数据存储来共享数据的用例。在 Flink 中,任何 keyed state 都可以作为可查询状态公开给外部应用程序,并充当只读键值存储。有状态的流应用程序像往常一样处理事件,并以可查询的状态存储和更新其中间结果或最终结果。外部应用程序可以在流应用程序 runtime 请求指定 key 的状态。

 

注意,只支持 key 的点查询。不可能请求 key 范围查询或甚至运行更复杂的查询。

可查询状态并不适用于所有需要外部数据存储的场景。例如,只能在应用程序 runtime 访问可查询状态。当应用程序由于错误而重新启动、重新调整应用程序或将其迁移到另一个集群时,它是不可访问的。但是,它使许多应用程序更容易实现,例如实时仪表板或其他监视应用程序。

接下来,我们将讨论 Flink 的可查询状态服务的体系结构,并解释流应用程序如何公开可查询状态,外部应用程序可以查询它。

可查询式状态服务的架构及启用方式

Flink 的可查询状态服务包含三个进程:

 QueryableStateClient:外部应用程序使用 QueryableStateClient 来提交查询和检索结果。

 QueryableStateClientProxy:接受并服务客户端请求。每个 TaskManager 运行一个客户端代理。由于 keyed state 分布在算子的所有并行实例中,因此代理需要标识为所请求的 key 维护状态的 TaskManager。此信息是从管理 key 组分配的 JobManager 请求的,并在接收到此信息后进行缓存。客户端代理从各自 TaskManager 的状态服务器检索状态,并将结果提供给客户端。

 QueryableStateServer:服务于客户端代理的请求。每个 TaskManager 运行一个状态服务器,该服务从本地状态后端获取查询 key 的状态,并将其返回给发出请求的客户端代理。

图 7-1 显示了可查询状态服务的体系结构。

为了在 Flink 设置中启用可查询状态服务——在 TaskManager 中启动客户机代理和服务器线程——你需要将 flink-queryable-state-runtime JAR 文件添加到 TaskManager 进程的类路径中。这是通过将它从安装的./opt 文件夹复制到./lib 文件夹来实现的。当 JAR 文件位于类路径中时,可查询状态线程将自动启动,并可以为可查询状态客户端的请求提供服务。正确配置后,你将在 TaskManager 日志中发现以下日志消息:

Started the Queryable State Proxy Server @ …

客户端代理和服务器使用的端口以及其他参数可以在./conf/flink-conf.yaml 文件中配置。

对外暴露可查询式状态

实现具有可查询状态的流应用程序很容易。你所要做的就是定义一个具有 keyed state 的函数,并通过在获取状态句柄之前在 StateDescriptor 上调用 setQueryable(String)方法使状态可查询。示例 7-14 展示了如何使 lastTempState 可查询,以说明 keyed state 的用法。

override def open(parameters: Configuration): Unit = {

// create state descriptorval lastTempDescriptor =new ValueStateDescriptor[Double]("lastTemp", classOf[Double])

// enable queryable state and set its external identifierlastTempDescriptor.setQueryable("lastTemperature")

// obtain the state handle

lastTempState = getRuntimeContext.getState[Double]}

通过 setQueryable()方法传递的外部标识符可以自由选择,并且仅用于配置可查询状态客户端。

除了对任何类型的 keyed state 启用查询的通用方法之外,Flink 还提供了定义流接收器(sink)的快捷方式,这些接收器以可查询状态的方式存储流事件。示例 7-15 展示了如何使用可查询状态接收器。

val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData

// project to sensor id and temperature

.map(r => (r.id, r.temperature))

// compute every 10 seconds the max temperature per sensor

.keyBy(_._1)

.timeWindow(Time.seconds(10))

.max(1)

// store max temperature of the last 10 secs for each sensor

// in a queryable state

tenSecsMaxTemps

// key by sensor id

.keyBy(_._1)

.asQueryableState("maxTemperature")

asQueryableState()方法的作用是:向流附加一个可查询的状态接收器。可查询状态的类型是 ValueState,它保存输入流的类型值,本例中为(String,Double)。对于每个接收到的记录,可查询状态接收器将该记录保存到 ValueState 中,以便始终存储每个 key 的最新事件。

具有可查询状态的函数的应用程序与任何其他应用程序一样执行。你只需确保 taskmanager 被配置为启动可查询的状态服务(如前一节所述)。

从外部系统查询状态

任何基于 jvm 的应用程序都可以使用 QueryableStateClient 来查询正在运行的 Flink 应用程序的可查询状态。这个类是由 flink-queryable-state-client-java dependency 提供的,你可以将其添加到你的项目中,如下所示:

<dependency>

   <groupid>org.apache.flink</groupid>

   <artifactid>flink-queryable-state-clientjava_2.12</artifactid>

   <version>1.7.1</version>

</dependency>

QueryableStateClient 是用任一 TaskManager 的主机名和可查询状态客户端代理监听的端口初始化的。默认情况下,客户端代理监听端口 9067,但是可以在./conf/flink-conf.yaml 文件中配置:

val client: QueryableStateClient =

new QueryableStateClient(tmHostname, proxyPort)

获得状态客户端后,可以通过调用 getKvState()方法来查询应用程序的状态。该方法接受几个参数,比如正在运行的应用程序的 JobID、状态标识符、需获取状态的 key、key 的类型信息和查询状态的状态描述符。JobID 可以通过 REST API、Web UI 或日志文件获得。getKvState()方法返回一个 CompletableFuture[S],其中 S 是状态的类型(例如,ValueState[] 或 MapState[,_ ])。因此,客户端可以发送多个异步查询并等待它们的结果。示例 7-16 显示了一个简单的控制台仪表板,它查询上一节中显示的应用程序的可查询状态。

object TemperatureDashboard {

// assume local setup and TM runs on same machine as client

val proxyHost = "127.0.0.1"

val proxyPort = 9069

// jobId of running QueryableStateJob

// can be looked up in logs of running job or the web UI

val jobId = "d2447b1a5e0d952c372064c886d2220a"

// how many sensors to query

val numSensors = 5

// how often to query the state

val refreshInterval = 10000

def main(args: Array[String]): Unit = {

// configure client with host and port of queryable state proxy

val client = new QueryableStateClient(proxyHost, proxyPort)

val futures = new Array[

CompletableFuture[ValueState[(String, Double)]]](numSensors)

val results = new Array[Double]// print header line of dashboard table

val header =

(for (i <- 0 until numSensors) yield "sensor_" + (i + 1))

.mkString("\t| ")

println(header) 

// loop forever

while (true) {

// send out async queries

for (i <- 0 until numSensors) {

futures(i) = queryState("sensor_" + (i + 1), client)

}

// wait for results

for (i <- 0 until numSensors) {

results(i) = futures(i).get().value()._2

}

// print result

val line = results.map(t => f"$t%1.3f").mkString("\t| ")

println(line)

// wait to send out next queries

Thread.sleep(refreshInterval)

}

client.shutdownAndWait()

}

def queryState(

key: String,

client: QueryableStateClient)

: CompletableFuture[ValueState[(String, Double)]] = {

client

.getKvState[String, ValueState[(String, Double)], (String, Double)]

(

JobID.fromHexString(jobId),

"maxTemperature",

key,

Types.STRING,

new ValueStateDescriptor[(String, Double)](

"", // state name not relevant here

Types.TUPLE[(String, Double)]))

}

}

为了运行示例,你必须首先使用可查询状态启动流应用程序。一旦运行,在日志文件或 web UI 中查找 JobID,在仪表板的代码中设置 JobID 并运行它。然后仪表板将开始查询正在运行的流应用程序的状态。 

总结

几乎每个重要的流应用程序都是有状态的。DataStream API 提供了强大但易于使用的工具来访问和维护算子状态。它提供了不同类型的状态原语,并支持可插入状态后端存储。虽然开发人员可以灵活地与状态交互,但是 Flink 的 runtime 可以管理 tb 级的状态,并确保在出现故障时使用一次语义。第 6 章中讨论的基于时间的计算和可伸缩状态管理的组合使开发人员能够实现复杂的流应用程序。可查询状态是一种易于使用的特性,可以节省设置和维护数据库或键值存储的工作,从而将流应用程序的结果公开给外部应用程序。

 

发布于: 1 小时前阅读数: 4
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
有状态算子和应用(七)