写点什么

基于时间和窗口的算子 (六)

发布于: 2 小时前
基于时间和窗口的算子(六)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在本章中,我们将介绍用于时间处理的 DataStream API 方法和基于时间的算子(如 windows)。正如你在“时间语义”一节中学到的,Flink 基于时间的算子可以应用于不同的时间语义。

首先,我们将学习如何定义时间特性、时间戳和水位线。然后,我们将涵盖处理函数以及低级算子的转换操作,从而对时间戳和水位线进行处理和注册定时器。接下来,我们将使用 Flink 的窗口 API,它提供了最常见窗口类型的内置实现。你还将了解关于自定义、用户定义的窗口操作和核心窗口结构等内容,如 assigners, triggers 和 evictors。最后,我们将讨论如何定时加入流以及处理延迟事件的策略。


配置时间特性 

在定义分布式流处理应用程序中的时间算子操作之前,我们先了解“时间”的含义,当你指定了一个窗口用于收集每一分钟的 bucket 中产生的事件时,如何确定每个 bucket 中具体包含了哪些事件呢?在 DataStream API,你可以在创建窗口的时候使用时间特性去告知 Flink 如何定义时间,时间特性是 StreamExecutionEnvironment 的一个属性,包括了几种时间类型:

处理时间(Processing Time)

处理时间是指执行相应算子操作的机器的系统时间。当流式程序按处理时间运行时,所有基于时间的操作(如时间窗口)将使用运行相应算子的计算机的系统时钟。每小时处理时间窗口将包括系统时钟指示整小时的时间之间到达特定算子的所有记录。例如,如果应用程序在 9:15 am 开始运行,则第一个每小时处理时间窗口将包括在 9:15 am 和 10:00 am 之间处理的事件,下一个窗口将包括在 10:00 am 和 11:00 am 之间处理的事件,以此类推。处理时间是最简单的时间概念,不需要流和机器之间的协调,无需依赖水位线,它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供结果确定性,因为它容易受到记录到达系统(例如从消息队列写入)的速度以及数据在上下游算子之间的处理速度的影响。

事件时间(Event Time)

Event Time 指的是数据流中每个元素或者每个事件自带的时间属性,一般是指事件发生的时间,系统的逻辑时间由水位线去定义。正如我们在“时间戳”章节中了解到的,时间戳要么在进入数据处理管道之前就存在于数据中,要么由源函数生成,在事件时间中,时间的进度取决于数据,而不取决于任何时钟。当水位线声明某个时间间隔内的所有时间戳都已接收到了,事件时间窗口将触发计算。理想情况下,事件时间窗口会产生确定性结果,即使事件发生顺序混乱,窗口结果将不依赖于读取或处理流的速度。

注入时间(Ingest Time)

将源算子操作的处理时间指定为每个接入记录的事件时间戳,并自动生成水位线。它是 EventTime 和 ProcessingTime 的混合体。事件的接入时间是它进入流处理器的时间。与事件时间相比,接入时间并没有提供太多的实际价值,因为它不能提供确定的结果,并且具有与事件时间相近的性能。

示例 6-1 展示了如何在“Hello, Flink!”中编写的传感器流应用程序代码来设置时间特性。

object AverageSensorReadings {

// main() defines and executes the DataStream program

def main(args: Array[String]) {

// set up the streaming execution environment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// use event time for the application

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ingest sensor stream

val sensorData: DataStream[SensorReading] = env.addSource(...)

}

}


将时间特性设置为 EventTime 可以进行时间戳和水位线处理,从而可以进行事件时间操作。当然,就算你选择 EventTime 时间特性,仍然可以使用处理时间窗口和定时器。

如果使用处理时间,使用 TimeCharacteristic.ProcessingTime 替换 TimeCharacteristic.EventTime。

分配时间戳和生成水位线 

正如在“事件时间处理”章节中所讨论的,你的应用程序需要提供两个重要的信息给 Flink,以使用事件时间语义进行操作。第一个信息是每个事件必须与一个时间戳相关联,该时间戳通常指示事件实际发生的时间。第二个信息是事件时间流还需要附带水位线,使得算子可以从中推断当前事件时间。

时间戳和水位线单位为毫秒。水位线会通知算子不希望出现时间戳小于或等于水位线的事件。时间戳和水位线可以由 SourceFunction 分配和生成,也可以使用用户定义的时间戳生成器和水位线生成器。在“源函数、时间戳和水位线”一节中讨论了在 SourceFunction 中分配时间戳和生成水位线。在这里,我们将解释如何使用用户定义的函数来实现这一点。

如果使用时间戳分配程序,则现有的时间戳和水位线都会被覆盖。

DataStream API 提供了 TimestampAssigner 接口,以便在元素被接入到流应用程序后从元素中提取时间戳。通常,时间戳分配程序是在源函数之后立即调用的,因为大多数分配程序在生成水位线时都对元素的时间戳顺序作了假设性猜想。由于元素通常是并行摄入的,所以任何导致 Flink 跨并行流分区重新分配元素的操作,都会打乱元素的时间戳顺序,例如并行性更改、KeyBy()或其他引起重新分配的操作。

最好的做法是分配时间戳,并在尽可能靠近源的地方甚至在 SourceFunction 内生成水位线。根据应用场景,在分配时间戳之前,如果这些操作没有引起元素的重新分配,可以对输入流应用执行过滤或转换操作。

为了确保事件时间操作按预期运行,应该在任何依赖于事件时间的转换之前调用分配器,例如,在第一个事件时间窗口之前。

时间戳分配程序的行为类似于其他转换算子,它们在一个数据流上调用,并产生一个新的带有时间戳的数据流和水位线。时间戳分配程序不会更改 DataStream 的数据类型。

示例 6-2 中的代码展示了如何使用时间戳分配器。在本例中,读取流之后,我们首先使用了一个过滤转换,接着调用 assignTimestampsAndWatermarks()方法,在这个方法中我们定义了时间戳分配器 MyAssigner()。

val env = StreamExecutionEnvironment.getExecutionEnvironment

// set the event time characteristic

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ingest sensor stream

val readings: DataStream[SensorReading] = env

.addSource(new SensorSource)

// assign timestamps and generate watermarks

.assignTimestampsAndWatermarks(new MyAssigner()) 

在上面的例子中,MyAssigner 的类型可以是 AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks 类型。这两个接口底层继承了 DataStream API 提供的 TimestampAssigner 接口。第一个接口定义周期性发出水位线的分配器,而第二个接口根据输入事件的属性注入水位线。接下来我们将详细描述这两个接口。

 

周期性水位线分配器(Assigner with periodic watermarks)

周期性地分配水位线意味着我们指示系统以固定的机器时间间隔检查事件时间的进度。默认时间间隔设置是 200 毫秒,但是我们可以使用 ExecutionConfig.setAutoWatermarkInterval()方法来动态配置它:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// generate watermarks every 5 seconds

env.getConfig.setAutoWatermarkInterval(5000)

在上面的示例中,程序每 5 秒发出一次水位线。实际上,每隔 5 秒,Flink 就会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果该方法返回非空值的时间戳大于前一个水位线的时间戳,则生成新的水位线。否则,如果方法返回一个空值,或者返回的水位线的时间戳小于最后发出的水位线的时间戳,则不会生成新水位线。

示例 6-3 显示了一个具有周期性时间戳的 assigner 程序,它通过获得到当前元素的最大时间戳来生成水位线。当请求新的水位线时,assigner 返回一个最大时间戳减去 1 分钟容忍间隔的水位线。

class PeriodicAssigner

extends AssignerWithPeriodicWatermarks[SensorReading] {

val bound: Long = 60 * 1000 // 1 min in ms

var maxTs: Long = Long.MinValue // the maximum observed timestamp

override def getCurrentWatermark: Watermark = {

// generated watermark with 1 min tolerance

new Watermark(maxTs - bound)

}

override def extractTimestamp(

r: SensorReading,

previousTS: Long): Long = {

// update maximum timestamp

maxTs = maxTs.max(r.timestamp)

// return record timestamp

r.timestamp

}

}

DataStream API 为具有周期性水位线的时间戳分配程序(assigners)的两种常见情况提供了实现。如果你的输入元素具有单调递增的时间戳,则可以使用快捷方法 assignAscendingTimeStamps。此方法使用当前时间戳来生成水位线,因为不能出现更早的时间戳。下面演示如何为升序时间戳生成水位线:

val stream: DataStream[SensorReading] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(e => e.timestamp)

周期水位线生成的另一种常见情况是,当你知道输入流中的最大延迟时——元素的时间戳与所有已导入元素的最大时间戳之间的最大差异。对于这种情况,Flink 提供了 BoundedOutOfOrdernessTimeStampExtractor,它将最大延迟时间作为参数:

