写点什么

10 分钟了解 Flink 窗口计算

  • 2023-08-05
    江苏
  • 本文字数:8472 字

    阅读完需:约 28 分钟

10分钟了解Flink窗口计算

在有状态流处理中,时间在计算中起着重要的作用。比如,当进行时间序列分析、基于特定时间段进行聚合,或者进行事件时间去处理数据时,都与时间相关。接下来将重点介绍在使用实时 Flink 应用程序时应该考虑的跟时间相关的一些元素。文中的示例使用到 netcat 工具。


窗口计算有如下几个核心概念:

1、时间概念

1.1、事件时间(Event Time)

事件时间是指每个事件或元素在其生产设备上产生的时间。该时间通常在它们进入 Flink 之前就已经嵌入在事件中,并且可以从每个事件中提取事件时间戳。


有了事件时间,基于窗口的聚合(例如,每分钟的事件数量)只是事件时间列上的一种特殊的分组和聚合——每个时间窗口是一个组,每一行数据可以属于多个窗口/组(针对滑动窗口,多个窗口可能有重合的数据)。

1.2、处理时间(Processing Time)

处理时间是指正在执行相应 Flink 操作的机器的系统时间。


当流式程序按处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应操作的计算机的系统时间。在分布式和异步环境中,处理时间不能提供确定性,因为它容易受到记录到达系统(例如从消息队列)的速度以及记录在系统内部操作之间流动的速度的影响。

1.3、示例

例如,一个用户在 10:00 使用 APP 下了一笔订单,系统产生了一条日志并记录日志的产生时间为 10:00。接下来这条数据被发送到 Kafka,然后使用 Flink 进行处理,当数据到达 Flink 时,Flink 的系统时间为 10:02。10:02 指的是处理时间,而 10:00 则是事件时间。如果想要准确的获得这个 APP 每分钟生成的事件数量,那么可能需要使用数据生成的时间(嵌入数据中的事件时间),而不是 Flink 的处理时间。


事件时间和处理时间的时间差,如下图:


2、窗口分类

2.1、window 和 windowAll

窗口是处理无限流的核心。窗口将流分成有限大小的“桶”,我们可以在其上应用算子计算。


Flink 可以使用 window()和 windowAll()定义一个窗口,二者都需要传入一个窗口分配器 WindowAssigner,WindowAssigner 负责分配事件到相应的窗口。


window()作用于 KeyedStream 上,即 keyBy()之后,这样可以多任务并行计算,对窗口内的多组数据分别进行聚合。


windowAll()作用于非 KeyedStream 上(通常指 DataStream),由于所有元素都必须通过相同的算子实例。


应用举例:假设要计算 24 小时内每个用户的订单平均消费额,就需要使用 window()定义窗口;如果要计算 24 小时内的所有订单平均消费额,则需要使用 windowAll()定义窗口。


Flink 窗口程序的大致骨架结构


KeyedStream:



非 KeyedStream:



代码示例:


// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据源DataStream<String> textStream = env.socketTextStream("localhost", 9999, "\n");// 3. 数据转换DataStream<Tuple2<String, Integer>> wordCountStream = textStream        .assignTimestampsAndWatermarks(MyWatermark.create())        // 对数据源的单词进行拆分,每个单词记为1,然后通过out.collect将数据发射到下游算子        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                     @Override                     public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                         for (String word : value.split("\\s")) {                             out.collect(new Tuple2<>(word, 1));                         }                     }                 }        )        // 对单词进行分组        .keyBy(value -> value.f0)        .window(TumblingEventTimeWindows.of(Time.seconds(5)))        // 对某个组里的单词的数量进行滚动相加统计        .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));// 4. 数据输出。字节输出到控制台wordCountStream.print("WindowWordCount01 ======= ").setParallelism(1);// 5. 启动任务env.execute(WindowWordCount01.class.getSimpleName());
复制代码


