写点什么

配置时间特性

用户头像
小知识点
关注
发布于: 2020 年 09 月 16 日

ProcessingTime



  • 处理时间窗口基于机器时间触发

  • 在窗口算子中使用处理时间会导致不确定性,应为窗口取决于元素到达的速率

  • 由于处理任务无需等待水位线来驱动事件时间前进,所以可以提供地延迟



EventTime



  • 指定算子根据数据自身包含的信息决定当前时间

  • 每一个事件时间都有一个时间戳,系统的逻辑时间由水位线来定义

  • 只有依靠水位线声明某个时间间隔内所有时间戳都已收到时,事件时间窗口才出发

  • 即使乱序,事件时间窗口也会计算出正确结果

  • 窗口结果不会取决于数据流的读取和处理速度

  • 当使用EventTime的同时,可以使用处理时间窗口和计时器



IngetionTime



  • 指定每个接收记录都把数据源算子的处理时间作为事件时间的时间戳,并自动生成水位线

  • 时间进入流处理引擎的时间

  • EventTime相比,价值不大,因为性能和EventTime类似,但是无法提供确定的结果



代码示例

def main(args: Array[String]) {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



源码

是一个枚举类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.flink.streaming.api;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public enum TimeCharacteristic {
ProcessingTime,
IngestionTime,
EventTime;

private TimeCharacteristic() {
}
}




@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
//校验是否为空
this.timeCharacteristic = (TimeCharacteristic)Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
this.getConfig().setAutoWatermarkInterval(0L);
} else {
this.getConfig().setAutoWatermarkInterval(200L);
}

}



@Internal
public final class Preconditions {
public static <T> T checkNotNull(T reference) {
if (reference == null) {
throw new NullPointerException();
} else {
return reference;
}
}



用户头像

小知识点

关注

奇迹的出现往往就在再坚持一下的时候! 2018.04.02 加入

还未添加个人简介

评论

发布
暂无评论
配置时间特性