val stream: DataStream[SensorReading] = ...val output = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading(e =>.timestamp)

在前面的代码中,允许元素延迟 10 秒。这意味着,如果一个元素的事件时间与之前所有元素的最大时间戳之间的差距大于 10 秒,那么该元素可能在完成相应的计算并发出结果之后到达进行处理。Flink 提供了处理这些延迟事件的不同策略,我们将在“处理迟到数据”中讨论这些策略。

PUNCTUATED 水位线分配器(Assigner with punctuated watermarks)

有时输入流包含特殊的元组或标记,用于指示流的进度。对于这种情况,或者当可以根据输入元素的其他属性定义水位线时,Flink 提供了 AssignerWithPunctuatedWatermarks 接口。它定义了 checkAndGetNextWatermark()方法,该方法在 extractTimestamp()之后为每个事件调用。该方法可以决定是否生成新的水位线。如果方法返回的非空水位线大于最新发出的水位线,则发出新水位线。

示例 6-4 显示了一个 punctuated 水位线分配程序(assigner),它为从 ID 为“sensor_1”的传感器接收到的每条测量记录发出水位线。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {

   val bound: Long = 60 * 1000 // 1 min in ms

   override def checkAndGetNextWatermark(r: SensorReading,extractedTS: Long): Watermark = {

       if (r.id == "sensor_1") {

           // emit watermark if reading is from sensor_1

           new Watermark(extractedTS - bound)

      } else {

           // do not emit a watermark

           null

      }

  }

   override def extractTimestamp(r: SensorReading,previousTS: Long): Long = {

       // assign record timestamp

       r.timestamp

  }

}

水位线、延迟及完整性问题 

到目前为止,我们已经讨论了如何使用 TimestampAssigner 生成水位线,还没有讨论水位线对你的流应用程序的影响。

水位线用于权衡时间延迟和结果完整性。它反映了在执行计算之前等待数据到达的时间,例如完成窗口计算并输出结果,基于事件时间的算子使用水位线来确定其读取记录的完整性和操作的进度。根据接收到的水位线,算子计算某一个时间点之前的窗口数据,直到它接收到这个时间点前的输入记录为止。

然而,现实是我们永远不可能有完美的水位线,因为这意味着我们总是可以确定数据按时到来,没有延迟的记录。在实践中,你需要进行有根据的猜测,假设性设定数据整体之间的延迟,从而在应用程序中生成水位线,这个没有固定的方法,更多的是根据经验来确定水位线。你需要使用关于源、网络和分区的等因素来估计处理进度和输入记录延迟的上限,也就是对迟到数据的容忍度。估计就意味着有出错的空间,在这种情况下,生成的水位线可能是不准确的,往往会造成不必要数据延迟或应用程序延迟变大。要记住这一点,你可以使用水位线来平衡结果延迟和结果完整性。

如果生成松散的水位线(水位线远远落后于处理过的记录的时间戳),则会增加生成结果的延迟,但是可以更大程度上保证了结果完整性。此外,状态的大小通常会增加,因为应用程序需要缓冲更多的数据,直到触发计算为止。在执行计算时,我们基本可以确定所有相关的数据都是可用的。

另一方面,如果你生成了非常紧密的水位线,也就是设置了一个很小的迟到时间,这些水位线可能比一些后续记录的时间戳更大,基于时间的计算可能在所有相关数据到达之前执行,这样做可能会产生不完整或不准确的结果,但是好处是可以降低结果的延迟。

与构建的批处理应用程序不同,在基于所有数据都可用的前提条件下,延迟/结果完整性是权衡流处理应用程序的基本特征,流处理应用程序处理的是接收到的无界数据。水位线是一种功能强大的解决方式,可以根据时间控制应用程序的行为。除了水位线之外,Flink 还有许多特性来调整基于时间的操作的确切行为,如 process 函数和窗口触发器,并提供了处理迟到数据的不同方法,这些方法将在“处理迟到数据”中讨论。

处理函数 

尽管时间信息和水位线对于许多流处理应用程序非常重要,但是你可能已经注意到,我们无法通过目前为止看到的基本 DataStream API 转换来使用它们。例如,MapFunction 不能访问时间戳或当前事件时间。

DataStream API 提供了一系列底层转换,即 process 函数,这些函数可以访问记录的时间戳和水位线,并注册将来某个特定时间触发的定时器。此外,process 函数还支持将记录发送到多个输出流。process 函数通常用于构建事件驱动的应用程序,并实现可能不适用于预定义窗口和转换的自定义逻辑。例如,Flink 的 SQL 支持的大多数算子都是基于 process 函数实现的。

目前,Flink 提供八种不同的 process 函数:

ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction 和 ProcessAllWindowFunction。

正如函数名表达的含义,这些函数适用于不同的 context 中。但是,它们具有非常相似的特性。我们将通过详细讨论 KeyedProcessFunction 来继续讨论这些常见特性。

KeyedProcessFunction 是一个非常通用的函数,可以应用于 KeyedStream。对流的每个记录调用该函数,并返回零条、一条或多条记录。所有 process 函数都实现 RichFunction 接口,提供 open()、close()和 getRuntimeContext()方法。另外,KeyedProcessFunction[KEY, IN, OUT]还提供了以下两个方法:

1. processElement(v: IN, ctx: Context, out:Collector[out])为流的每个记录调用。通常,通过将结果记录传递给收集器(out:Collector)的方式来输出结果记录。context 对象使 process 函数变得特殊。它提供了对时间戳和当前记录的 key 以及对 TimerService 的访问。此外,context 可以将记录发送到侧输出流(side output)。

2.onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[out])是一个回调函数,当先前注册的计时器触发时将调用该回调函数。timestamp 参数给出触发定时器的时间戳,收集器(out:Collector)可以将记录输出。OnTimerContext 提供与 processElement()方法的 context 对象相同的服务,并返回触发触发器的时间域(处理时间或事件时间)。

时间服务和计时器

Context 和 OnTimerContext 对象的 TimerService 提供了以下方法:

 currentProcessingTime(): Long     返回当前处理时间。

 currentWatermark(): Long     返回当前水位线的时间戳。

 registerProcessingTimeTimer(timestamp:Long):Unit    为当前 key 注册一个处理时间定时器。当所在执行服务器的处理时间达到所提供的时间戳时,定时器将触发。

 registerEventTimeTimer(timestamp: Long):Unit    为当前 key 注册一个事件时间定时器。当将水位线更新为与定时器的时间戳相等或更大的时间戳时,定时器将触发。

 deleteProcessingTimeTimer(timestamp:Long): Unit    删除先前为当前 key 注册的处理时间定时器。如果不存在这样的定时器,则该方法无效。

 deleteEventTimeTimer(timestamp:Long):Unit   删除先前为当前 key 注册的事件时间定时器。如果不存在这样的定时器,则该方法无效。

当定时器触发时,将调用 onTimer()回调函数。processElement()和 onTimer()方法是同步的,以防止对状态的并发访问和操作。

定时器(Timer)只能在 key 类型流上注册。定时器(Timer)的一个常见用例是在一段时间不活动之后清除 key 状态,或者实现基于时间的自定义窗口逻辑。要在非 key 类型流上使用定时器,可以使用带有常量虚拟键的 KeySelector 来创建 key 类型流。注意,这会把所有数据移动到单个任务中,这样算子将以并行度 1 有效地执行。

对于每个 key 和时间戳,可以只注册一个定时器(Timer),这意味着每个 key 可以有多个定时器(Timers),但每个时间戳只能有一个定时器。默认情况下,KeyedProcessFunction 会保留 heap 上的优先级队列中的所有定时器(Timers)的时间戳。但是,你可以配置 RocksDB 状态后端来存储定时器(Timer)。

定时器与函数的任何其他状态一起被存入检查点。如果应用程序需要从故障中恢复,则在应用程序重新启动时过期的所有处理时间定时器将在应用程序恢复时立即触发。对于保存在保存点(savepoint)中的处理时间定时器也是如此。定时器总是异步写入检查点,只有一个例外。如果你使用带有增量检查点的 RocksDB 状态后端,并将定时器存储在 heap 上(默认设置),则它们将同步进行存入检查点。在这种情况下,建议不要过度使用定时器,以避免存入检查点的过程耗时太长。


注意

为过去的时间戳注册的定时器不会自动删除,但也会被处理。处理时间定时器在注册方法返回后立即触发。事件时间定时器在处理下一个水位线时触发。

下面的代码展示了如何将 KeyedProcessFunction 应用到 KeyedStream 上。该函数用于监控传感器的温度,如果传感器的温度在处理时间内 1 秒出现了上升,则发出告警:

val warnings = readings

// key by sensor id

.keyBy(_.id)

// apply ProcessFunction to monitor temperatures