DataStream<Tuple2<String, Integer>> wordCountStream = textStream                .assignTimestampsAndWatermarks(MyWatermark.create())                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                             @Override                             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                                 for (String word : value.split("\\s")) {                                     out.collect(new Tuple2<>(word, 1));                                 }                             }                         }                )                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))                .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
复制代码

2.2、窗口可以分类

Flink 的窗口可以分为滚动窗口、滑动窗口、会话窗口、全局窗口。


滚动窗口:滚动窗口分配器将每个元素分配给指定大小的窗口,滚动窗口具有固定的大小,并且不重叠。例如,指定大小为 5 分钟的滚动窗口,则每隔 5 分钟将启动一个新窗口。如下图:



代码示例:


DataStream<Tuple2<String, Integer>> wordCountStream = textStream                .assignTimestampsAndWatermarks(MyWatermark.create())                // 对数据源的单词进行拆分,每个单词记为1,然后通过out.collect将数据发射到下游算子                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                             @Override                             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                                 for (String word : value.split("\\s")) {                                     out.collect(new Tuple2<>(word, 1));                                 }                             }                         }                )                // 对单词进行分组                .keyBy(value -> value.f0)                // 基于事件时间的滚动窗口//                .window(TumblingEventTimeWindows.of(Time.seconds(5)))                // 基于处理时间的滚动窗口//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))                // 对某个组里的单词的数量进行滚动相加统计                .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
复制代码


滑动窗口:滑动窗口分配器将元素分配给固定长度的窗口。与滚动窗口类似,滑动窗口的大小由指定参数配置,但是增加了滑动步长(Slide)参数,相当于以指定步长不断向前滑动。


使用滑动窗口时,需要设置窗口大小和滑动步长两个参数。因此,如果滑动窗口的步长小于窗口大小,则滑动窗口可以重叠。这种情况下,元素被分配给多个窗口。如果滑动窗口的步长大于窗口大小时,有些元素可能会丢失。


例如,每隔 5 分钟需要对最近 10 分钟的数据进行计算,就可以设置窗口大小为 10 分钟,滑动步长为 5 分钟。这样,每隔 5 分钟就会得到一个窗口,其中包含最近 10 分钟内到达的数据。如下图:



代码示例:


DataStream<Tuple2<String, Integer>> wordCountStream = textStream                .assignTimestampsAndWatermarks(MyWatermark.create())                // 对数据源的单词进行拆分,每个单词记为1,然后通过out.collect将数据发射到下游算子                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                             @Override                             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                                 for (String word : value.split("\\s")) {                                     out.collect(new Tuple2<>(word, 1));                                 }                             }                         }                )                // 对单词进行分组                .keyBy(value -> value.f0)                // 基于事件时间的滑动窗口//                .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))                // 基于处理时间的滑动窗口//                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))                // 对某个组里的单词的数量进行滚动相加统计                .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
复制代码


会话窗口:会话窗口分配器按活动会话对事件进行分组。与滚动窗口和滑动窗口相比,会话窗口不重叠且没有固定的开始和结束时间。相反,当会话窗口在一定时间段内未收到事件时,即发生不活动的间隙时,窗口将关闭。会话窗口,我在实际工作中没有使用过,所以不做过多介绍,感兴趣的朋友可以自行查阅资料。


全局窗口:全局窗口分配器将所有具有相同 Key 的事件分配给同一个全局窗口。由于全局窗口没有自然的窗口结束时间,因此使用全局窗口需要指定触发器。触发器在文章后面有介绍,会结合全局窗口做代码示例。

3、窗口函数

事件被窗口分配器分配到窗口后,接下来需要指定想要在每个窗口上执行的计算函数(即窗口函数),以便对窗口内的数据进行处理。

3.1、增量计算和全量计算

Flink 计算窗口数据分为增量计算全量计算,这一点非常重要,希望你牢记。Flink 提供的窗口函数有 ReduceFunctionAggregateFunctionProcessWindowFunction,也需要你牢记。


