10 分钟了解 Flink Watermark 水印
在上一篇中,介绍了 Flink 里时间的概念和窗口计算,在实际生产过程中,由于网络等原因,许多数据会延迟到达窗口,这种情况 Flink 如何处理?Watermark 登场,本文从这几点进行介绍:水印的概念、水印如何计算、允许延迟和侧道输出、水印生成策略、案例及代码。
1、一个小例子
讲解概念前,我先举个例子。比如工厂的生产线有一批货物要发出,每个货物上都有一个生产时间的标记,司机在门口等待货物,他每天 9:00 出发,只要他看到最新过来的货物上的时间是 9:00,那他立马就出发。
但是久而久之他发现,有些货物会延迟到达,比如 9:00 的货物已经到达,忽然他又看到一个 8:59 的货物到达了,为了能够一次性运送更多的货物,他决定继续多等 5 分钟,即:如果 9:05 的货物到达后,他就立马出发,不再等待了。
这样的话,即使有延迟到达的货物,只要它们能在 9:05 分之前到达,那这部分货物也会被发出。
2、水印的概念
我们来思考一个场景,比如,对于窗口[12:00-12:10),事件时间为 12:04 的数据,由于网络原因,到达 Flink 的时间是 12:11。此时窗口已经关闭了,该数据将不属于任何窗口,最终这个数据会丢失。
所以,为了保证计算结果的正确性,需要让窗口等待延迟数据到达后再进行计算,但是也不能无限期地等待下去,必须有一种机制来确定何时触发窗口计算,这种机制就是水印(Watermark)。
水印是一种用于衡量事件时间进度的机制,其表示某个时刻(事件时间)以前的数据将不再产生,因此水印指的是一个时间点。水印作为数据流的一部分流动,并带有时间戳 t。t 表示该流中不应再有时间戳小于等于 t 的元素(即时间戳早于或等于水印的事件)。
如下图,显示了带有时间戳和嵌入式水印的事件流,事件是按顺序排列的,这意味着水印只是流中的周期性标记。
水印对于乱序流至关重要,如下图,其中事件不是按其时间戳排序的。通常,水印是数据流中一个点的声明,表示水印之前的所有事件都应该到达。一旦水印到达,算子则认为某个时间周期内的所有事件已经被收到,不会再有更多符合条件的事件了。
3、水印如何计算
计算水印需要提前指定一个允许最大延迟时间的参数。
水印 = 进入 Flink 的当前最大事件时间(比如上面例子中的 9:05 分到达的货物) ‒ 允许最大延迟时间(比如上面例子中的司机多等待的 5 分钟)。
当水印 >= 窗口结束时间时,立即触发窗口计算,计算完毕后发射出计算结果并销毁窗口,否则窗口将一直等待。
所以,窗口触发计算的规则是:进入 Flink 的当前最大事件时间 >= 窗口结束时间+允许最大延迟时间。可见,设置水印后会改变窗口的触发计算规则。
例子:假设有一个[9:00~9:10)的窗口,设置的允许最大延迟时间为 3 分钟,当事件时间戳为 9:11 的事件到达时(说明有些数据可能已经延迟了,我在多等一会儿),由于该事件时间是进入 Flink 的当前最大事件时间,因此 Watermark = 9:11‒3(分钟)= 9:08。此时水印在窗口内部不会触发窗口计算,窗口继续等待延迟数据。如下图:
接下来当事件时间戳为 9:15 的事件到达时,由于该事件时间是进入 Flink 的当前最大事件时间,因此 Watermark = 9:15‒3(分钟)= 9:12。此时水印在窗口外部,满足窗口触发计算的规则:Watermark >= 窗口结束时间,因此窗口会立即触发计算,计算完毕后发射出计算结果并销毁窗口。
水印机制可以在一定程度上解决数据延迟到达问题,但不能完全解决。因为有些数据延迟太多了,这部分数据 Flink 默认丢弃掉。为了保证数据不丢失,Flink 提供了允许延迟(AllowedLateness)和侧道输出机制(Side Output)。注意:这里的允许延迟,和水印的延迟时间不是一个概念,这里的允许延迟是水印之后的延迟。
4、允许延迟和侧道输出
允许延迟机制与水印不同,允许延迟并不会延迟触发窗口计算,而是触发窗口计算之后不会立马销毁窗口,会在一段时间内继续保留计算状态。
超过允许延迟时间的数据,Flink 会将其放入侧道输出。侧道输出可以将数据收集起来,根据系统自身业务单独处理或存放于指定位置。
allowedLateness(lateness: Time):设置允许的延迟时间。
sideOutputLateData(outputTag: OutputTag[T]):将延迟到达的数据保存到 outputTag 对象中。
5、水印生成策略
我们可以针对每个事件生成水印,但是由于每个水印都会在下游做一些计算,因此过多的水印会降低程序性能。这就需要一种策略来规定 Flink 程序什么时候可以开始生成水印。
在 Flink DataStream 中使用 assignTimestampsAndWatermarks 方法用于生成水印。其作用是给数据流中的元素分配时间戳(Flink 需要知道每个元素的事件时间),并生成水印以标记事件时间进度。
水印策略分为内置水印策略和自定义水印策略:
1、周期性水印策略
周期性地产生水印,默认周期时间是 200 毫秒。意思是,每隔 200 毫秒系统开始生成水印,其生成的规则为:水印 = 进入 Flink 的当前最大事件时间 ‒ 允许的最大延迟时间。
2、单调递增水印策略
水印是周期产生的,紧紧跟随数据中的最新时间戳。该策略实际上使用的就是周期性水印策略,只是将允许的最大延迟时间设置为 0,即在周期性水印策略的基础上去掉了允许的最大延迟时间。WatermarkStrategy 接口中已经内置了用于创建单调递增水印策略的静态方法 forMonotonousTimestamps()。
3、无水印水印策略
该策略创建不生成任何水印的水印策略。该策略在纯基于处理时间的流处理的场景中可能很有用。WatermarkStrategy.noWatermarks()。
4、自定义水印策略
Flink 内置的水印策略可以满足大部分应用场景,如果自定义水印策略需要实现WatermarkStrategy
接口。
6、案例及代码
1、水印例子
比如,在控制台输入数据的事件时间和数据
,通过自定义的水印策略,允许延迟 2S 的数据进入窗口计算。
代码如下:
运行结果如下图:
具体代码地址:
2、延迟数据和侧道输出
继续使用上面的例子,如果数据再水印之外,又延迟到达,再通过侧道输出出去。
代码如下:
运行结果如下图:
具体代码地址:
总结:本文主要讲了 Flink Watermark 水印的概念和使用。
评论