.process(new TempIncreaseAlertFunction)

TempIncreaseAlertFunction 的实现如例 6-5 所示。

/** Emits a warning if the temperature of a sensor

* monotonically increases for 1 second (in processing time).

*/

class TempIncreaseAlertFunction extends

KeyedProcessFunction[String,SensorReading,String]

{

// stores temperature of last sensor reading

lazy val lastTemp: ValueState[Double]=getRuntimeContext.getState(new ValueStateDescriptor[Double])

// stores timestamp of currently active timer

lazy val currentTimer: ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))


override def processElement(r: SensorReading,ctx: KeyedProcessFunction[String,SensorReading,String]#Context,out: Collector[String]): Unit = {

       // get previous temperature

       val prevTemp = lastTemp.value()

       // update last temperature

       lastTemp.update(r.temperature)

       val curTimerTimestamp = currentTimer.value();

       if (prevTemp == 0.0 || r.temperature < prevTemp) {

           // temperature decreased; delete current timer

           ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)

           currentTimer.clear()

      } else if (r.temperature > prevTemp && curTimerTimestamp == 0) {

           // temperature increased and we have not set a timer yet

           // set processing time timer for now + 1 second

           val timerTs = ctx.timerService().currentProcessingTime()+ 1000

           ctx.timerService().registerProcessingTimeTimer(timerTs)

           // remember current timer

           currentTimer.update(timerTs)

      }

  }

 override def onTimer(

       ts: Long,

       ctx: KeyedProcessFunction[String, SensorReading,

       String]#OnTimerContext,

       out: Collector[String]): Unit = {

       out.collect("Temperature of sensor '" + ctx.getCurrentKey +

       "' monotonically increased for 1 second.")

       currentTimer.clear()

  }

向侧输出流发送数据 

DataStream API 的大多数算子只有一个输出,产生一个具有特定数据类型的结果流。只有 split 算子允许将一个流拆分为多个相同类型的流。侧输出流是处理函数的一个特性,用于输出多个流,通常用于处理迟到数据。侧输出由 OutputTag[X]对象标识,其中 X 是结果侧输出流的类型。处理函数可以通过 context 对象将一条记录发送到一个或多个侧输出流。

示例 6-6 展示了如何通过侧输出流的 DataStream 的 ProcessFunction 发送数据。

val monitoredReadings: DataStream[SensorReading] = readings

// monitor stream for readings with freezing temperatures.process(new FreezingMonitor)

// retrieve and print the freezing alarms side outputmonitoredReadings.getSideOutput(new OutputTag[String]).print()

// print the main outputreadings.print()

示例 6-7 显示了 FreezingMonitor 函数,该函数监控传感器读数流,将温度低于 32°F 的数据发送到侧输出流产生告警信息。

/** Emits freezing alarms to a side output for readings


with a temperature below 32F. */class FreezingMonitor extends ProcessFunction[SensorReading,SensorReading] {// define a side output taglazy val freezingAlarmOutput:OutputTag[String]=new OutputTag[String]override def processElement(r: SensorReading,

ctx:

ProcessFunction[SensorReading,SensorReading]#Context,

    out: Collector[SensorReading]): Unit = {

       // emit freezing alarm if temperature is below 32F

       if (r.temperature < 32.0) {

      ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")

      }

// forward all readings to the regular output

       out.collect(r)

  }

}

CoProcessFunction 

对于两个输入流的关联操作,DataStream API 还提供了 CoProcessFunction。与 CoFlatMapFunction 类似,CoProcessFunction 为每个输入 processElement1()和 processElement2()提供转换方法。与 ProcessFunction 类似,这两个方法都是使用 context 对象调用的,该 context 对象允许访问元素或定时器时间戳、TimerService 和侧输出流。CoProcessFunction 还提供了一个 onTimer()回调方法。示例 6-8 展示了如何应用 CoProcessFunction 来合并两个流。

// ingest sensor stream

val sensorData: DataStream[SensorReading] = ...

// filter switches enable forwarding of readings

val filterSwitches: DataStream[(String, Long)] = env

  .fromCollection(Seq(

  ("sensor_2", 10 * 1000L), // forward sensor_2 for 10

   seconds

  ("sensor_7", 60 * 1000L)) // forward sensor_7 for 1 minute

  )

val forwardedReadings = readings

   // connect readings and switches

  .connect(filterSwitches)

   // key by sensor ids

  .keyBy(_.id, _._1)

   // apply filtering CoProcessFunction

  .process(new ReadingFilter)


示例 6-9 中展示了一个 ReadingFilter 函数的实现,该函数根据过滤器开关动态过滤传感器读数流。

class ReadingFilter extends CoProcessFunction[SensorReading,(String,

Long),SensorReading] {

  // switch to enable forwarding

 lazy val forwardingEnabled: ValueState[Boolean] =

getRuntimeContext.getState(

new ValueStateDescriptor[Boolean])

// hold timestamp of currently active disable timer

lazy val disableTimer: ValueState[Long] =

getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))

override def processElement1(

 reading: SensorReading,

ctx: CoProcessFunction[SensorReading, (String, Long),

SensorReading]#Context,

out: Collector[SensorReading]): Unit =

  {

 // check if we may forward the reading

       if (forwardingEnabled.value()) {

      out.collect(reading)

 }

  }

override def processElement2(

       switch: (String, Long),

       ctx: CoProcessFunction[SensorReading, (String, Long),SensorReading]#Context,

       out: Collector[SensorReading]): Unit =

  {

 // enable reading forwarding

       forwardingEnabled.update(true)

       // set disable forward timer

       val timerTimestamp =ctx.timerService().currentProcessingTime() +

switch._2

val curTimerTimestamp = disableTimer.value()

 if (timerTimestamp > curTimerTimestamp) {

       // remove current timer and register new timer

       ctx.timerService().deleteEventTimeTimer(curTimerTimestamp)

       ctx.timerService().registerProcessingTimeTimer(timerTimestamp)

       disableTimer.update(timerTimestamp)

      }

  }

override def onTimer(

     ts:Long,

     ctx:CoProcessFunction[SensorReading,(String, Long),SensorReading]#OnTimerContext,

     out:Collector[SensorReading]): Unit = {

       // remove all state; forward switch will be false by default

       forwardingEnabled.clear()

       disableTimer.clear()

  }

}

窗口算子 

窗口操作是流处理应用程序中的常见操作。它们支持在无界流的有限间隔上进行诸如聚合类的转换。通常,这些间隔是使用基于时间的逻辑定义的。窗口算子提供了一种方法来将事件分组到有限大小的 buckets(桶)中,并对这些桶中的事件数据应用计算。例如,窗口算子可以将流的事件分组到 5 分钟的窗口中,并计算每个窗口已经接收了多少事件记录。

DataStream API 为最常见的窗口操作提供了内置方法,并提供了非常灵活的窗口机制来自定义窗口逻辑。在本节中,我们将向你展示如何定义窗口算子,介绍 DataStream API 的内置窗口类型,以及窗口函数的应用,最后解释如何定义自定义窗口逻辑。

定义窗口算子

窗口算子可以应用于 key 类型流或 none-key 类型流。key 类型窗口上的窗口算子是并行计算的,而非 key 类型窗口是在单线程处理的。

要创建窗口算子,你需要指定两个窗口组件:

1. 确定输入流的元素如何分组到窗口中的窗口分配器。窗口分配器生成一个 WindowedStream(如果应用于非 key 类型数据流,则生成 AllWindowedStream)。

2. 应用于 WindowedStream(或 AllWindowedStream)上,并处理分配给窗口的元素的窗口函数。

下面的代码演示了如何指定一个窗口分配器和一个作用于 key 类型流或非 key 类型流的窗口函数:

stream.keyBy(...).window(...) // specify the window assigner

.reduce/aggregate/process(...) // specify the window function

 

// define a nonkeyed window-all operator

stream.windowAll(...) // specify the window assigner

.reduce/aggregate/process(...) // specify the window function

在本章的其余部分,我们重点关注 key 类型窗口。非 key 类型窗口(在 DataStream API 中也称为 all-windows)是类似的。

请注意,可以通过提供自定义触发器或收回器并声明处理迟到元素的策略来自定义窗口算子。本节后面将详细讨论自定义窗口算子。

  

内置窗口分配器 

Flink 为最常见的窗口场景提供了内置的窗口分配器。我们在这里讨论的所有分配程序都是基于时间的,并已经在“数据流上的操作”中做了介绍。基于时间的窗口分配器根据元素的事件时间戳或当前处理时间向窗口分配元素。时间窗口有一个开始时间戳和一个结束时间戳。

所有内置的窗口分配器都提供一个默认触发器,一旦(处理或事件)时间过了窗口的末端,就会触发窗口计算。需要注意的是,当第一个元素被分配给一个窗口时,窗口就被创建了。Flink 永远不会计算空窗口。