增量计算:基于中间状态对窗口中的元素进行递增聚合,例如,窗口每流入一个新元素,新元素就会与中间数据进行合并,生成新的中间数据,再保存到窗口中。比如 ReduceFunction、AggregateFunction。


全量计算:需要依赖窗口中的所有数据或需要获取窗口中的状态数据和窗口元数据(窗口开始时间、窗口结束时间等)。例如对整个窗口数据排序取 TopN,比如 ProcessWindowFunction。

3.2、函数功能

ReduceFunction:ReduceFunction 指定如何聚合输入中的两个元素以产生相同类型的输出元素。


AggregateFunction:AggregateFunction 是聚合函数的基本接口,也是 ReduceFunction 的通用版本。与 ReduceFunction 相同,Flink 将在窗口输入元素到达时对其进行增量聚合。


ProcessWindowFunction:使用 ProcessWindowFunction 可以获得一个包含窗口所有元素的可迭代对象(Iterable),以及一个可以访问时间和状态信息的上下文对象(Context),这使得它比其他窗口函数提供了更多的灵活性。这种灵活性是以性能和资源消耗为代价的,因为元素不能递增聚合,而是需要在调用处理函数之前在内部缓冲窗口中的所有元素。因此,使用 ProcessWindowFunction 需要注意数据量不应太大,否则会造成内存溢出。例如,使用 ProcessWindowFunction 来处理简单的聚合(例如计算元素数量)是非常低效的。


带增量聚合的 processWindowFunction:由于 ProcessWindowFunction 是全量计算函数,如果既要获得窗口信息又要进行增量聚合,则可以将 ProcessWindowFunction 与 ReduceFunction 或 AggregateFunction 结合使用。ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 组合在一起,以便在元素到达窗口时增量地聚合。当窗口关闭时,ProcessWindowFunction 将提供聚合的结果。


Flink 算子 apply 和 process 都是全量聚合,即会等窗口的所有元素到达后做全量计算。


Flink 算子 reduce 是增量聚合,即每来一个元素就聚合计算一次。


process 和 processWindowFunction 使用,比如统计 5 秒钟内每个单词的次数,代码示例:


DataStream<Tuple2<String, Integer>> wordCountStream = textStream                .assignTimestampsAndWatermarks(MyWatermark.create())                // 对数据源的单词进行拆分,每个单词记为1,然后通过out.collect将数据发射到下游算子                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                             @Override                             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                                 for (String word : value.split("\\s")) {                                     out.collect(new Tuple2<>(word, 1));                                 }                             }                         }                )                // 对单词进行分组                .keyBy(value -> value.f0)                .window(TumblingEventTimeWindows.of(Time.seconds(5)))                // 清除器,用于清除超过evictionSec前的数据。防止整个窗口的数据量过大                // 对某个组里的单词的数量进行滚动相加统计                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {                    @Override                    public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {                        int sum = 0;                        for (Tuple2<String, Integer> element : elements) {                            sum += element.f1;                        }                        out.collect(new Tuple2<>(key, sum));                    }                });
复制代码


继续使用上面的例子,统计窗口内,所有输入的单词的数量,示例如下:


DataStream<Tuple2<String, Integer>> wordCountStream = textStream                .assignTimestampsAndWatermarks(MyWatermark.create())                // 对数据源的单词进行拆分,每个单词记为1,然后通过out.collect将数据发射到下游算子                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                             @Override                             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                                 for (String word : value.split("\\s")) {                                     out.collect(new Tuple2<>(word, 1));                                 }                             }                         }                )                // 对单词进行分组                .keyBy(value -> value.f0)                .window(TumblingEventTimeWindows.of(Time.seconds(10)))                // 对某个组里的单词的数量进行增量相加                .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))                // 在进行总数合并                .process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {                    @Override                    public void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {                        value.f1 += value.f1;                        out.collect(new Tuple2<>("合计", value.f1));                    }                });
复制代码

4、触发器

