时间语义
“时间”在我们日常的开发学习过程中是特别常见的一个名词,例如:Java 中的日期处理类、获取系统的当前时间、毫秒级的时间戳等等。接下来让我们来看看在 Flink 框架中,对时间不同的概念。Flink 框架中有三个时间的语义:事件时间(Event Time )、摄入时间(Ingestion Time)、系统处理时间(Processing Time)。
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
Ingestion Time:数据进入 Flink 的时间。
Processing Time:每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。
在 Flink 流处理真实场景中,大部分的业务需求都会使用事件时间语义,但还是以具体的业务需求择选不同的时间语义。Flink 引入 EventTime 方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
复制代码
当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子.
Watermark(水位线)
在 Flink 数据处理过程中,数据从产生到计算到输出结果,是需要一个过程时间,在正常的情况下数据往往都是按照事件产生的时间顺序进行的,由于网络、分布式部署等原因会导致数据产生乱序问题,相当于 Flink 接收到的数据的先后顺序不是按照时间的事件时间顺序排列进行的。乱序数据会让窗口计算不准确.。如何避免这个问题呢?
如果一个数据的时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
Watermark 概念
Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。
数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。
当 Flink 接收到数据时,会按照一定的规则去生成 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是基于数据携带的时间戳生成的,一旦 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 event time 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
举个栗子:如下图为一个乱序的 数据,将 Watermark 设置为 2
如上图,将最大的延时时间设置为 2 秒,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。
Watermark 的特点
Watermark 的使用
对于排序好的数据,不需要延迟触发,只需要指定时间戳即可。
代码如下:
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
})
// 升序数据设置事件时间和watermark
// 无需设置延时时间 把当前的getTimestamp提取出当成当前的事件时间
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
@Override
public long extractAscendingTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
// 基于事件时间的开窗聚合 统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
复制代码
处理乱序数据需要制定对应的延时时间。
代码如下:
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
})
// 乱序数据设置事件时间和watermark
// 需要设置延时时间
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
// 基于事件时间的开窗聚合 统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
复制代码
TimestampAssigner
Flink 日供了 TimestampAssigner 接口,我们可以自定义去实现从数据中抽取时间戳的规则以及生成 Watermark 的规则。有如下两种类型:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100);
复制代码
总结
在 flink 开发过程中,Watermark 的使用由开发人员生成。
若 watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
若 watermark 到达得太早,则可能收到错误结果,但 Flink 对延时数据的处理机制可以友好解决。
Flink 如何解决数据的乱序问题,提供了三种处理机制:使用 Watermark、设置窗口延时 (allowedLateness)、设置侧流(sideOutputLateData0
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
})
// 乱序数据设置事件时间和watermark
// 需要设置延时时间
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
OutputTag<SensorReading> tag = new OutputTag<>("late");
// 基于事件时间的开窗聚合 统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
// 设置窗口延时
.allowedLateness(Time.seconds(1))
// 将窗口关闭后的数据输出到侧流
.sideOutputLateData(tag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(tag).print("late");
env.execute();
}
}
复制代码
评论