基于计数器的窗口

除了基于时间的窗口之外,Flink 还支持基于计数器的窗口——将固定数量的元素按它们到达窗口算子的顺序分组的窗口。由于它们依赖于摄入顺序,基于计数的窗口是不确定的。此外,如果在使用时没有自定义触发器(在某些情况下会丢弃不完整和陈旧的窗口),则会导致问题。

Flink 的内置窗口分配器创建类型为 TimeWindow 的窗口。此窗口类型实质上表示两个时间戳之间的时间间隔,左闭右开。此类型窗口包括定义窗口边界、检查窗口是否相交以及合并重叠窗口的方法。

下面,我们将展示 DataStream API 的不同内置窗口分配器,以及如何使用它们来定义窗口算子。

 滚动窗口(Tumbling Window)

滚动窗口(Tumbling window)分配程序将元素放置到不重叠的、固定大小的窗口中,如图 6-1 所示。

Datastream API 为滚动事件时间窗口和处理时间窗口分别提供了两个分配器—TumblingEventTimeWindows 和 TumblingProcessingTimeWindows。Tumbling 窗口分配器接收一个参数,窗口大小以时间为单位,我们可以使用分配器的 of(Time size)方法来指定。时间间隔可以设置为毫秒、秒、分钟、小时或天。

下面的代码展示了如何定义事件时间和处理时间滚动窗口的传感器数据测量流:

val sensorData: DataStream[SensorReading] = ...

val avgTemp = sensorData

  .keyBy(_.id)

   // group readings in 1s event-time windows

  .window(TumblingEventTimeWindows.of(Time.seconds(1)))

  .process(new TemperatureAverager)

val avgTemp = sensorData

  .keyBy(_.id)

   // group readings in 1s processing-time windows

  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))

  .process(new TemperatureAverager)


在我们的第一个 DataStream API 示例,“数据流上的操作”章节中,窗口定义看起来有点不同。在那里,我们使用 timeWindow(size)方法定义了一个事件时间滚动窗口,这是 window.(TumblingEventTimeWindows.of(size)) 或者 window.(TumblingProcessingTimeWindows.of(size))两个窗口定义的快捷方式,具体取决于配置的时间特性。下面的代码演示了如何使用这个快捷方式:

val avgTemp = sensorData

.keyBy(_.id)

// window.(TumblingEventTimeWindows.of(size))的快捷方式

.timeWindow(Time.seconds(1))

.process(new TemperatureAverager)

默认情况下,tumbling 窗口与纪元(epoch)时间对齐,1970-01-01-00:00:00.000。例如,大小为 1 小时的分配程序将在 00:00:00、01:00:00、02:00:00 等时间定义窗口。或者,你可以指定偏移量作为分配程序中的第二个参数。下面的代码显示了偏移量为 15 分钟的窗口,偏移量分别从 00:15:00、01:15:00、02:15:00 开始,依次类推:

val avgTemp = sensorData

.keyBy(_.id)

// group readings in 1 hour windows with 15 min offset

.window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15)))

.process(new TemperatureAverager)

 

滑动窗口(SLIDING WINDOWS)

滑动窗口分配器将元素分配给固定大小的窗口,这些窗口按指定的滑动间隔移动,如图 6-2 所示。

对于滑动窗口,必须指定窗口大小和滑动间隔,以定义新窗口的滑动频率。当滑动间隔小于窗口大小时,窗口重叠,可以将元素分配给多个窗口。如果滑动间隔比窗口大小大,一些元素可能不会被分配到任何窗口,因此可能被删除。

下面的代码展示了如何将传感器读数分组到 1 小时大小的滑动窗口中,滑动间隔为 15 分钟。每个读数将被添加到四个窗口。DataStream API 提供了事件时间和处理时间分配器,以及快捷方法,可以将时间间隔偏移量设置为窗口分配器的第三个参数:

// event-time sliding windows assigner

val slidingAvgTemp = sensorData

  .keyBy(_.id)

   // create 1h event-time windows every 15 minutes

  .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(15)))

  .process(new TemperatureAverager)

// processing-time sliding windows assigner

val slidingAvgTemp = sensorData

  .keyBy(_.id)

   // create 1h processing-time windows every 15 minutes

  .window(SlidingProcessingTimeWindows.of(Time.hours(1),Time.minutes(15)))

  .process(new TemperatureAverager)

// sliding windows assigner using a shortcut method

val slidingAvgTemp = sensorData

  .keyBy(_.id)

   // shortcut for window.(SlidingEventTimeWindow.of(size,slide))

  .timeWindow(Time.hours(1), Time(minutes(15)))

  .process(new TemperatureAverager)

会话窗口(SESSION WINDOWS)

会话窗口分配器将元素放入大小不同的活动的非重叠窗口中。会话窗口的边界由不活动的间隔定义,在这些间隔中没有接收到任何记录。图 6-3 说明了如何将元素分配给会话窗口。

以下示例演示如何将传感器读数分组到会话窗口,其中每个会话都定义为 15 分钟的不活动时间:

// event-time session windows assigner

val sessionWindows = sensorData

  .keyBy(_.id)

   // create event-time session windows with a 15 min gap

  .window(EventTimeSessionWindows.withGap(Time.minutes(15)))

  .process(...)

// processing-time session windows assigner

val sessionWindows = sensorData

  .keyBy(_.id)

   // create processing-time session windows with a 15 min gap

  .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))

  .process(...)

由于会话窗口的开始和结束取决于接收到的元素,所以窗口分配器不能立即将所有元素分配到正确的窗口。相反,SessionWindows assigner 最初将每个传入元素映射到它自己的窗口中,以元素的时间戳作为开始时间,会话间隔作为窗口大小。随后,它用重叠的范围合并所有窗口。

在窗口上应用函数

窗口函数定义了对窗口中的数据元素执行的计算逻辑。有两种类型的函数可以用于窗口函数:

1. 增量聚合函数(Incremental aggregation functions):在元素被添加到窗口并保持和更新单个值为窗口状态时直接应用增量聚合函数。这些函数通常非常节省空间,并最终产生聚合值作为结果。ReduceFunction 和 AggregateFunction 都是增量聚合函数。

全量窗口函数(Full window functions):收集窗口的所有元素,并在对所有收集的元素求值时遍历元素列表。全量窗口函数通常需要更多的内存,可以完成比增量聚合函数更复杂的逻辑。ProcessWindowFunction 是一个全量窗口函数。

在本节中,我们将讨论可以应用于窗口的不同类型的函数,以便对窗口的内容执行聚合或任意计算。我们还展示了如何在窗口算子中关联使用增量聚合函数和全量窗口函数。

REDUCEFUNCTION

在讨论在 key 类型流上运行聚合时,在“KeyedStream 转换”中引入了 ReduceFunction。ReduceFunction 接受相同类型的两个值,并将它们组合成相同类型的单个值。当应用于有窗口的流时,ReduceFunction 增量地聚合分配给窗口的元素。窗口只存储聚合的当前结果—ReduceFunction 的输入(和输出)类型的单个值。当接收到新元素时,使用新元素和从窗口状态读取的当前值调用 ReduceFunction。窗口的状态由 ReduceFunction 的结果代替。

在窗口上应用 ReduceFunction 的优点是每个窗口的状态大小固定且占用空间小,而且函数接口简单。然而,ReduceFunction 的应用程序是有限的,而且通常局限于简单的聚合,且输入和输出类型必须是相同的。

示例 6-10 显示了一个 reduce lambda 函数,该函数每 15 秒计算每个传感器的最小温度。

val minTempPerWindow: DataStream[(String, Double)] = sensorData

  .map(r => (r.id, r.temperature))

  .keyBy(_._1)

  .timeWindow(Time.seconds(15))

  .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

AGGREGATEFUNCTION

与 ReduceFunction 类似,AggregateFunction 也增量地应用于应用于窗口的元素。此外,具有 AggregateFunction 的窗口算子的状态也由一个值组成。

虽然 AggregateFunction 的接口更加灵活,但是与 ReduceFunction 的接口相比,它的实现也更加复杂。下面的代码展示了 AggregateFunction 的接口:

public interface AggregateFunction<IN, ACC, OUT> extends  Function, Serializable {

   // create a new accumulator to start a new aggregate.

   ACC createAccumulator();

   // add an input element to the accumulator and return the accumulator.

   ACC add(IN value, ACC accumulator);

   // compute the result from the accumulator and return it.

   OUT getResult(ACC accumulator);

   // merge two accumulators and return the result.

   ACC merge(ACC a, ACC b);

}

该接口定义输入类型 IN、一个类型为 ACC 的累加器和结果类型 OUT。与 ReduceFunction 相反,中间数据类型和输出类型不依赖于输入类型。

