概述
在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。
Flink 中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。
窗口类型
Flink 中的窗口类型有两种:时间窗口(Time Window)
、计数窗口(Count Window)
。时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。计数窗口包含了:滚动计数窗口和滑动计数窗口。
滚动窗口(Tumbling Windows)
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。
换句话说:如果制定了一个 30 分钟时间间隔的滚动窗口,然后就会将无界限的数据以 30 分钟为一个窗口期进行切割成有限的数据集合。适用场景:做统计计算。做每个时间段的聚合计算。
滑动窗口(Sliding Windows)
说明:滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
适用场景:(求某接口最近 5min 的失败率来决定是否要报警)对最近一个时间段内的统计。
会话窗口(Session Windows)
会话敞口只存在于时间窗口,计数窗口无会话窗口。
特点是时间无对齐
由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口。
当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去
Window API 使用
窗口分配器 window()
在 flink 中可以用 .window()
来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window ()
方法必须在 keyBy
之后才能用。window()
方法接收的输入参数是一个 WindowAssigner
WindowAssigner
负责将每条输入的数据分发到正确的 window
中。Flink 提供了通用的 WindowAssigner:滚动窗口(tumbling window)、滑动窗口(sliding window)、 会话窗口(session window)、全局窗口(global window)
代码如下:
public class WindowTest1_TimeWindow {
public static void main(String[] args) throws Exception {
//构建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设施并行度为1
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//开窗测试 指定窗口分配器
DataStream<Integer> resultStream = dataStream.keyBy("id")
//设置一个15秒的一个滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
//会话窗口
//.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
//滑动窗口
//.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
env.execute();
}
}
复制代码
Flink 提供了更加简单的 .timeWindow
和 .countWindow
方法,用于定义时间窗口和计数窗口.
TimeWindow
TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个 window 里面的所有数据进行计算。时间间隔可以通过 Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
等其中的一个来指定。
CountWindow
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
创建不同类型的窗口
滚动时间窗口(tumbling time window)
.timeWindow(Time.seconds(15))
复制代码
滑动时间窗口(sliding time window)
==下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。==
.timeWindow(Time.seconds(15),Time.seconds(5))
复制代码
会话窗口(session window)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
复制代码
滚动计数窗口(tumbling count window)
==默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。==
滑动计数窗口(sliding count window)
==下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。==
窗口函数
Flink 中定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:增量聚合函数、全窗口函数。
案例代码:
public class WindowTest1_TimeWindow {
public static void main(String[] args) throws Exception {
//构建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设施并行度为1
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//开窗测试 指定窗口分配器
DataStream<Integer> resultStream = dataStream.keyBy("id")
//对窗口进行聚合操作 增量窗口操作
.timeWindow(Time.seconds(15))
.aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
@Override
//创建累加器
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(SensorReading sensorReading, Integer accumulator) {
return accumulator+1;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer integer, Integer acc1) {
return null;
}
});
resultStream.print();
env.execute();
}
}
复制代码
本地测试:结果输出成功。
案例代码:
public class WindowTest1_TimeWindow {
public static void main(String[] args) throws Exception {
//构建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设施并行度为1
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//全量窗口函数
DataStream<Tuple3<String,Long,Integer>> resultStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
String id =tuple.getField(0);
Long windowEnd =window.getEnd();
Integer count = IteratorUtils.toList(input.iterator()).size();
out.collect(new Tuple3<>(id,windowEnd,count));
}
});
resultStream.print();
env.execute();
}
}
复制代码
本地运行测试:结果输出成果
如何在 winodws 操作系统下使用 nc 命令进行代码测试:在Windows操作系统中怎样使用nc命令
其他 API
触发器:.trigger()
定义 window 什么时候关闭,触发计算并输出结果
移除器:.evitor()
定义移除某些数据的逻辑
.allowedLateness()
允许处理迟到的数据
.sideOutputLateData()
将迟到的数据放入侧输出流
.getSideOutput()
获取侧输出流
案例代码:
public class WindowTest2_CountWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
});
// 开计数窗口测试
DataStream<Double> resultStream = dataStream.keyBy("id")
.countWindow(10, 2)
.aggregate(new MyAvgTemp());
//其他可选API 对迟到数据的处理方式
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("iate") {};
SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
//.trigger()
//.evictor()
.allowedLateness(Time.minutes(1))
//输出到测流
.sideOutputLateData(outputTag)
.sum("temperature");
sumStream.getSideOutput(outputTag).print("late");
resultStream.print();
env.execute();
}
public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double,Integer>,Double>{
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0,0);
}
@Override
public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
return new Tuple2<>(accumulator.f0+sensorReading.getTemperature(),accumulator.f1+1);
}
@Override
public Double getResult(Tuple2<Double, Integer> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
return new Tuple2<>(a.f0+b.f0,a.f1+b.f1);
}
}
}
复制代码
此图来源于网络,window API 总览
注意点:
评论