触发器(Trigger)决定了一个窗口的数据何时被窗口函数处理。每个窗口都有一个默认的触发器。如果默认的触发器不满足你的需求,可以使用 trigger()指定一个自定义触发器。抽象类 Trigger 定义了触发器的基本方法,允许触发器对不同的事件做出反应。


主要方法


onElement:每次向窗口增加一个元素时都会触发该方法。


onEventTime:当设置的事件时间计时器被触发时调用该方法。


onProcessingTime:当设置的处理时间计时器被触发时调用该方法。


onMerge:当多个窗口合并为一个窗口时调用该方法。当两个触发器对应的窗口合并时,会合并它们的状态。


clear:在删除相应窗口时执行所需的任何操作,主要用于清除触发器可能为给定窗口保留的任何状态。


比如自定义一个触发器,在全局窗口内,对于统计输出的单词是1的的场景进行触发计算,代码示例:


private static class MyTrigger<T, W extends Window> extends Trigger<T, W> {    private final ValueStateDescriptor<T> stateDesc;
private MyTrigger(TypeSerializer<T> stateSerializer) { stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer); }
@Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { Tuple2<String, Integer> elementValue = (Tuple2<String, Integer>) element; ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc); if (lastElementState.value() == null) { lastElementState.update(element); return TriggerResult.CONTINUE; } // 此处状态描述器ValueState可以不使用 Tuple2<String, Integer> lastValue = (Tuple2<String, Integer>) lastElementState.value(); if (elementValue.f0.equals("1")) { lastElementState.update(element); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; }
@Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; }
@Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; }
@Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); }}
复制代码

5、清除器

除了触发器之外,Flink 的窗口还允许使用 evictor()方法指定一个可选的清除器(Evictor)。使用清除器允许在触发器触发后,窗口函数执行之前或之后,从窗口中删除元素。清除器 Evictor 是一个接口,有 evictBefore()和 evictAfter()两个方法。


接着使用上面的例子,全局窗口,每隔 30S 清理一次窗口内的数据,代码示例:


DataStream<Tuple2<String, Integer>> wordCountStream = textStream.assignTimestampsAndWatermarks(MyWatermark.create())// 对数据源的单词进行拆分,每个单词记为1,然后通过out.collect将数据发射到下游算子.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {             @Override             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                 for (String word : value.split("\\s")) {                     out.collect(new Tuple2<>(word, 1));                 }             }         })// 对单词进行分组.keyBy(value -> value.f0).window(GlobalWindows.create())// 清除器,用于清除超过evictionSec前的数据。防止整个窗口的数据量过大.evictor(TimeEvictor.of(Time.of(30, TimeUnit.SECONDS))).trigger(new WindowWordCount02.MyTrigger(textStream.getType().createSerializer(env.getConfig())))// 对某个组里的单词的数量进行滚动相加统计.reduce(new ReduceFunction<Tuple2<String, Integer>>() {    @Override    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) throws Exception {        return new Tuple2<>(a.f0, a.f1 + b.f1);    }});
复制代码


这个例子执行时,可以看到,非1的字符串不会触发窗口计算,同时,清除器会清理 30 秒之前的数据,比如,某一次输入后,到了 30 秒之后继续输出,会重新开始统计。某一次输入后,没到 30S 就继续输入,会把最近的 30 秒内的窗口所有数据合并统计。比如下图:



总结:本文主要讲了窗口的时间概念、窗口分配、窗口函数的使用,以及触发器、清除器的使用。具体代码详见地址:https://github.com/yclxiao/flink-blog/blob/main/src/main/java/top/mangod/flinkblog/demo003/WindowWordCount02.java



用户头像

这里可以找到我 mangod.top 2018-09-11 加入

13年IT行业经验,做过架构创过业,一起交流学习。专注于软件开发、云原生、大数据领域。关注我职业发展不焦虑。

评论

发布
暂无评论
10分钟了解Flink窗口计算_不焦躁的程序员_InfoQ写作社区