示例 6-11 展示了如何使用聚合函数来计算每个窗口的传感器读数的平均温度。累加器维护前面计数的和,以及计数,getResult()方法计算平均值。

val avgTempPerWindow: DataStream[(String, Double)] = sensorData

.map(r => (r.id, r.temperature))

.keyBy(_._1)

.timeWindow(Time.seconds(15))

.aggregate(new AvgTempFunction)

// An AggregateFunction to compute the average tempeature per sensor.

// The accumulator holds the sum of temperatures and an event count.

class AvgTempFunction extends AggregateFunction

[(String, Double), (String, Double, Int), (String, Double)] {

   override def createAccumulator() = {

  ("", 0.0, 0)

  }

   override def add(in: (String, Double), acc: (String, Double,Int)) = {

  (in._1, in._2 + acc._2, 1 + acc._3)

  }

   override def getResult(acc: (String, Double, Int)) = {

  (acc._1, acc._2 / acc._3)

  }

   override def merge(acc1: (String, Double, Int), acc2:(String, Double, Int)) = {

  (acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)

  }

}

PROCESSWINDOWFUNCTION

ReduceFunction 和 AggregateFunction 用于窗口增量更新。然而,有时我们需要访问窗口的所有元素来执行更复杂的计算,例如计算窗口中值的中位数或最频繁出现的值。对于这样的应用程序,ReduceFunction 和 AggregateFunction 都不合适。Flink 的 DataStream API 提供了 ProcessWindowFunction 来对窗口的内容执行任意计算。

注意

Flink 1.7 的 DataStream API 具有 WindowFunction 接口。WindowFunction 已经被 ProcessWindowFunction 所取代,这里不再讨论它。

下面的代码显示了 ProcessWindowFunction 的接口:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>

extends AbstractRichFunction {

   // Evaluates the window

 void process(KEY key,Context ctx,Iterable<IN> vals,Collector<OUT> out)throws Exception;

   // Deletes any custom per-window state when the window is purged

   public void clear(Context ctx) throws Exception {}

   // The context holding window metadata

   public abstract class Context implements Serializable {

       // Returns the metadata of the window

       public abstract W window();

       // Returns the current processing time

       public abstract long currentProcessingTime();

       // Returns the current event-time watermark

       public abstract long currentWatermark();

       // State accessor for per-window state

       public abstract KeyedStateStore windowState();

       // State accessor for per-key global state

       public abstract KeyedStateStore globalState();

       // Emits a record to the side output identified by the OutputTag.

       public abstract <X> void output(OutputTag<X> outputTag, X value);

  }

}

process()方法中使用了窗口 key,使用迭代器访问窗口的元素,使用收集器输出结果。此外,该方法具有与其他处理方法类似的 context 参数。ProcessWindowFunction 的 context 对象允许访问窗口的元数据、当前处理时间和水位线、用于管理每个窗口和每个 key 的全局状态的状态存储,以及用于输出记录的侧输出。

在介绍 Process 函数时,我们已经讨论了 context 对象的一些特性,比如对当前处理的访问以及事件时间和侧输出流。然而,ProcessWindowFunction 的 context 对象也提供了独特的特性。窗口的元数据通常包含可以用作窗口标识符的信息,例如时间窗口的开始和结束时间戳。

另一个特性是每个窗口和每个 key 的全局状态。全局状态是指不局限于任何窗口的 key 状态,而 Per-window 状态是指当前正在计算的窗口实例。每个窗口的状态有助于维护在同一窗口上的 process()方法的多个调用之间共享的信息,这些调用可能由于配置允许的延迟或使用自定义触发器而发生的。利用每个窗口状态的 ProcessWindowFunction 需要实现它的 clear()方法,用于清除当前窗口之前的特定窗口状态。全局状态可用于在同一 key 上的多个窗口之间共享信息。

示例 6-12 将传感器读取流分组为 5 秒的 tumbling 窗口,并使用 ProcessWindowFunction 计算窗口内发生的最低和最高温度。它为每个窗口输出一条记录,包括窗口的开始和结束时间戳以及最小和最大温度。

// output the lowest and highest temperature reading every 5 seconds

val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData

 .keyBy(_.id)

 .timeWindow(Time.seconds(5))

.process(new HighAndLowTempProcessFunction)

case class MinMaxTemp(id: String, min: Double, max:Double,endTs: Long)

/**

* A ProcessWindowFunction that computes the lowest and highest temperature

* reading per window and emits them together with the end timestamp of the window.

*/

class HighAndLowTempProcessFunction

extends ProcessWindowFunction[SensorReading, MinMaxTemp,String, TimeWindow] {

   override def process(

       key: String,

       ctx: Context,

       vals: Iterable[SensorReading],

       out: Collector[MinMaxTemp]): Unit = {

           val temps = vals.map(_.temperature)

           val windowEnd = ctx.window.getEnd


out.collect(MinMaxTemp(key, temps.min, temps.max,windowEnd))

  }

}

在内部,由 ProcessWindowFunction 计算的窗口将所有分配的事件存储在一个 ListState 中。通过收集所有事件并提供对窗口元数据和其他特性的访问,ProcessWindowFunction 可以处理比 ReduceFunction 或 AggregateFunction 更多的用例。但是,收集所有事件的窗口的状态可能比元素增量聚合窗口的状态大得多。

INCREMENTAL AGGREGATION 和 PROCESSWINDOWFUNCTION

ProcessWindowFunction 是一个非常强大的窗口函数,但是需要谨慎地使用它,因为它通常比增量聚合函数保存更多的状态数据。通常,需要应用于窗口的逻辑可以表示为增量聚合,但也需要访问窗口元数据或状态。

如果你具有增量聚合逻辑,同时需要访问窗口元数据,则可以将执行增量聚合的 ReduceFunction 或 AggregateFunction 与提供对更多功能访问的 ProcessWindowFunction 组合使用。分配给窗口的元素将被立即聚合,当窗口的触发器触发时,聚合的结果将被传递给 ProcessWindowFunction。process()方法的可迭代参数将只提供单个值,即增量聚合的结果。

在 DataStream API 中,这是通过将 ProcessWindowFunction 作为 reduce()或 aggregate()方法的第二个参数来实现的,如下面的代码所示:

input

.keyBy(...)

.timeWindow(...)

.reduce(

   incrAggregator: ReduceFunction[IN],

   function: ProcessWindowFunction[IN, OUT, K, W])

input

.keyBy(...)

.timeWindow(...)

.aggregate(

   incrAggregator: AggregateFunction[IN, ACC, V],

   windowFunction: ProcessWindowFunction[V, OUT, K, W])

例子 6-13 和 6-14 中的代码展示了如何解决相同的用例(代码例子 6-12),联合 ReduceFunction 和 ProcessWindowFunction,每 5 秒输出最小和最大温度传感器和每个窗口的结束时间戳。

case class MinMaxTemp(id: String, min: Double, max:Double,endTs: Long)

val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData

.map(r => (r.id, r.temperature, r.temperature))

.keyBy(_._1)

.timeWindow(Time.seconds(5))

.reduce(

   // incrementally compute min and max temperature

  (r1: (String, Double, Double), r2: (String, Double,Double))

   => {(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))},

   // finalize result in ProcessWindowFunction

   new AssignWindowEndProcessFunction()

)

如示例 6-13 所示,ReduceFunction 和 ProcessWindowFunction 都是在 reduce()方法调用中定义的。由于聚合是由 ReduceFunction 执行的,ProcessWindowFunction 只需要将窗口结束时间戳附加到增量计算的结果中,如示例 6-14 所示。

class AssignWindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp,String, TimeWindow] {

   override def process(

       key: String,

       ctx: Context,

       minMaxIt: Iterable[(String, Double, Double)],

       out: Collector[MinMaxTemp]): Unit = {

           val minMax = minMaxIt.head

           val windowEnd = ctx.window.getEnd

           out.collect(MinMaxTemp(key, minMax._2, minMax._3,windowEnd))

  }

}

自定义窗口算子 

使用 Flink 的内置窗口分配器定义的窗口算子可以处理许多常见的用例。然而,当你开始编写更高级的流处理应用程序时,你可能会发现自己需要实现更复杂的窗口逻辑,比如在处理迟到的元素时,输出早期结果并更新其结果的窗口,或者当收到特定记录时开始和结束的窗口。

DataStream API 提供了定义自定义窗口算子的接口和方法,允许你实现自己的分配器、触发器和回收器。通过使用前面所学的窗口函数以及这些组件,对窗口中的元素进行分组和处理。

当元素到达窗口算子时,它被传递给 WindowAssigner。WindowAssigner 确定元素需要路由到哪个窗口。如果该窗口还不存在,则创建新窗口。

