写点什么

大数据培训:Flink 窗口的开始时间的计算

  • 2022 年 2 月 17 日
  • 本文字数:1571 字

    阅读完需:约 5 分钟

我还记得的在我刚学习 flink 的时候,B 站的老师说过,Flink 窗口的开始时间和结束时间和你想的不一样。那个时候我好像记得老师说过,flink 的窗口大小会根据你的时间单位来进行修正。

然后在现如今,很多人还是不是很了解窗口机制,以及 watermark。更别提什么窗口什么时候,什么时候结束。所以呢,今天从大数据培训源码角度给大家普及一下窗口什么时候开始,什么时候结束。

我们可以来编写一个简单的代码,来看一下效果,我习惯用 java 来写 flink,所以也就使用 java 了。

@Override

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {

if (timestamp > Long.MIN_VALUE) {

List<TimeWindow> windows = new ArrayList<>((int) (size / slide));

//获取窗口开始时间

long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);

for (long start = lastStart;

start > timestamp - size;

start -= slide) {

windows.add(new TimeWindow(start, start + size));

}

return windows;

} else {

throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +

"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +

"'DataStream.assignTimestampsAndWatermarks(...)'?");

}

}

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {

//窗口开始计算的时间

return timestamp - (timestamp - offset + windowSize) % windowSize;

}

我们可以看出来窗口开始时间, 是取模过后的时间,我们来简单的分析一番。

假如我们第一条数据的时间戳是 1000,offset 暂时不需要管,因为他是时间的偏移量例如,东八区什么的。我们假如窗口大小是 5s,

那么接下来的公式计算也就是 1000 - (1000 - 0 + 5000)% 5000,那么我们可以计算出来的结果就是 0,也就是说,窗口开始的时间是 0.更大的时间窗口大小,各位大佬可以下面自己算一下。

也就是说开始时间是 0,结束的时间窗口也就是 4999,因为到 5000 的时候就触发计算了。那么我们接下来就进行验证一番和我们分析的是否一致。

接下来我们写一个简单的 wordcount,因为在多并行度下,不是很好分析,我们设置为单并行读。如果有对 watermark 还不是很理解的大佬,可以看我的这篇文章,https://blog.csdn.net/weixin_43704599/article/details/117411252

/**

* @author :qingzhi.wu

* @date :2022/1/18 8:36 下午

*/

public class WindowSizeTest {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<String> source1 = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.of(0, TimeUnit.MILLISECONDS)) {

@Override

public long extractTimestamp(String s) {

return Long.parseLong(s.split(",")[0]);

}

});

SingleOutputStreamOperator<Tuple2<String, Integer>> map = source1.map(new MapFunction<String, Tuple2<String, Integer>>() {

@Override

public Tuple2<String, Integer> map(String s) throws Exception {

return Tuple2.of(s.split(",")[1],1);

}

});

WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = map.keyBy(0).window(TumblingEventTimeWindows.of(Time.of(5000, TimeUnit.MILLISECONDS)));

window.sum(1).print();

env.execute();

}

}


接下来我们,就看看我们分析的是否正确



很明显是正确的。那么一天的窗口大小你会计算吗?

文章来源于好胖子的大数据之路

用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据培训:Flink窗口的开始时间的计算