如果窗口算子配置了增量聚合函数,例如 ReduceFunction 或 AggregateFunction,那么新添加的元素将立即被聚合,结果将作为窗口的中间状态存储。如果窗口算子没有增量聚合函数,则将新元素附加到包含所有分配元素的 ListState。

每当一个元素被添加到一个窗口时,同时会被传递给窗口的触发器。触发器决定何时触发窗口计算,以及什么时候清除窗口及其内容。触发器可以根据分配的元素或注册的定时器(类似于处理函数)来决定在特定时间点计算或清除其窗口的内容。

触发器触发时的处理操作取决于窗口算子使用的函数类型。如果仅使用增量聚合函数使用的算子,则输出当前聚合结果。这种情况如图 6-4 所示。

如果算子只有一个完整的窗口函数,则该函数将应用于窗口的所有元素,其结果将直接输出,如图 6-5 所示。

最后,如果算子具有增量聚合函数和全量窗口函数,则将全量窗口函数应用于聚合值并输出结果。图 6-6 描述了这种情况。

回收器(evictor)是一个可选组件,可以在调用 ProcessWindowFunction 之前或之后注入。回收器可以从窗口的内容中删除收集到的元素。因为它必须遍历所有元素,所以只能在没有指定增量聚合函数的情况下使用它。

下面的代码展示了如何定义一个窗口算子与自定义触发器和回收器:

stream

.keyBy(...)

.window(...) // specify the window assigner

[.trigger(...)] // optional: specify the trigger

[.evictor(...)] // optional: specify the evictor

.reduce/aggregate/process(...) // specify the window function

虽然回收器是可选的组件,但是每个窗口算子都需要一个触发器来决定何时计算其窗口。为了提供简洁的窗口算子 API,每个 WindowAssigner 都有一个默认触发器,除非使用自定义触发器,否则将使用该触发器。

请注意,自定义的触发器将覆盖现有的触发器,而不是对其进行补充,窗口将仅根据上次定义的触发器进行计算。

在下面的部分中,我们将讨论 windows 的生命周期,并介绍定义自定义窗口分配器、触发器和回收器的接口。

窗口的生命周期

窗口算子在处理传入流数据时会创建窗口、删除窗口。如前所述,元素由 WindowAssigner 分配给窗口,触发器决定何时计算窗口,窗口函数执行实际的窗口计算。在本节中,我们将讨论窗口的生命周期—何时创建它、它包含哪些信息以及何时删除它。

当 WindowAssigner 将第一个元素分配给窗口时,将创建一个窗口。因此,一个窗口至少有一个元素。一个窗口由以下不同的状态组成:

窗口内容

如果窗口算子具有 ReduceFunction 或 AggregateFunction,则窗口内容包含分配给窗口的元素或增量聚合的结果。

窗口对象

WindowAssigner 返回零个、一个或多个窗口对象。窗口算子根据返回的对象对元素进行分组。因此,窗口对象保存用来区分窗口的信息。每个窗口对象都有一个结束 timestamp,它定义了窗口及其状态可以删除的时间点。

触发器定时器

触发器可以注册定时器,以便在特定时间点调用,例如,计算窗口或清除其内容。这些定时器由窗口算子维护。

触发器中自定义的状态

触发器可以定义和使用自定义每个窗口和每个 key 的状态。此状态完全由触发器控制,而不是由窗口算子维护。

当窗口的结束时间(由窗口对象的结束时间戳定义)达到时,窗口算子将删除窗口。是在处理时间发生这种情况,还是在事件时间发生这种情况取决于 WindowAssigner.isEventTime()方法返回的值。

当窗口被删除时,窗口算子自动清除窗口内容并丢弃窗口对象。不清除自定义触发器状态和注册的触发器定时器,因为此状态对窗口算子不透明。因此,触发器必须在 trigger .clear()方法中清除所有状态,以防止状态泄漏。

窗口分配器

WindowAssigner 决定将每个到达的元素分配给哪个窗口。元素可以添加到零、一个或多个窗口。下面展示的是 windowAssigner 接口:

public abstract class WindowAssigner<T, W extends Window> implements Serializable {

   // Returns a collection of windows to which the element is assigned

   public abstract Collection<W> assignWindows(T element,

                                               long timestamp,

                                               WindowAssignerContext context);

   // Returns the default Trigger of the WindowAssigner

public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

   // Returns the TypeSerializer for the windows of this WindowAssigner

 public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

   // Indicates whether this assigner creates event-time windows

   public abstract boolean isEventTime();

   // A context that gives access to the current processing time

 public abstract static class WindowAssignerContext {

       // Returns the current processing time

       public abstract long getCurrentProcessingTime();

  }

}

“WindowAssigner”的类型取决于输入元素的类型和元素被分配到的窗口的类型。它还需要提供一个默认触发器,如果没有指定其他触发器,则使用该触发器。例 6-15 中的代码为 30 秒滚动事件时间窗口创建了一个自定义的分配程序。

/** A custom window that groups events into 30-second tumbling

windows. */

class ThirtySecondsWindows extends WindowAssigner[Object, TimeWindow] {

   val windowSize: Long = 30 * 1000L

   override def assignWindows(

                   o: Object,

                   ts: Long,

                   ctx: WindowAssigner.WindowAssignerContext):java.util.List[TimeWindow] = {

       // rounding down by 30 seconds

       val startTime = ts - (ts % windowSize)

       val endTime = startTime + windowSize

       // emitting the corresponding time window

       Collections.singletonList(new TimeWindow(startTime,endTime))

  }

   override def getDefaultTrigger(

       env: environment.StreamExecutionEnvironment):Trigger[Object, TimeWindow] = {

       EventTimeTrigger.create()

  }

   override def getWindowSerializer(

       executionConfig: ExecutionConfig):TypeSerializer[TimeWindow] = {

       new TimeWindow.Serializer

  }

   override def isEventTime = true

}


全局窗口分配器

GlobalWindows 分配程序将所有元素映射到同一个全局窗口。它的默认触发器是 NeverTrigger,顾名思义,永不触发。因此,GlobalWindows assigner 需要一个自定义触发器和一个回收器来有选择地从窗口状态删除元素。

GlobalWindows 的结束时间戳是 Long.MAX_VALUE。因此,GlobalWindows 永远不会被彻底清除。当应用于 key 空间不断变化的 KeyedStream 时,GlobalWindows 将为每个 key 维护一些状态。所以请谨慎使用。

除了 WindowAssigner 接口之外,还有扩展了 WindowAssigner 的 MergingWindowAssigner 接口。MergingWindowAssigner 用于需要合并现有窗口的窗口算子。我们前面讨论过的 EventTimeSessionWindows assigner 就是这种分配器的一个例子,它的工作方式是为每个到达的元素创建一个新窗口,然后合并重叠的窗口。

在合并窗口时,需要确保所有合并的窗口及其触发器的状态也已适当合并。触发器接口提供了一个回调方法,该方法在合并窗口时,以及合并与窗口关联的状态时调用。下一节将更详细地讨论窗口合并。

TRIGGERS(触发器)

触发器定义何时计算窗口并输出窗口的结果。触发器可以根据特定于时间或特定数据条件(如元素计数或某些接收到的元素值)中的处理情况决定是否触发。例如,当处理时间或水位线超过窗口结束边界的时间戳时,将触发前面讨论的时间窗口的默认触发器。

触发器可以获取时间属性和定时器,并且可以使用状态。因此,它们与 Process 函数一样强大。例如,你可以实现触发逻辑,当窗口接收到一定数量的元素时触发,当一个具有特定值的元素被添加到窗口时触发,或者在检测添加元素的模式之后触发,比如“5 秒内发生两起相同类型的事件”。自定义触发器还可以用于计算和输出事件时间窗口的早期结果,在水位线到达窗口的结束时间戳之前。这是产生低延迟结果的常见策略,尽管使用了保守的水位线策略。

每次调用触发器时,它都会生成一个 TriggerResult 来确定应该对窗口执行什么操作。TriggerResult 可以取以下值之一:

CONTINUE(跳过

不触发任何操作。

FIRE(触发)

如果窗口算子具有 ProcessWindowFunction,则调用该函数并输出结果。如果窗口只有一个增量聚合函数(ReduceFunction 或 AggregateFunction),则输出当前聚合结果。窗口的状态没有改变。

PURGE(清除)

窗口的内容被完全丢弃,包含所有元数据的窗口被删除。另外,调用 ProcessWindowFunction.clear()方法来清除每个窗口的所有自定义状态。

FIRE_AND_PURGE(触发并清除

FIRE_AND_PURGE:首先计算窗口(FIRE),然后删除所有状态和元数据(PURGE)。

可能的 TriggerResult 值使你能够实现复杂的窗口逻辑。自定义触发器可以触发多次,计算新的或更新的结果,或在满足特定条件时清除窗口而不输出结果。下一个代码块显示了触发器 API:

public abstract class Trigger<T, W extends Window> implements Serializable {

   // Called for every element that gets added to a window

   TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);

   // Called when a processing-time timer fires

   public abstract TriggerResult onProcessingTime(long timestamp,W window,TriggerContext ctx);

   // Called when an event-time timer fires

   public abstract TriggerResult onEventTime(long timestamp, W window, TriggerContext ctx);

   // Returns true if this trigger supports merging of trigger state

   public boolean canMerge();

   // Called when several windows have been merged into one window

   // and the state of the triggers needs to be merged

   public void onMerge(W window, OnMergeContext ctx);

   // Clears any state that the trigger might hold for the given window

   // This method is called when a window is purged

   public abstract void clear(W window, TriggerContext ctx);

}

// A context object that is given to Trigger methods to allow them

// to register timer callbacks and deal with state

public interface TriggerContext {

   // Returns the current processing time

   long getCurrentProcessingTime();

   // Returns the current watermark time

   long getCurrentWatermark();

   // Registers a processing-time timer

   void registerProcessingTimeTimer(long time);

   // Registers an event-time timer

   void registerEventTimeTimer(long time);

   // Deletes a processing-time timer

   void deleteProcessingTimeTimer(long time);

   // Deletes an event-time timer

   void deleteEventTimeTimer(long time);

   // Retrieves a state object that is scoped to the window and the key of the trigger

   <S extends State> S getPartitionedState(StateDescriptor<S,?> stateDescriptor);

}

// Extension of TriggerContext that is given to the Trigger.onMerge() method

public interface OnMergeContext extends TriggerContext {

   // Merges per-window state of the trigger

   // The state to be merged must support merging

   void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);

}

如你所见,触发器 API 可以通过提供对时间和状态的访问来实现复杂的逻辑。触发器有两个方面需要特别注意:清理状态以及合并触发器。

在触发器中使用每个窗口的状态时,需要确保在删除窗口时正确删除了该状态。否则,窗口算子会随着时间积累越来越多的状态,你的应用程序可能会在某个时候失败。为了在删除窗口时清除所有状态,触发器的 clear()方法需要删除每个窗口的所有自定义状态,并使用 TriggerContext 对象删除所有处理时间和事件时间计时器。无法在定时器回调方法中清理状态,因为在删除窗口后不会调用这些方法。

如果触发器与 MergingWindowAssigner 一起应用,则需要能够在合并两个窗口时处理这种情况。在这种情况下,还需要合并触发器的任何自定义状态。canMerge()声明触发器是否支持合并,而 onMerge()方法需要实现执行合并的逻辑。如果触发器不支持合并,则不能将其与 MergingWindowAssigner 组合使用。


在合并触发器时,必须将所有自定义状态的描述符提供给 OnMergeContext 对象的 mergePartitionedState()方法。

注意

请注意,可合并触发器只能使用可以自动合并的状态原语——ListState, ReduceState, 或 AggregatingState.

示例 6-16 显示了一个在到达窗口结束时间之前提前触发的触发器。当第一个事件被分配给一个窗口时,触发器注册一个定时器,比当前水位线早 1 秒。当定时器触发时,将注册一个新计时器。因此,触发器最多每秒触发一次。


/** A trigger that fires early. The trigger fires at most every second. */

{

override def onElement(r: SensorReading,

                      timestamp: Long,

                      window: TimeWindow,

                      ctx: Trigger.TriggerContext): TriggerResult = {

// firstSeen will be false if not set yetval firstSeen: ValueState[Boolean] =ctx.getPartitionedState(new ValueStateDescriptor[Boolean])// register initial timer only for first element

if (!firstSeen.value()) {

       // compute time for next early firing by rounding watermark to second

       val t = ctx.getCurrentWatermark + (1000 -(ctx.getCurrentWatermark % 1000))

       ctx.registerEventTimeTimer(t)

 // register timer for the window end

       ctx.registerEventTimeTimer(window.getEnd)

       firstSeen.update(true)

  }

 // Continue. Do not evaluate per element

   TriggerResult.CONTINUE

}

override def onEventTime(

   timestamp: Long,

   window: TimeWindow,

   ctx: Trigger.TriggerContext): TriggerResult = {

   if (timestamp == window.getEnd) {

       // final evaluation and purge window state

       TriggerResult.FIRE_AND_PURGE

  } else {

 // register next early firing timer

       val t = ctx.getCurrentWatermark + (1000 -(ctx.getCurrentWatermark % 1000))

       if (t < window.getEnd) {

           ctx.registerEventTimeTimer(t)

      }

 // fire trigger to evaluate window

 TriggerResult.FIRE

  }

}

override def onProcessingTime(

   timestamp: Long,

   window: TimeWindow,

   ctx: Trigger.TriggerContext): TriggerResult = {

 // Continue. We don't use processing time timers

   TriggerResult.CONTINUE

}

override def clear(window: TimeWindow,ctx: Trigger.TriggerContext): Unit = {

// clear trigger stateval firstSeen: ValueState[Boolean] =ctx.getPartitionedState(new ValueStateDescriptor[Boolean])firstSeen.clear()}}


请注意,触发器使用自定义状态,该状态使用 clear()方法清除。因为我们使用的是一个简单的不可合并的 ValueState,所以触发器是不可合并的。

EVICTORS

在 Flink 的窗口机制中,Evictor(驱逐器)是一个可选组件。它可以在窗口函数执行之前或之后从窗口中删除元素。

示例 6-17 显示了 Evictor 接口。

public interface Evictor<T, W extends Window> extends Serializable {

   // Optionally evicts elements. Called before windowing function.

void evictBefore(

       Iterable<TimestampedValue<T>> elements,

       int size,

       W window,

       EvictorContext evictorContext);

 // Optionally evicts elements. Called after windowing function.

void evictAfter(

       Iterable<TimestampedValue<T>> elements,

       int size,

       W window,

       EvictorContext evictorContext);

}

// A context object that is given to Evictor methods.

interface EvictorContext {

 // Returns the current processing time.

   long getCurrentProcessingTime();

// Returns the current event time watermark.

   long getCurrentWatermark();

}

在将窗口函数应用于窗口内容之前和之后分别调用 evictBefore()和 evictAfter()方法。这两个方法都有一个 Iterable 参数(服务于添加到窗口的所有元素)、窗口中的元素数量(大小)参数、窗口对象和一个 EvictorContext 参数。通过调用可从 Iterable 获得的 Iterator 对象上的 remove()方法,可以从窗口中删除元素。

PREAGGREGATION 和 EVICTORS

Evictors 遍历窗口中的元素列表。只有当窗口收集所有添加的事件而不应用 ReduceFunction 或 AggregateFunction 以增量聚合窗口内容时,才能使用它们。

Evictors 通常应用在 GlobalWindow 上,用于对窗口进行部分清理而不清除整个窗口状态。

 

基于时间的多流关联

使用流时,一个常见需求是 connect 或 join 两个流中的事件。Flink 的 DataStream API 提供了两个内置的算子来 join 具有时间条件的流:间隔连接和窗口连接(interval join 和 window join)。在本节中,我们将介绍这两种算子。

如果无法使用 Flink 的内置连接算子来表达所需的连接语义,则可以使用 CoProcessFunction、BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 实现自定义连接逻辑。

 注意,你应该使用有效的状态访问模式和有效的状态清理策略来设计这样的算子。

基于间隔的关联(Interval Join)

interval join 连接来自两个具有公共 key 的流的事件,这两个流之间的时间戳间隔不超过指定的时间间隔。

图 6-7 显示了两个流的 interval join:A 和 B,如果 B 事件的时间戳早于 B 事件的时间戳不少于一个小时,并且晚于 B 事件的时间戳不超过 15 分钟,则将 A 事件和 B 事件进行 Join。join interval 是对称的,即来自 B 的一个事件与来自 A 的所有事件连接,这些事件都比 B 事件早不超过 15 分钟,最多晚一个小时。

interval join 目前只支持事件时间并使用内部连接语义进行操作(没有匹配事件的事件将不会被转发)。interval join 的定义如示例 6-18 所示。

input1

.keyBy(…)

.between(<lower-bound>, <upper-bound>) // bounds with respect to input1

.process(ProcessJoinFunction) // process pairs of matched events

join 事件双方都被传递到 ProcessJoinFunction 中。下界和上界定义为负的和正的时间间隔,例如,between(Time.hour(-1), Time.minute(15)).。只要下界小于上界,就可以任意选择上界和下界;可以将所有 A 事件与所有 B 事件连接起来,只要 B 事件的时间戳比 A 事件多一到两个小时。

interval join 需要缓冲来自一个或两个输入的记录。对于第一个输入,所有具有大于当前水位线的时间戳(即上限)的记录都将被缓冲。对于第二个输入,所有时间戳大于当前水位线+下界的记录将被缓冲。注意,两个界限都可能是负的。图 6-7 存储了流 A 所有加入时间戳大于当前 watermark-15 分钟的记录,和流 B 中所有时间戳大于当前 watermark---1 小时的记录。你应该意识到,interval join 的存储需求可能会大大增加,如果两个输入流的事件时间不同步,因为水位线是由“slower”流确定。

基于窗口的关联

顾名思义,window join 基于 Flink 的窗口机制。两个输入流的元素都被分配到公共窗口,并在窗口完成时联接(或分组)。

input1.join(input2)

.where(...) // specify key attributes for input1

.equalTo(...) // specify key attributes for input2

.window(...) // specify the WindowAssigner

[.trigger(...)] // optional: specify a Trigger

[.evictor(...)] // optional: specify an Evictor

.apply(...) // specify the JoinFunction

两个输入流都根据它们的 key 作为 key 流类型,公共窗口分配器将这两个流的事件映射到公共窗口,这意味着窗口将存储这两个输入的事件。当一个窗口的定时器触发时,对来自第一个和第二个输入的元素的每个组合调用 JoinFunction(向量叉积 Cross Product)。还可以指定自定义触发器和收回器。由于这两个流的事件被映射到相同的窗口,因此触发器和回收器的行为与常规窗口算子完全相同。

除了连接两个流之外,还可以使用 cogroup()而不是 join()来进行算子定义,从而将两个流联合到一个窗口上。总体逻辑是相同的,但不是对两个输入的每一对事件调用一个 JoinFunction,而是使用来自两个输入的元素的迭代器在每个窗口调用一次 CoGroupFunction。

需要注意的是,连接窗口化的流可能会产生意想不到的语义。例如,假设你使用配置了 1 小时滚动窗口的连接算子连接两个流。第一个输入的元素不会与第二个输入的元素连接,即使它们之间相差只有 1 秒,但是它们被分配到两个不同的窗口。

 

处理迟到数据 

如前所述,可以使用水位线来平衡结果的完整性和结果延迟。除非你选择一种非常保守的水位线策略,以确保所有相关记录都以高延迟为代价,否则你的应用程序很可能必须处理迟到的数据元素。

迟到元素:当它到达一个算子时,为其提供的计算已经被执行。在事件时间窗口算子的 context 中,如果事件到达算子,并且窗口分配器将其映射到已经计算过的窗口,则该事件将延迟,因为算子的水位线传递了窗口的结束时间戳。

DataStream API 提供了处理延迟事件的不同选项:

l 延迟事件可以简单地删除。

l 延迟事件可以重定向到单独的流。

l 计算结果可以根据延迟事件进行更新,并且必须输出更新。

在下面,我们将详细讨论这些选项,并展示如何将它们应用于处理函数和窗口算子。

丢弃迟到事件 

处理延迟事件的最简单方法是简单地丢弃它们。删除延迟事件是事件时间窗口算子的默认行为。因此,迟到的元素不会创建新窗口。

通过比较事件的时间戳和当前水位线,处理函数可以很容易地过滤掉延迟事件。

重定向迟到事件 

延迟事件还可以使用侧输出流特性重定向到另一个 DataStream,可以使用常规的接收函数处理或发出延迟事件。根据业务需求,后期数据稍后可以通过定期的回填过程集成到流应用程序的结果中。示例 6-20 展示了如何为延迟事件指定带有侧输出流的窗口算子。

val readings: DataStream[SensorReading] = ???

val countPer10Secs: DataStream[(String, Long, Int)] = readings

.keyBy(_.id)

.timeWindow(Time.seconds(10))

// emit late readings to a side output

.sideOutputLateData(new OutputTag[SensorReading])

// count readings per window

.process(new CountFunction())

// retrieve the late events from the side output as a stream

val lateStream: DataStream[SensorReading] = countPer10Secs.getSideOutput(new OutputTag[SensorReading])

Process 函数可以通过比较事件时间戳和当前水位线并使用常规的侧输出流 API 进行输出从而识别延迟事件。示例 6-21 显示了一个 ProcessFunction,它过滤来自其输入的延迟传感器读数,并将它们重定向到侧输出流。

val readings: DataStream[SensorReading] = ???

val filteredReadings: DataStream[SensorReading] = readings

.process(new LateReadingsFilter)

// retrieve late readings

val lateReadings: DataStream[SensorReading] = filteredReadings.getSideOutput(new OutputTag[SensorReading])/** A ProcessFunction that filters out late sensor readings and


re-directs them to a side output */class LateReadingsFilter extends ProcessFunction[SensorReading, SensorReading] {

   val lateReadingsOut = new OutputTag[SensorReading]("latereadings")

 override def processElement(

       r: SensorReading,

       ctx: ProcessFunction[SensorReading,

       SensorReading]#Context,

       out: Collector[SensorReading]): Unit = {

       // compare record timestamp with current watermark

       if (r.timestamp < ctx.timerService().currentWatermark()) {

 // this is a late reading => redirect it to the side output

           ctx.output(lateReadingsOut, r)

      } else {

           out.collect(r)

      }

  }

}


基于迟到事件更新结果 

延迟事件在它们应该完成的计算之后到达算子。因此,算子输出的结果是不完整或不准确的。另一种策略是重新计算不完整的结果并输出更新,而不是删除或重定向延迟事件。但是,为了能够重新计算和更新结果,需要考虑一些问题。

支持重新计算和更新已输出结果的算子需要在发出第一个结果后保留计算所需的所有状态。但是,由于算子通常不可能永远保留所有状态,所以需要在某个时候清除状态。一旦清除了某个结果的状态,就不能再更新该结果,只能删除或重定向延迟事件。

除了保持状态外,下游算子或跟随算子的外部系统还需要能够处理这些更新。例如,将结果和 key 值窗口算子的更新写入 key 值存储的接收器算子可以通过使用 upsert 写操作用最新更新结果覆盖不准确的结果来实现这一点。对于某些用例,可能还需要区分第一个结果和由于延迟事件而导致的更新。

窗口算子 API 提供了一个方法来显式声明你期望的迟到元素。在使用事件时间窗口时,可以指定允许迟到的时间。允许迟到的窗口算子不会在水位线通过窗口的结束时间戳后删除窗口及其状态。相反,算子将继续维护包括迟到时间段内的完整窗口。当一个迟到元素在允许的迟到周期内到达时,它就像一个正常到达的元素一样被处理并传递给触发器。当水位线通过窗口的结束时间戳和延迟间隔时,窗口最终被删除,随后的所有迟到元素被丢弃。

允许的延迟可以使用 allowedLateness()方法指定,如示例 6-22 所示。

val readings: DataStream[SensorReading] = ???

val countPer10Secs: DataStream[(String, Long, Int, String)] =

readings

.keyBy(_.id)

.timeWindow(Time.seconds(10))

// process late readings for 5 additional seconds

.allowedLateness(Time.seconds(5))

// count readings and update results if late readings arrive

.process(new UpdatingWindowCountFunction)

/** A counting WindowProcessFunction that distinguishes between

* first results and updates. */

class UpdatingWindowCountFunction extends ProcessWindowFunction[

SensorReading, (String, Long, Int, String), String,

TimeWindow] {

override def process(

id: String,

ctx: Context,

elements: Iterable[SensorReading],

out: Collector[(String, Long, Int, String)]): Unit = {

       // count the number of readings

       val cnt = elements.count(_ => true)


// state to check if this is the first evaluation of the window or not

  val isUpdate = ctx.windowState.getState(

new ValueStateDescriptor[Boolean])

 if (!isUpdate.value()) {

           // first evaluation, emit first result

out.collect((id, ctx.window.getEnd, cnt, "first"))

           isUpdate.update(true)

      } else {

           // not the first evaluation, emit an update

 out.collect((id, ctx.window.getEnd, cnt, "update"))

      }

  }

}

还可以实现 ProcessFunction 来支持迟到数据。由于状态管理始终是自定义的,并且是在处理函数中手动完成的,所以 Flink 不提供支持迟到数据的内置 API。相反,你可以使用记录时间戳、水位线和计时器的这些要素来实现必要的逻辑。

总结

在本章中,学习了如何实现定时运行的流应用程序。我们解释了如何配置流应用程序的时间特性,以及如何分配时间戳和水位线。了解了基于时间的算子,包括 Flink 的处理函数、内置窗口和自定义窗口。我们还讨论了水位线的语义、如何权衡结果的完整性和结果的延迟以及处理延迟事件的策略。

 

发布于: 2 小时前阅读数: 7
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
基于时间和窗口的算子(六)