本文已收录至 GitHub,推荐阅读 👉 Java随想录
微信公众号:Java 随想录
原创不易,注重版权。转载请注明原作者和原文链接
承接上篇未完待续的话题,我们一起继续 Flink 的深入探讨
Flink State 状态
Flink 是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到 TaskManager 的堆内存中。
但是当 Task 挂掉,那么这个 Task 所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。
想要保证 At -least-once 和 Exactly-once,则需要把数据状态持久化到更安全的存储介质中,Flink 提供了堆内内存、堆外内存、HDFS、RocksDB 等存储介质。
先来看下 Flink 提供的状态有哪些,Flink 中状态可以分为两种类型:
Keyed State
基于 KeyedStream 上的状态,这个状态是跟特定的 Key 绑定,KeyedStream 流上的每一个 Key 都对应一个 State,每一个 Operator 可以启动多个 Thread 处理,但是相同 Key 的数据只能由同一个 Thread 处理,因此一个 Keyed 状态只能存在于某一个 Thread 中,一个 Thread 会有多个 Keyed State。
Non-Keyed State(Operator State)
Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition, offset)映射。
Flink 针对 Keyed State 提供了以下可以保存 State 的数据结构:
ValueState:类型为 T 的单值状态,这个状态与对应的 Key 绑定,最简单的状态,通过 update 更新值,通过 value 获取状态值。
ListState:Key 上的状态值为一个列表,这个列表可以通过add()
方法往列表中添加值,也可以通过get()
方法返回一个 Iterable 来遍历状态值。
ReducingState:每次调用add()
方法添加值的时候,会调用用户传入的reduceFunction
,最后合并到一个单一的状态值。
MapState:状态值为一个 Map,用户通过put()
或putAll()
方法添加元素,get(key)通过指定的 key 获取 value,使用entries()
、keys()
、values()
检索。
AggregatingState:保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState
相反的是, 聚合类型可能与添加到状态的元素的类型不同。使用 add(IN)
添加的元素会调用用户指定的 AggregateFunction
进行聚合。
FoldingState:已过时,建议使用 AggregatingState 保留一个单值,表示添加到状态的所有值的聚合。 与 ReducingState
相反,聚合类型可能与添加到状态的元素类型不同。 使用add(T)
添加的元素会调用用户指定的 FoldFunction
折叠成聚合值。
案例 1:使用 ValueState 统计每个键的当前计数
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple2.of("user1", 1), Tuple2.of("user2", 1), Tuple2.of("user1", 1), Tuple2.of("user2", 1))
.keyBy(0)
.flatMap(new CountWithKeyedState())
.print();
env.execute("Flink ValueState example");
}
public static class CountWithKeyedState extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> countState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> descriptor =
new ValueStateDescriptor<>("countState", Integer.class, 0);
countState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer currentCount = countState.value();
currentCount += value.f1;
countState.update(currentCount);
out.collect(Tuple2.of(value.f0, currentCount));
}
}
复制代码
在这段代码中,我们首先创建了一个 StreamExecutionEnvironment
,然后产生一些元素,每个元素都是指定用户的一个事件。keyBy(0)
表示我们以元组的第一个字段(即用户 ID)为键进行分组。
然后,我们使用 flatMap
算子应用了 CountWithKeyedState
函数。这个函数使用了 Flink 的 ValueState 来存储和更新每个键的当前计数。
在 open
方法中,我们定义了一个名为 "countState" 的 ValueState,并把它初始化为 0。在 flatMap
方法中,我们从 ValueState 中获取当前计数,增加输入元素的值,然后更新 ValueState,并发出带有当前总数的元组。
注意:在真实的生产环境中,你可能需要从数据源(如 Kafka, HDFS 等)读取数据,而不是使用 fromElements
方法
案例 2:使用 MapState 统计单词出现次数
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9999)
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private transient MapState<String, Integer> wordState;
@Override
public void open(Configuration parameters){
MapStateDescriptor<String, Integer> descriptor =
new MapStateDescriptor<>("wordCount", String.class, Integer.class);
wordState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer count = wordState.get(value.f0);
if (count == null) {
count = 0;
}
count += value.f1;
wordState.put(value.f0, count);
out.collect(Tuple2.of(value.f0, count));
}
})
.print();
env.execute("Word Count with MapState");
}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
复制代码
在这个例子中,我们首先通过 socketTextStream
方法从本地的 socket 获取输入数据流。然后我们用 flatMap
操作将每行输入分解为单个单词,并且为每个单词赋予基础计数值(基数)1。
我们创建一个使用 RichFlatMapFunction
的 operator,它可以访问 MapState
。在 open()
方法中,我们定义了 MapStateDescriptor
,然后用这个 descriptor
创建 MapState
。
在 flatMap()
函数中,我们获取当前单词的计数值,如果不存在则设置为 0。然后我们增加计数值,更新 MapState,并且输出当前单词和它的出现次数。
案例 3:使用 ReducingState 统计输入流中每个键的最大值
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
Tuple2.of("A", 6),
Tuple2.of("B", 5),
Tuple2.of("C", 4),
Tuple2.of("A", 3),
Tuple2.of("B", 2),
Tuple2.of("C", 1)
);
dataStream.keyBy(0).flatMap(new MaxValueReducer()).print();
env.execute("ReducingState Example");
}
public static class MaxValueReducer extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ReducingState<Integer> maxState;
@Override
public void open(Configuration config) {
ReducingStateDescriptor<Integer> descriptor = new ReducingStateDescriptor<>(
"maxValue", // state的名字
Math::max, // ReduceFunction,这里取两者的最大值
TypeInformation.of(Integer.class)); // 类型信息
maxState = getRuntimeContext().getReducingState(descriptor);
}
@Override
public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> out) throws Exception {
maxState.add(input.f1); // 更新state的值
out.collect(Tuple2.of(input.f0, maxState.get())); // 输出当前key的最大值
}
}
复制代码
在上述代码中,我们首先创建了一个新的MaxValueReducer
类,该类扩展了RichFlatMapFunction
。然后定义了一个ReducingState
变量,用于在每个 key 上维护最大值。在open()
方法中,我们初始化了这个状态变量。在flatMap()
方法中,我们简单地将新的值添加到状态中,并输出当前 key 的最大值。
输出如下:
7> (A,6)
7> (A,6)
2> (B,5)
2> (C,4)
2> (B,5)
2> (C,4)
复制代码
案例 4:使用 AggregatingState 统计输入流中每个键的平均值
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> input = env.fromElements(
Tuple2.of("A", 6),
Tuple2.of("B", 5),
Tuple2.of("C", 4),
Tuple2.of("A", 3),
Tuple2.of("B", 2),
Tuple2.of("C", 1)
);
input.keyBy(x -> x.f0)
.process(new AggregatingProcessFunction())
.print();
env.execute();
}
public static class AverageAggregate implements AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return new Tuple2<>(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
public static class AggregatingProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Double>> {
private AggregatingState<Integer, Double> avgState;
@Override
public void open(Configuration parameters) {
AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> descriptor =
new AggregatingStateDescriptor<>("average", new AverageAggregate(), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {
}));
avgState = getRuntimeContext().getAggregatingState(descriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx,
Collector<Tuple2<String, Double>> out) throws Exception {
avgState.add(value.f1);
out.collect(new Tuple2<>(value.f0, avgState.get()));
}
}
复制代码
输入如下:
7> (A,6.0)
2> (B,5.0)
2> (C,4.0)
7> (A,4.5)
2> (B,3.5)
2> (C,2.5)
复制代码
这段代码主要是计算每个键对应的值的平均数。代码中定义了:AverageAggregate
和AggregatingProcessFunction
。
AverageAggregate
类实现了AggregateFunction
接口,用于计算平均值:
createAccumulator
方法返回一个新的累加器,这里是一个包含两个整数的元组,表示当前的总数和元素的数量。
add
方法向累加器添加一个元素的值,将其添加到总数中,并增加元素数量。
getResult
方法根据累加器计算平均值。
merge
方法合并两个累加器,将他们的总数和元素数量相加。
AggregatingProcessFunction
类扩展了KeyedProcessFunction
,在接收到一个元素时添加到状态中的平均值,并输出当前的平均值:
以上案例代码都经过本地运行和测试,建议大家自行运行以便更深入地理解。
CheckPoint & SavePoint
有状态流应用中的检查点(CheckPoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)
简单来讲,就是一次「存盘」,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态。
如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同「读档」一样。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的enableCheckpointing()
方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
复制代码
这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,Flink 还提供了「保存点(SavePoint)」的功能。
保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照。
保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个「自动存盘」的功能。而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是「手动存盘」。
因此两者尽管原理一致,但用途就有所差别了。
检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复
检查点具体的持久化存储位置,取决于「检查点存储(CheckPointStorage)」的设置。
默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink 也提供了在其他存储位置进行保存的接口,这就是「 CheckPointStorage」。
具体可以通过调用检查点配置的 setCheckpointStorage()
来配置,需要传入一个 CheckPointStorage 的实现类。例如:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置检查点时间间隔为1000ms
env.enableCheckpointing(1000);
// 设置checkpoint存储路径, 注意路径需要是可访问且有写权限的HDFS或本地路径
URI checkpointPath = URI.create("hdfs://localhost:9000/flink-checkpoints");
FileSystemCheckpointStorage storage = new FileSystemCheckpointStorage(checkpointPath, 10000);
// 应用配置
env.getCheckpointConfig().setCheckpointStorage(storage);
// 设置重启策略,这里我们设置为固定延时无限重启
//Flink的重启策略是用来决定如何处理作业执行过程中出现的失败情况的。如果Flink作业在运行时出错,比如由于代码错误、硬件故障或 网络问题等,那么重启策略就会决定是否和如何重启作业。
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
// 尝试重启次数
3,
//每次尝试重启的固定延迟时间为 10 秒
org.apache.flink.api.common.time.Time.of(10, java.util.concurrent.TimeUnit.SECONDS)
));
env.execute("Flink Checkpoint Example");
}
复制代码
Flink 主要提供了两种 CheckPointStorage:
对于实际生产应用,我们一般会将 CheckPointStorage 配置为高可用的分布式文件系统(HDFS,S3 等)。
CheckPoint 原理
Flink 会在输入的数据集上间隔性地生成 CheckPoint Barrier,通过栅栏(Barrier)将间隔时间段内的数据划分到相应的 CheckPoint 中。
当程序出现异常时,Operator 就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。
例如在 Kafka Consumer 算子中维护 offset 状态,当系统出现问题无法从 Kafka 中消费数据时,可以将 offset 记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。
默认情况 Flink 不开启检查点,用户需要在程序中通过调用方法配置来开启检查点,另外还可以调整其他相关参数
env.enableCheckpointing(1000)
复制代码
Exactly-once 和 At-least-once 语义选择
选择 Exactly-once 语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,与此同时,Flink 的性能也相对较弱。
而 At-least-once 语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。如下通过setCheckpointingMode()
方法来设定语义模式,默认情况下使用的是 Exactly-once 模式。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
复制代码
env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000);
复制代码
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(600)
复制代码
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1)
复制代码
任务取消后,是否删除 CheckPoint 中保存的数据
RETAIN_ON_CANCELLATION
:表示一旦 Flink 处理程序被 cancel 后,会保留 CheckPoint 数据,以便根据实际需要恢复到指定的 CheckPoint。
DELETE_ON_CANCELLATION
:表示一旦 Flink 处理程序被 cancel 后,会删除 CheckPoint 数据,只有 Job 执行失败的时候才会保存 CheckPoint。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
复制代码
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1)
复制代码
SavePoint 原理
SavePoint 底层实现其实也是使用 CheckPoint 的机制。
SavePoint 是用户以手工命令的方式触发 Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
要使用 SavePoint,需要按照以下步骤进行:
配置状态后端: 在 Flink 中,状态可以保存在不同的后端存储中,例如内存、文件系统或分布式存储系统(如 HDFS)。要启用 SavePoint,需要在 Flink 配置文件中配置合适的状态后端。
通常,使用分布式存储系统作为状态后端是比较常见的做法,因为它可以提供更好的可靠性和容错性。
生成 SavePoint: 在 Flink 应用程序运行时,可以通过以下方式手动触发生成 SavePoint:
bin/flink savepoint <jobID> [targetDirectory]
复制代码
其中,<jobID>
是要保存状态的 Flink 作业的 Job ID,[targetDirectory]
是可选的目标目录,用于保存 SavePoint 数据。如果没有提供targetDirectory
,SavePoint 将会保存到 Flink 配置中所配置的状态后端中。
恢复 SavePoint: 要恢复到 SavePoint 状态,可以通过以下方式提交作业:
bin/flink run -s :savepointPath [:runArgs]
复制代码
其中,savepointPath
是之前生成的 SavePoint 的路径,runArgs
是提交作业时的其他参数。
确保应用程序状态的兼容性: 在使用 SavePoint 时,应用程序的状态结构和代码必须与生成 SavePoint 的版本保持兼容。这意味着在更新应用程序代码后,可能需要做一些额外的工作来保证状态的向后兼容性,以便能够成功恢复到旧的 SavePoint。
StateBackend 状态后端
在 Flink 中提供了 StateBackend 来存储和管理状态数据。
Flink 一共实现了三种类型的状态管理器:MemoryStateBackend
、FsStateBackend
、RocksDBStateBackend
。
MemoryStateBackend
基于内存的状态管理器,将状态数据全部存储在 JVM 堆内存中。
基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。
同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用 MemoryStateBackend。
MemoryStateBackend 是 Flink 的默认状态后端管理器
env.setStateBackend(new MemoryStateBackend(100*1024*1024));
复制代码
注意:聚合类算子的状态会同步到 JobManager 内存中,因此对于聚合类算子比较多的应用会对 JobManager 的内存造成一定的压力,进而影响集群。
FsStateBackend
和 MemoryStateBackend 有所不同的是,FsStateBackend 是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是 HDFS 分布式文件系统。
env.setStateBackend(new FsStateBackend("path",true));
复制代码
如果 path 是本地文件路径,格式为:file:///
;如果 path 是 HDFS 文件路径,格式为:hdfs://
。
第二个参数代表是否异步保存状态数据到 HDFS,异步方式能够尽可能避免 ChecPoint 的过程中影响流式计算任务。
FsStateBackend 更适合任务量比较大的应用,例如:包含了时间范围非常长的窗口计算,或者状态比较大的场景。
RocksDBStateBackend
RocksDBStateBackend 是 Flink 中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend 需要单独引入相关的依赖包到工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.14.4</version>
<scope>test</scope>
</dependency>
复制代码
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-backend"));
复制代码
RocksDBStateBackend 采用异步的方式进行状态数据的 Snapshot,任务中的状态数据首先被写入本地 RockDB 中,这样在 RockDB 仅会存储正在进行计算的热数据,而需要进行 CheckPoint 的时候,会把本地的数据直接复制到远端的 FileSystem 中。
与 FsStateBackend 相比,RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,主要是因为借助于 RocksDB 在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能就会较弱一些。
RocksDB 克服了 State 受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。
集群级配置 StateBackend
全局配置需要修改集群中的配置文件flink-conf.yaml
。
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
复制代码
state.backend: jobmanager
复制代码
#同时操作RocksDB的线程数
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
#RocksDB存储状态数据的本地文件路径
state.backend.rocksdb.localdir: 本地path
复制代码
Window
在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有数据都到齐了才开始处理。
所以聚合计算其实在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。
窗口实质上是将无界流切割为一系列有界流,采用左开右闭的原则
Flink 中的窗口分为两类:基于时间的窗口(Time-based Window)和基于数量的窗口(Count-based Window)
时间窗口中又包含了:滚动时间窗口、滑动时间窗口、会话窗口
计数窗口包含了:滚动计数窗口、滑动计数窗口
时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以有不同的功能应用。
根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)、全局窗口(Global Window)
滚动窗口
滚动窗口每个窗口的大小固定,且相邻两个窗口之间没有重叠
滚动窗口可以基于时间定义,也可以基于数据个数定义,需要的参数只有窗口大小。
我们可以定义一个大小为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个大小为 10 的滚动计数窗口,就会每 10 个数进行一次统计。
基于时间的滚动窗口:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, Integer>> randomKeyedStream = env
.fromSequence(1, Long.MAX_VALUE)
// 将每个数映射为一个二元组,第一个元素是随机键,第二个元素是数本身
.map(new MapFunction<Long, Tuple2<Integer, Integer>>() {
private final Random rnd = new Random();
@Override
public Tuple2<Integer, Integer> map(Long value) {
return new Tuple2<>(rnd.nextInt(10), value.intValue());
}
});
// 对流进行滚动窗口操作,窗口大小为5秒
// 应用窗口函数,求每个窗口的和
DataStream<Integer> sum = randomKeyedStream
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.f1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, TimeWindow>() {
@Override
public void apply(Tuple key,
TimeWindow window,
Iterable<Tuple2<Integer, Integer>> values,
Collector<Integer> out){
int sum1 = 0;
for (Tuple2<Integer, Integer> val: values) {
sum1 += val.f1;
}
out.collect(sum1);
}
});
sum.print();
env.execute("Tumbling Window Example");
}
复制代码
这个程序的主要功能是从 1 到Long.MAX_VALUE
产生一个序列,并为每个生成的数字创建一个二元组(Tuple2),然后在 5 秒大小的窗口上对二元组进行操作并输出每个窗口中所有值的总和。
详细解释如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
: 获取 Flink 的运行环境。
产生一个无限长的序列(从 1 开始到最大的 Long 型数),每个数字都映射成一个二元组,第一个元素(f0)是一个 0-9 的随机整数(作为键用于之后的 keyBy 操作),第二个元素(f1)是数字本身。
使用assignTimestampsAndWatermarks
来定义事件时间和水位线。这里设定了最大延迟时间为 5 秒(forBoundedOutOfOrderness
),并将二元组的第二个元素作为时间戳。
使用keyBy(0)
按照二元组的第一个元素进行分区,这样保证了相同键的元素会被发送到同一个任务中。
定义了一个 5 秒的滚动窗口timeWindow(Time.seconds(5))
。
使用apply
函数应用在每个窗口上,计算每个窗口中所有二元组的第二个元素(f1)的总和,并收集结果。最终,每个窗口计算的总和都会被输出。
sum.print();
: 命令将处理后的数据打印出来。
env.execute("Tumbling Window Example");
: 启动 Flink 任务。
基于计数的滚动窗口:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.countWindow(5) // Count window of 5 elements
.sum(1);
counts.print().setParallelism(1);
env.execute("Window WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
复制代码
这段程序从本地 9999 端口读取数据流,对每一行的单词进行小写处理和分割,然后在滑动窗口中(大小为 5 个元素)计算出各个单词的出现次数。
滑动窗口
滑动窗口的大小固定,但窗口之间不是首尾相接,会有部分重合。同样,滑动窗口也可以基于时间和计数定义。
滑动窗口的参数有两个:窗口大小和滑动步长
基于时间的滑动窗口:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>> processedInput = input.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value){
String[] words = value.split(",");
return new Tuple2<>(words[0], Integer.parseInt(words[1]));
}
});
// 指定窗口类型为滑动窗口,窗口大小为10分钟,滑动步长为5分钟
DataStream<Tuple2<String, Integer>> windowCounts = processedInput
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2){
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
windowCounts.print().setParallelism(1);
env.execute("Time Window Example");
}
复制代码
这段程序从一个套接字端口读取输入数据,将每行输入按照“,”切分并映射为 tuple(字符串,整数)。然后,它按照第一个元素(即字符串)进行分组,并使用滑动窗口(窗口大小为 10 秒,滑动步长为 5 秒)进行聚合 - 在每个窗口内,所有具有相同键的值的整数部分被相加。最终结果会在控制台上打印。
基于计数的滑动窗口:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.countWindow(5, 1)
.sum(1);
counts.print().setParallelism(1);
env.execute("Sliding Window WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
复制代码
这段代码是实时滑动窗口词频统计程序。它从本地 9999 端口读取数据流,将接收到的每行文本拆分为单词然后输出为(单词,1)的形式,接着按照单词分组,使用大小为 5,步长为 1 的滑动窗口,并对每个窗口中的同一单词出现次数进行求和,最后打印结果。
会话窗口
会话窗口是 Flink 中一种基于时间的窗口类型,每个窗口的大小不固定,且相邻两个窗口之间没有重叠,“会话”终止的标志就是隔一段时间没有数据进来
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Long>> inputStream = env.fromElements(
new Tuple2<>("user1", 1617229200000L),
new Tuple2<>("user1", 1617229205000L),
new Tuple2<>("user2", 1617229210000L),
new Tuple2<>("user1", 1617229215000L),
new Tuple2<>("user2", 1617229220000L)
);
SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = inputStream
.keyBy(value -> value.f0)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.sum(1);
resultStream.print();
env.execute("Session Window Example");
}
复制代码
这段代码从一个数据流中读取用户活动数据(包含用户 ID 和 Unix 时间戳),然后根据用户 ID 将数据进行分组,并应用了一个会话窗口(当用户五分钟内无活动则关闭该用户的窗口)。
然后,它对每个用户在各自窗口内的活动时间戳求和,并打印出结果。最后执行的名为"Session Window Example"的任务即完成了这一流式计算过程。
按键分区窗口和非按键分区窗口
在 Flink 中,数据流可以按键分区(keyed)和非按键分区(non-keyed)。
按键分区是指将数据流根据特定的键值进行分区,使得相同键值的元素被分配到同一个分区中。这样可以保证相同键值的元素由同一个 worker 实例处理。只有按键分区的数据流才能使用键分区状态和计时器。
非按键分区是指数据流没有根据特定的键值进行分区。这种情况下,数据流中的元素可以被任意分配到不同的分区中。
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是在调用窗口算子之前是否有 keyBy 操作。
按键分区窗口:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>> counts =
// 将输入字符串拆分为tuple类型,包含word和数量
text.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) {
return new Tuple2<>(value, 1);
}
})
// 根据元组的第一字段(word)进行分区键
.keyBy(0)
// 定义一个滚动窗口,时间间隔为5秒
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// 应用reduce函数,累加各个窗口中同一单词的数量
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
counts.print();
env.execute("Window WordCount");
复制代码
这段代码从 localhost 的 9999 端口接收数据流,将输入的每个字符串作为一个单词和数字 1 的 tuple 对象,然后根据单词进行分区,创建一个滚动窗口(间隔为 5 秒),并在每个窗口中对同一单词的数量进行累加统计,最后打印出结果。
非按键分区窗口:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
DataStream<Integer> windowCounts = parsed
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) {
return value1 + value2;
}
});
windowCounts.print().setParallelism(1);
env.execute("Non keyed Window example");
}
复制代码
这段程序从 localhost 的 9999 端口读取数据流,把每条数据转化为整数,然后在 5 秒的滚动窗口内将所有的整数值进行累加,并打印出结果。
窗口函数(WindowFunction)
所谓的“窗口函数”(window functions),就是定义窗口如何进行计算操作的函数
窗口函数根据处理的方式可以分为两类:「增量窗口聚合函数」和「全窗口聚合函数」。
增量窗口聚合函数
增量窗口聚合函数每来一条数据就立即进行计算,中间保持着聚合状态,但是不立即输出结果,等到窗口到了结束时间需要输出计算结果的时候,取出之前聚合的状态直接输出。
常见的增量聚合函数有:reduce()
、aggregate()
、sum()
、min()
、max()
。
下面是一个使用增量聚合函数的代码示例:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> data = env.fromSequence(1,Long.MAX_VALUE);
DataStream<Long> result = data.keyBy(value -> value % 2)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new SumAggregator());
result.print();
env.execute("Incremental Aggregation Job");
}
public static class SumAggregator implements AggregateFunction<Long, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Long value, Long accumulator) {
return value + accumulator;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
复制代码
这段代码从 1 到Long.MAX_VALUE
产生一个连续的数据流。接着,它将数据按照奇偶性进行分类,并在每个 5 秒的时间窗口内对相同类别的数值进行累加操作。最后打印出累加结果。
全窗口函数
全窗口函数是指在整个窗口中的所有数据都准备好后才进行计算。
Flink 中的全窗口函数有两种: WindowFunction
和ProcessWindowFunction
。
与增量窗口函数不同,全窗口函数可以访问窗口中的所有数据,因此可以执行更复杂的计算。例如,可以计算窗口中数据的中位数,或者对窗口中的数据进行排序。
WindowFunction 接收一个 Iterable 类型的输入,其中包含了窗口中所有的数据。ProcessWindowFunction 则更加强大,它不仅可以访问窗口中的所有数据, 还可以获取到一个“上下文对象”(Context)。
这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(Processing Time)和事件时间水位线(Event Time Watermark)。
这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,WindowFunction 作用可以被 ProcessWindowFunction 全覆盖。
不过这种额外的功能可能会带来一些性能上的损失,因此只有当你确实需要这些额外功能时,才应该使用 ProcessWindowFunction,如果你不需要这些功能,“简单”的 WindowFunction 可能会更有效率。
下面是使用 WindowFunction 计算窗口内数据总和的代码示例:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements("a", "b", "c", "a", "b", "b");
DataStream<String> withTimestampsAndWatermarks = text.assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(100))
.withTimestampAssigner((event, timestamp) -> System.currentTimeMillis())
);
DataStream<Tuple2<String, Integer>> mapped = withTimestampsAndWatermarks.map(
new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) {
return new Tuple2<>(value, 1);
}
});
mapped.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new SumWindowFunction())
.print();
env.execute("Window Sum");
}
复制代码
下面是一个使用 ProcessWindowFunction 统计网站 1 天 UV 的代码示例:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> data = env.fromElements(
new Tuple2<>("user1", 1),
new Tuple2<>("user2", 1),
new Tuple2<>("user1", 1));
data = data.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String,Integer>>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> System.currentTimeMillis())
);
data.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.days(1)))
.process(new UVProcessWindowFunction())
.print();
env.execute("Daily User View Count");
}
public static class UVProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, Tuple, TimeWindow> {
@Override
public void process(Tuple key, Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Long>> out){
long count = 0;
BloomFilter<CharSequence> bloomFilter = BloomFilter.create(Funnels.stringFunnel(), 100000, 0.01);
for (Tuple2<String, Integer> in: input) {
if (!bloomFilter.mightContain(in.f0)) {
count += 1;
bloomFilter.put(in.f0);
}
}
out.collect(new Tuple2<>(key.getField(0), count));
}
}
复制代码
这段代码从数据流中读取用户视图数据(数据为("user", view_count)),然后对每个用户的观看次数实现了基于时间窗口(一天)的统计。利用布隆过滤器并在窗口内去重,可以避免重复计数。最后,每个窗口结束时,它会输出每个用户的 id 和相应的不重复观看次数。
增量窗口函数和全窗口函数结合使用
全窗口函数为处理提供了更多的背景信息,因为它需要等到收集完所有窗口内的数据才进行计算,但是全窗口函数可能会增加系统的复杂性和运行时间。
另一方面,增量窗口函数可以在数据进入窗口时进行部分聚合计算,从而提高效率,但是它可能不适用于所有类型的计算,例如中位数或者标准差这种需要全部数据的计算就无法使用增量聚合。
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的 Window API 就给我们实现了这样的用法。
之前在调用 WindowedStream 的reduce()
和aggregate()
方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction
或者ProcessWindowFunction
。
// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
复制代码
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。
需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了
下面我们举一个具体的实例来说明:
在网站的各种统计指标中,一个很重要的统计指标就是热门的链接,想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用 url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次。
我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果,代码示例如下:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Long>> urlCounts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new CountAgg(), new WindowResultFunction());
urlCounts.print();
env.execute("UrlCount Job");
}
public static class CountAgg implements AggregateFunction<Tuple2<String, Integer>, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple2<String, Integer> value, Long accumulator) {
return accumulator + value.f1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
public static class WindowResultFunction implements WindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<Tuple2<String, Long>> out) {
Long count = input.iterator().next();
out.collect(new Tuple2<>(key, count));
}
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
复制代码
在这个示例中,我们首先把数据根据 URL 进行了分组 (keyBy),然后定义了一个滑动窗口,窗口长度是 10 秒,每 5 秒滑动一次。接着我们使用增量聚合函数 CountAgg
对每个窗口内的元素进行聚合,最后用全窗口函数 WindowResultFunction
输出结果。
Window 重叠优化
窗口重叠是指在使用滑动窗口时,多个窗口之间存在重叠部分。这意味着同一批数据可能会被多个窗口同时处理。
例如,假设我们有一个数据流,它包含了 0 到 9 的整数。我们定义了一个大小为 5 的滑动窗口,滑动距离为 2。那么,我们将会得到以下三个窗口:
窗口 1:包含 0, 1, 2, 3, 4
窗口 2:包含 2, 3, 4, 5, 6
窗口 3:包含 4, 5, 6, 7, 8
在这个例子中,窗口 1 和窗口 2 之间存在重叠部分,即 2, 3, 4。同样,窗口 2 和窗口 3 之间也存在重叠部分,即 4, 5, 6。
enableOptimizeWindowOverlap()
方法是用来启用 Flink 的窗口重叠优化功能的。它可以减少计算重叠窗口时的计算量。
在我之前给出的代码示例中,我没有使用enableOptimizeWindowOverlap()
方法来启用窗口重叠优化功能。这意味着 Flink 不会尝试优化计算重叠窗口时的计算量。
如果你想使用窗口重叠优化功能,你可以在你的代码中添加以下行:
env.getConfig().enableOptimizeWindowOverlap();
复制代码
这将启用窗口重叠优化功能,Flink 将尝试优化计算重叠窗口时的计算量。
触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于 WindowedStream 调用trigger()
方法,就可以传入一个自定义的窗口触发器(Trigger)。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) {
return new Tuple2<>(value, 1);
}
})
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.trigger(new MyTrigger())
.sum(1)
.print();
env.execute("Flink Trigger Example");
}
public static class MyTrigger extends Trigger<Tuple2<String, Integer>, TimeWindow> {
@Override
public TriggerResult onElement(Tuple2<String, Integer> stringIntegerTuple2, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) {
}
}
复制代码
这段代码主要从 localhost 的 9999 端口读取数据流,每条数据映射为一个包含该数据和整数 1 的元组。然后按照元组的第一个元素进行分组,并在每 5 秒的滚动窗口中对元组的第二个元素求和。最后使用用户自定义触发器,当新元素到达时立即触发计算并清空窗口,但在处理时间或事件时间上不做任何操作。
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器。
对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger,类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,这块了解一下即可。
移除器(Evictor)
移除器(Evictor)是用于在滚动窗口或会话窗口中控制数据保留和清理的组件。它可以根据特定的策略从窗口中删除一些数据,以确保窗口中保留的数据量不超过指定的限制。
移除器通常与窗口分配器一起使用,窗口分配器负责确定数据属于哪个窗口,而移除器则负责清理窗口中的数据。
以下是一个使用移除器的代码示例,演示如何在滚动窗口中使用基于计数的移除器:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建一个包含整数和时间戳的流
DataStream<Tuple2<Integer, Long>> dataStream = env.fromElements(
Tuple2.of(1, System.currentTimeMillis()),
Tuple2.of(2, System.currentTimeMillis() + 1000),
Tuple2.of(3, System.currentTimeMillis() + 2000),
Tuple2.of(4, System.currentTimeMillis() + 3000),
Tuple2.of(5, System.currentTimeMillis() + 4000),
Tuple2.of(6, System.currentTimeMillis() + 5000)
);
// 添加以下代码设置水印和事件时间戳
dataStream = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.f1));
// 在滚动窗口中使用基于计数的移除器,保留最近3个元素
dataStream
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(CountTrigger.of(3))
.evictor(CountEvictor.of(3))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
.print();
env.execute("Flink Evictor Example");
}
// 自定义聚合函数
private static class MyAggregateFunction implements AggregateFunction<Tuple2<Integer, Long>, Integer, Integer> {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Tuple2<Integer, Long> value, Integer accumulator) {
return accumulator + 1;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
}
// 自定义处理窗口函数
private static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, String, Integer, TimeWindow> {
private transient ListState<Integer> countState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("countState", Integer.class);
countState = getRuntimeContext().getListState(descriptor);
}
@Override
public void process(Integer key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
int count = elements.iterator().next();
countState.add(count);
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
String result = "Window: " + windowStart + " to " + windowEnd + ", Count: " + countState.get().iterator().next();
out.collect(result);
}
}
复制代码
这段代码主要用于对一串包含整数和时间戳的元素进行处理。首先,它创建了一个流并赋予了水印和时间戳。然后在滚动窗口中使用基于计数的触发器和驱逐器,只保留最近的三个元素。之后,通过自定义聚合和窗口函数,来处理窗口内的数据,聚合函数计算每个窗口内元素的数量,窗口函数将结果与窗口的开始和结束时间一起输出。
Flink Time 时间语义
Flink 定义了三类时间
事件时间(Event Time):数据在数据源产生的时间,一般由事件中的时间戳描述,比如用户日志中的 TimeStamp。
摄取时间(Ingestion Time):数据进入 Flink 的时间,记录被 Source 节点观察到的系统时间。
处理时间(Process Time):数据进入 Flink 被处理的系统时间(Operator 处理数据的系统时间)。
Flink 流式计算的时候需要显示定义时间语义,根据不同的时间语义来处理数据,比如指定的时间语义是事件时间,那么我们就要切换到事件时间的世界观中,窗口的起始与终止时间都是以事件时间为依据。
在 Flink 中默认使用的是 Process Time,如果要使用其他的时间语义,在执行环境中可以进行设置。
//设置时间语义为Ingestion Time
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//设置时间语义为Event Time 我们还需要指定一下数据中哪个字段是事件时间(下文会讲)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
复制代码
Watermark(水印)
Watermark 的本质实质上是时间戳,简单而言,它是用来处理迟到数据的
在使用 Flink 处理数据的时候,数据通常都是按照事件产生的时间(事件时间)的顺序进入到 Flink,但是在遇到特殊情况下,比如遇到网络延迟或者使用 Kafka(多分区) 很难保证数据都是按照事件时间的顺序进入 Flink,很有可能是乱序进入。
如果数据一旦是乱序进入,那么在使用 Window 处理数据的时候,就会出现延迟数据不会被计算的问题。
如果有延迟数据,那么窗口需要等待全部的数据到来之后,再触发窗口执行。
需要等待多久?不可能无限期等待,我们用户可以自己来设置延迟时间,这样就可以尽可能保证延迟数据被处理。
使用 Watermark 就可以很好的解决延迟数据的问题。
根据用户指定的延迟时间生成水印(Watermak = 最大事件时间-指定延迟时间),当 Watermak 大于等于窗口的停止时间,这个窗口就会被触发执行。
举例:滚动窗口长度 10s,指定延迟时间 3s
2020-04-25 10:00:01 wm:2020-04-25 09:59:58
2020-04-25 10:00:02 wm:2020-04-25 09:59:59
2020-04-25 10:00:03 wm:2020-04-25 10:00:00
2020-04-25 10:00:09 wm:2020-04-25 10:00:06
2020-04-25 10:00:12 wm:2020-04-25 10:00:09
2020-04-25 10:00:08 wm:2020-04-25 10:00:05 延迟数据
2020-04-25 10:00:13 wm:2020-04-25 10:00:10
如果没有 Watermark ,那么在倒数第三条数据来的时候,就会触发执行,倒数第二条的延迟数据就不会被计算,有了水印之后就可以处理延迟 3s 内的数据
生成水印策略
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStream<String> stream = env.socketTextStream("node01", 8888)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> {
return Long.parseLong(event.split(" ")[0]);
}));
复制代码
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
return element.f1;
}
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
return lastElement.f0.equals("watermark") ? new Watermark(extractedTimestamp) : null;
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<Tuple2<String, Long>>() {
private boolean running = true;
@Overrid
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
while (running) {
long currentTimestamp = System.currentTimeMillis();
ctx.collect(new Tuple2<>("key", currentTimestamp));
if (currentTimestamp % 10 == 0) {
// 每隔一段时间发出一个含有"watermark"的特殊事件
ctx.collect(new Tuple2<>("watermark", currentTimestamp));
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}).assignTimestampsAndWatermarks(new PunctuatedAssigner())
.print();
env.execute("Punctuated Watermark Example");
}
}
复制代码
这段代码定义了一个名为 PunctuatedAssigner 的时间戳和 watermark 分配器类,用于从接收到的元素中提取出时间戳,并根据特定条件(在本例中,元素的 key 是否为"watermark")生成并发送 watermark。
在 main 方法中,创建了一个源函数,此函数每秒生成一个新的事件,并且每隔 10 毫秒就发出一个包含"watermark"的特殊事件。这些事件被收集,分配时间戳和 watermark,然后打印出来。
允许延迟(Allowed Lateness)
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了、本该被丢弃的数据。
此方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义 OutputTag 来标识侧输出流
final OutputTag<String> lateDataTag = new OutputTag<String>("late-data"){};
DataStream<String> dataStream = env.socketTextStream("localhost", 9000);
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = dataStream
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
})
.keyBy(value -> value.f0)
.process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void processElement(Tuple2<String, Integer> value,
Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
if (value.f1 == 1) {
out.collect(value);
} else {
// 将迟到的数据发送到侧输出流
ctx.output(lateDataTag, "Late data detected: " + value);
}
}
});
// 获取侧输出流
DataStream<String> lateDataStream = resultStream.getSideOutput(lateDataTag);
resultStream.print();
lateDataStream.print();
env.execute("SideOutput Example");
}
复制代码
这段代码首先建立一个从本地 9000 端口读取数据的流,然后将每一行数据映射为一个二元组 (value, 1)。接着按照第一个字段进行分组,并进行处理:如果二元组的第二个元素等于 1,则直接输出;否则,该条数据会被视为“迟到数据”并输出至侧输出流。最后,主流和侧输出流的结果都会打印出来。
Flink 关联维度表
在 Flink 实际开发过程中,可能会遇到 source 进来的数据,需要连接数据库里面的字段,再做后面的处理,比如,想要通过 id 获取对应的地区名字,这时候需要通过 id 查询地区维度表,获取具体的地区名。
对于不同的应用场景,关联维度表的方式不同
场景 1:维度表信息基本不发生改变,或者发生改变的频率很低。
实现方案:采用 Flink 提供的 CachedFile。
Flink 提供了一个分布式缓存(CachedFile),可以使用户在并行函数中很方便的读取本地文件,并把它放在 TaskManager 节点中,防止 Task 重复拉取。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如 hdfs 或者 s3),通过 ExecutionEnvironment 注册缓存文件并为它起一个名称。
当程序执行,Flink 自动将文件或者目录复制到所有 TaskManager 节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从 TaskManager 节点的本地文件系统访问它。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("/root/id2city", "id2city");
DataStream<String> socketStream = env.socketTextStream("node01", 8888);
DataStream<Integer> stream = socketStream.map(Integer::valueOf);
DataStream<String> result = stream.map(new RichMapFunction<Integer, String>() {
private Map<Integer, String> id2CityMap;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
id2CityMap = new HashMap<>();
BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("id2city")));
String line;
while ((line = reader.readLine()) != null) {
String[] splits = line.split(" ");
Integer id = Integer.parseInt(splits[0]);
String city = splits[1];
id2CityMap.put(id, city);
}
reader.close();
}
@Override
public String map(Integer value) throws IOException {
return id2CityMap.getOrDefault(value, "not found city");
}
});
result.print();
env.execute();
}
复制代码
这段程序首先从"node01"主机的 8888 端口读取数据,然后将其转换为整数流。接着,它用一个富映射函数(RichMapFunction)将每个整数 ID 映射到城市名。这个映射是从在"/root/id2city"路径下注册的缓存文件中读取的。如果无法找到某个 ID 对应的城市,就会返回"not found city"。
在集群中查看对应 TaskManager 的 log 日志,发现注册的 file 会被拉取到各个 TaskManager 的工作目录区。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> stream = env.socketTextStream("node01", 8888);
stream.map(new RichMapFunction<String, String>() {
private HashMap<String,String> map = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("init data ...");
query();
Timer timer = new Timer(true);
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
query();
} catch (IOException e) {
e.printStackTrace();
}
}
},1000,2000);
}
void query() throws IOException {
Path path = Paths.get("D:\\code\\StudyFlink\\data\\id2city");
Stream<String> lines = Files.lines(path);
lines.forEach(line -> {
String[] parts = line.split(" ");
map.put(parts[0], parts[1]);
});
lines.close();
}
@Override
public String map(String key) throws Exception {
return map.getOrDefault(key, "not found city");
}
}).print();
env.execute();
}
复制代码
这段代码从名为"node01"的服务器的 8888 端口读取数据流,然后通过映射函数将每个接收到的数据键值(假设是城市 ID)转换为对应的城市名称。此映射来自一个定期更新的文件"D:\code\StudyFlink\data\id2city",如果没有找到匹配的城市 ID,则返回"not found city"。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.setProperty("group.id", "flink-kafka-001");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"configure",
new SimpleStringSchema(),
props
);
consumer.setStartFromLatest();
DataStream<String> configureStream = env.addSource(consumer);
DataStream<String> busStream = env.socketTextStream("node01", 8888);
MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
"dynamicConfig",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
);
BroadcastStream<String> broadcastStream = configureStream.broadcast(descriptor);
busStream.connect(broadcastStream).process(
new BroadcastProcessFunction<String, String, String>() {
@Override
public void processElement(String line, ReadOnlyContext ctx, Collector<String> out) throws Exception {
String city = ctx.getBroadcastState(descriptor).get(line);
if (city == null) {
out.collect("not found city");
} else {
out.collect(city);
}
}
@Override
public void processBroadcastElement(String line, Context ctx, Collector<String> out) throws Exception {
String[] elems = line.split(" ");
ctx.getBroadcastState(descriptor).put(elems[0], elems[1]);
}
}
).print();
env.execute();
}
复制代码
这段代码将从 Kafka 中获取的数据作为广播流,然后与从 socket 中获取的数据处理。在处理过程中,根据 socket 中的数据(作为 key)查找广播状态中的城市名称(作为 value),如果找到,则输出城市名,否则输出"not found city"。其中,Kafka 中的数据以空格分隔,第一个元素作为 key,第二个元素作为 value 存入 BroadcastState。
Table API & Flink SQL
在 Spark 中有 DataFrame 这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。
Flink 也提供了关系型编程接口 Table API 以及基于 Table API 的 SQL API,让用户能够通过使用结构化编程接口高效地构建 Flink 应用。同时 Table API 以及 SQL 能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套 API 编写流式应用和批量应用,从而达到真正意义的流批统一。
在 Flink 1.8 架构里,如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。 Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一。
阿里巴巴的 Blink 团队在这方面做了大量的工作,已经实现了 Table API & SQL 层的流批统一。阿里巴巴已经将 Blink 开源回馈给 Flink 社区。
开发环境构建
在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴 Blink 团队贡献的诸多功能,取名叫: Blink Planner。
在使用 Table API 和 SQL 开发 Flink 应用之前,通过添加 Maven 的依赖配置到项目中,在本地工程中引入相应的依赖库,库中包含了 Table API 和 SQL 接口。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.13.6</version>
</dependency>
复制代码
Table Environment
和 DataStream API 一样,Table API 和 SQL 具有相同的基本编程模型。首先需要构建对应的 TableEnviroment 创建关系型编程环境,才能够在程序中使用 Table API 和 SQL 来编写应用程序,另外 Table API 和 SQL 接口可以在应用中同时使用,Flink SQL 基于 Apache Calcite 框架实现了 SQL 标准协议,是构建在 Table API 之上的更高级接口。
首先需要在环境中创建 TableEnvironment 对象,TableEnvironment 中提供了注册内部表、执行 Flink SQL 语句、注册自定义函数等功能。根据应用类型的不同,TableEnvironment 创建方式也有所不同,但是都是通过调用create()
方法创建。
流计算环境下创建 TableEnviroment :
//创建流式计算的上下文环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//创建Table API的上下文环境
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
复制代码
Table API
Table API 顾名思义,就是基于“表”(Table)的一套 API,专门为处理表而设计的
它提供了关系型编程模型,可以用来处理结构化数据,支持表和视图的概念。在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现需求了,非常实用。
下面是一个简单的例子,它使用 Java 编写了一个 Flink 程序,该程序使用 Table API 从 CSV 文件中读取数据,然后执行简单的查询并将结果写入到自定义的 Sink 中。
首先我们需要导入 maven 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.6</version>
</dependency>
复制代码
代码示例如下:
public static void main(String[] args) throws Exception {
// 创建流处理环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建表环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 从CSV文件中读取数据
DataStream<Tuple2<String, Integer>> data = env.readTextFile("input.csv")
.map(line -> {
String[] parts = line.split(",");
return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
})
.returns(Types.TUPLE(Types.STRING, Types.INT));
// 使用Table API将数据转换为表并注册为视图
String name = "people";
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
tableEnv.createTemporaryView(name, data, schema);
// 使用SQL查询年龄大于30的人
Table result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 30");
// 将结果转换为DataStream
DataStream<Row> output = tableEnv.toDataStream(result);
output.addSink(new SinkFunction<Row>() {
@Override
public void invoke(Row value, Context context) throws Exception {
// implement the sink here, e.g., write into a file, send to Kafka, etc.
}
});
env.execute();
}
复制代码
这段代码是在流处理环境中实现的一个简单的 ETL(提取-转换-加载)过程:它从 CSV 文件中读取数据,对数据进行映射和转化,然后使用 SQL 查询在一个临时视图上查找年龄大于 30 的人,最后将结果输出到某个自定义的 Sink 上。
Virtual Tables(虚拟表)
在环境中注册之后,我们就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 30");
复制代码
得到的 newTable 是一个中间转换结果,如果之后又希望直接使用这个表执行 SQL,又该怎么做呢?由于 newTable 是一个 Table 对象,并没有在表环境中注册,所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用:
tableEnv.createTemporaryView("NewTable", newTable);
复制代码
这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图” (createTemporaryView)。
表流互转
// 将表转换成数据流,并打印
tableEnv.toDataStream(result).print();
// 将数据流转换成表
// 我们还可以在 fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置
Table table = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));
复制代码
动态表和持续查询
在 Flink 中,动态表(Dynamic Tables)是一种特殊的表,它可以随时间变化。它们通常用于表示无限流数据,例如事件流或服务器日志。与静态表不同,动态表可以在运行时插入、更新和删除行。
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作持续查询(Continuous Query)。
下面是一个简单的例子,它使用 Java 编写了一个 Flink 程序,该程序从名为"input-topic"的 Kafka 主题中读取 JSON 格式的数据(属性包括"name"和"age"),过滤出所有年龄大于 30 岁的记录,并将结果输出到另一个名为"output-topic"的 Kafka 主题中。同时,处理的结果也会在控制台上打印出来。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE input (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'input-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
tableEnv.executeSql("CREATE TABLE output (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'output-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
Table result = tableEnv.sqlQuery("SELECT name, age FROM input WHERE age > 30");
tableEnv.toAppendStream(result, Row.class).print();
result.executeInsert("output");
env.execute();
}
复制代码
连接到外部系统
在 Table API 编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器(connector),这样就可以连接到外部系统进行数据交互。
其中最简单的当然就是连接到控制台打印输出:
CREATE TABLE ResultTable (
user STRING,
cnt BIGINT
WITH (
'connector' = 'print'
);
复制代码
Kafka
需要导入 maven 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
复制代码
创建一个连接到 Kafka 的表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为 Kafka,并定义必要的配置参数:
CREATE TABLE KafkaTable (
`user` STRING,
`url` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
复制代码
MySQL
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.6</version>
</dependency>
复制代码
创建 JDBC 表的方法与前面 Kafka 大同小异:
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;
复制代码
Table API 实战
1.创建 Table
Table API 中已经提供了 TableSource 从外部系统获取数据,例如常见的数据库、文件系统和 Kafka 消息队列等外部系统。
从文件中创建 Table(静态表)
Flink 允许用户从本地或者分布式文件系统中读取和写入数据,只需指定相应的参数即可。但是文件格式必须是 CSV 格式的。其他文件格式也支持(在 Flink 中还有 Connector 等来支持其他格式或者自定义 TableSource)
public static void main(String[] args) throws Exception {
// 创建流式计算的上下文环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建Table API的上下文环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建CSV表源
String sourceDDL = "CREATE TABLE exampleTab (" +
"`id` INT, " +
"`name` STRING, " +
"`score` DOUBLE" +
") WITH (" +
"'connector' = 'filesystem'," +
"'path' = 'D:\\code\\StudyFlink\\data\\tableexamples'," +
"'format' = 'csv'" +
")";
tableEnv.executeSql(sourceDDL);
// 打印表结构
ResolvedSchema schema = tableEnv.from("exampleTab").getResolvedSchema();
System.out.println(schema.toString());
}
复制代码
从 DataStream 中创建 Table(动态表)
前面已经知道 Table API 是构建在 DataStream API 和 DataSet API 之上的一层更高级的抽象,因此用户可以灵活地使用 Table API 将 Table 转换成 DataStream 或 DataSet 数据集,也可以将 DataSteam 或 DataSet 数据集转换成 Table,这和 Spark 中的 DataFrame 和 RDD 的关系类似。
public static void main(String[] args) throws Exception {
// 先创建StreamExecutionEnvironment
final StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 创建一个DataStream
DataStream<Tuple2<String, Integer>> stream = bsEnv.fromElements(Tuple2.of("Alice", 3), Tuple2.of("Bob", 4));
// 将DataStream转化为Table
Table table1 = bsTableEnv.fromDataStream(stream);
// 再把Table转回DataStream
DataStream<Row> streamAgain = bsTableEnv.toDataStream(table1);
}
复制代码
2.查询和过滤
在 Table 对象上使用select
操作符查询需要获取的指定字段,也可以使用filter
或where
方法过滤字段和检索条件,将需要的数据检索出来。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
// Create the Table API execution environment.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
SingleOutputStreamOperator<Tuple5<String, String, String, Long, Long>> data = streamEnv.socketTextStream("hadoop101", 8888)
.map(new MapFunction<String, Tuple5<String, String, String, Long, Long>>() {
@Override
public Tuple5<String, String, String, Long, Long> map(String line) throws Exception {
String[] arr = line.split(",");
return new Tuple5<>(arr[0].trim(), arr[1].trim(), arr[2].trim(), Long.parseLong(arr[4].trim()), Long.parseLong(arr[5].trim()));
}
});
Table table = tableEnv.fromDataStream(data);
// Query
tableEnv.toAppendStream(table.select("f0 AS sid, f1 AS type, f3 AS callTime, f4 AS callOut"), Row.class)
.print();
// Filter Query
tableEnv.toAppendStream(table.filter("f1 === 'success'").where("f1 === 'success'"), Row.class)
.print();
tableEnv.execute("sql");
}
复制代码
这段代码从一个指定的 socket 中读取文本数据,将每一行数据映射为一个 5 元组(Tuple5),然后把这个数据流转换为表,并进行查询操作。首先,它进行简单的列选择查询并打印结果;然后,它进行筛选查询,选取第二字段"成功"的记录并打印出来。整个过程在一个名为"sql"的任务中执行。
3.UDF 自定义函数
用户可以在 Table API 中自定义函数类,常见的抽象类和接口是:
ScalarFunction
TableFunction
AggregateFunction
TableAggregateFunction
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 注册UDF
tableEnv.createTemporarySystemFunction("UpperCase", UpperCaseFunction.class);
// 使用UDF
tableEnv.executeSql(
"SELECT UpperCase(myField) FROM myTable"
);
}
public static class UpperCaseFunction extends ScalarFunction {
public String eval(String str) {
return str.toUpperCase();
}
}
复制代码
这段代码创建了自定义函数(UDF)并使用它。首先,它设置了 Flink 的环境,并通过 Blink Planner 以批处理模式运行。然后,它注册了一个名为 "UpperCase" 的 UDF,该函数将输入字符串转换为大写。最后,它在 SQL 查询中使用了这个 UDF,将 "myTable" 中的 "myField" 字段的值转换成大写形式。
4.Window
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建一个具有 Process Time 时间属性的表
tableEnv.executeSql(
"CREATE TABLE Orders (" +
"orderId INT, " +
"price DOUBLE, " +
"buyer STRING, " +
"orderTime TIMESTAMP(3)," +
"pt AS PROCTIME()" + // 使用处理时间
") WITH ('connector' = '...', ...)"
);
Table orders = tableEnv.from("Orders");
Table result1 = orders.window(Tumble.over(lit(10).minutes()).on($("pt")).as("w"))
.groupBy($("w"), $("buyer"))
.select($("buyer"), $("w").start().as("start"), $("w").end().as("end"), $("price").sum().as("totalPrice"));
// 创建一个具有 Event Time 时间属性的表,使用Watermarks
tableEnv.executeSql(
"CREATE TABLE OrdersEventTime (" +
"orderId INT, " +
"price DOUBLE, " +
"buyer STRING, " +
"orderTime TIMESTAMP(3), " +
"WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND" + // 使用事件时间和水印
") WITH ('connector' = '...', ...)"
);
Table ordersEventTime = tableEnv.from("OrdersEventTime");
Table result2 = ordersEventTime.window(Tumble.over(lit(10).minutes()).on($("orderTime")).as("w"))
.groupBy($("w"), $("buyer"))
.select($("buyer"), $("w").start().as("start"), $("w").end().as("end"), $("price").sum().as("totalPrice"));
// 对于 IngestionTime,Flink 1.12 中已经不推荐使用,因此在 Flink 1.13.6 版本中,你应该使用 ProcessTime 或 EventTime。
}
复制代码
这段代码创建了两个表:一个使用处理时间(Process Time),另一个使用事件时间(Event Time)并设置了水印。针对这两个表,分别在买家(buyer)和 10 分钟的时间窗口上进行分组,并计算了每个时间窗口中的总价(totalPrice)。
多类型数据流
在 Flink 中,DataStream
,ChangelogStream
,AppendStream
和 RetractStream
用于表示不同类型的数据流。简单来说,它们之间的主要区别和联系如下:
DataStream:这是 Flink 的基础抽象,它表示一个无界的数据流,可以包含任何类型的元素。
toChangelogStream:这个方法将表转换为一个 ChangeLog 模式的 DataStream。每条记录都代表一个添加、修改或删除的事件。事件通常由可选的元数据标记(例如,'+'(添加)或'-'(撤销))、更新时间以及唯一的键和值组成。ChangelogStream 主要用于处理动态表,并且支持插入,更新和删除操作。
toAppendStream:这个方法将表转换为一个只包含添加操作的 DataStream。换句话说,结果表只包含插入(append)操作,不能执行更新或删除操作。如果查询的结果表支持删除或更新,则此方法会抛出异常。
toRetractStream:这个方法将表转换为一个包含添加和撤销消息的 DataStream。每一条添加消息表示在结果表中插入了一行,而每一条撤销消息表示在结果表中删除了一行。如果撤销消息后没有相应的添加消息,那么可能是因为输入数据发生了变化,导致之前发送的结果不再正确,需要被撤销。
Flink SQL
企业中 Flink SQL 比 Table API 用的多
Flink SQL 是 Apache Flink 提供的一种使用 SQL 查询和处理数据的方式。它允许用户通过 SQL 语句对数据流或批处理数据进行查询、转换和分析,无需编写复杂的代码。Flink SQL 提供了一种更直观、易于理解和使用的方式来处理数据,同时也可以与 Flink 的其他功能无缝集成。
Flink SQL 支持 ANSI SQL 标准,并提供了许多扩展和优化来适应流式处理和批处理场景。它能够处理无界数据流,具备事件时间和处理时间的语义,支持窗口、聚合、连接等常见的数据操作,还提供了丰富的内置函数和扩展插件机制。
下面是一个简单的 Flink SQL 代码示例,展示了如何使用 Flink SQL 对流式数据进行查询和转换。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 设置并行度为1,方便观察输出结果
// 创建 Kafka 数据源
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
DataStream<String> sourceStream = env.addSource(kafkaConsumer);
// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 注册数据源表
tableEnv.createTemporaryView("source_table", sourceStream, "message");
// 执行 SQL 查询和转换
String query = "SELECT message, COUNT(*) AS count FROM source_table GROUP BY message";
// 执行 SQL 查询和转换
Table resultTable = tableEnv.sqlQuery(query);
DataStream<Result> resultStream = tableEnv.toDataStream(resultTable)
.map(row -> new Result(row.getField(0).toString(), (Long) row.getField(1)));
// 打印结果
resultStream.print();
env.execute("Flink SQL Example");
}
// 自定义结果类
public static class Result {
public String message;
public Long count;
public Result() {
}
public Result(String message, Long count) {
this.message = message;
this.count = count;
}
@Override
public String toString() {
return "Result{" +
"message='" + message + '\'' +
", count=" + count +
'}';
}
}
复制代码
在上述示例中,我们使用 Kafka 作为数据源,并创建了一个消费者从名为 "input-topic" 的 Kafka 主题中读取数据。然后,我们将数据流注册为名为 "source_table" 的临时表。
接下来,我们使用 Flink SQL 执行 SQL 查询和转换。在这个例子中,我们查询 "source_table" 表,对 "message" 字段进行分组并计算每个消息出现的次数。查询结果会映射到自定义的 Result
类,并最终通过 print()
方法打印到标准输出。
最后,我们通过调用 env.execute()
方法来启动 Flink 作业的执行。
Flink SQL 中使用窗口函数
Flink SQL 中使用滚动窗口,滑动窗口和会话窗口代码示例如下:
public static void main(String[] args) throws Exception {
// 初始化流处理执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 对于实际应用程序,请替换为你的数据源
String sourceDDL =
"CREATE TABLE MySourceTable (\n" +
" user_id STRING,\n" +
" event_time TIMESTAMP(3),\n" +
" price DOUBLE\n" +
") WITH (\n" +
"'connector' = '...',\n" +
"...);\n";
tableEnv.executeSql(sourceDDL);
// 滚动窗口
String tumblingWindowQuery =
"SELECT user_id, SUM(price) as total_price\n" +
"FROM MySourceTable\n" +
"GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)";
Table tumblingWindowResult = tableEnv.sqlQuery(tumblingWindowQuery);
// 滑动窗口
String slidingWindowQuery =
"SELECT user_id, SUM(price) as total_price\n" +
"FROM MySourceTable\n" +
"GROUP BY user_id, HOP(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR)";
Table slidingWindowResult = tableEnv.sqlQuery(slidingWindowQuery);
// 会话窗口
String sessionWindowQuery =
"SELECT user_id, SUM(price) as total_price\n" +
"FROM MySourceTable\n" +
"GROUP BY user_id, SESSION(event_time, INTERVAL '1' HOUR)";
Table sessionWindowResult = tableEnv.sqlQuery(sessionWindowQuery);
}
复制代码
程序定义了三种不同类型的窗口查询:滚动窗口(tumbling window),滑动窗口(sliding window),会话窗口(session window)。
滚动窗口:该查询对"MySourceTable"中的数据应用滚动窗口,窗口大小为 1 小时,并按 user_id 进行分组。每个窗口内,会计算每个用户的总价格(sum(price))。
滑动窗口:与滚动窗口相似, 但是窗口可以重叠. 这个查询每半小时滑动一次, 并且每次滑动都会创建一个 1 小时大小的窗口, 再进行与滚动窗口查询相同的计算.
会话窗口:会话窗口是根据数据活跃度来划分的,当一个会话内一段时间(这里设定为 1 小时)没有新的数据到达时,就认为会话结束。该查询按 user_id 和 event_time 的会话窗口进行分组,然后在每个窗口中计算总价格。
每个查询调用tableEnv.sqlQuery(query)
方法,并将结果存储在 Table 对象中。注意这些查询在调用 sqlQuery 时并没有立即执行,只有当你对结果做出动作(如 print、collect 或者写入外部系统)时,才会触发执行。
Flink 内存优化
在大数据领域,大多数开源框架(Hadoop、Spark、Flink)都是基于 JVM 运行,但是 JVM 的内存管理机制往往存在着诸多类似OutOfMemoryError
的问题,主要是因为创建过多的对象实例而超过 JVM 的最大堆内存限制,却没有被有效回收掉。
这在很大程度上影响了系统的稳定性,尤其对于大数据应用,面对大量的数据对象产生,仅仅靠 JVM 所提供的各种垃圾回收机制很难解决内存溢出的问题。
在开源框架中有很多框架都实现了自己的内存管理,例如 Apache Spark 的 Tungsten 项目,在一定程度上减轻了框架对 JVM 垃圾回收机制的依赖,从而更好地使用 JVM 来处理大规模数据集。
Flink 也基于 JVM 实现了自己的内存管理,将 JVM 根据内存区分为 Unmanned Heap、Flink Managed Heap、Network Buffers 三个区域
在 Flink 内部对 Flink Managed Heap 进行管理,在启动集群的过程中直接将堆内存初始化成 Memory Pages Pool,也就是将内存全部以二进制数组的方式占用,形成虚拟内存使用空间。
新创建的对象都是以序列化成二进制数据的方式存储在内存页面池中,当完成计算后数据对象 Flink 就会将 Page 置空,而不是通过 JVM 进行垃圾回收,保证数据对象的创建永远不会超过 JVM 堆内存大小,也有效地避免了因为频繁 GC 导致的系统稳定性问题。
JobManager 配置
JobManager 在 Flink 系统中主要承担管理集群资源、接收任务、调度 Task、收集任务状态以及管理 TaskManager 的功能,JobManager 本身并不直接参与数据的计算过程,因此 JobManager 的内存配置项不是特别多,只要指定 JobManager 堆内存大小即可。
TaskManager 配置
TaskManager 作为 Flink 集群中的工作节点,所有任务的计算逻辑均执行在 TaskManager 之上,因此对 TaskManager 内存配置显得尤为重要,可以通过以下参数配置对 TaskManager 进行优化和调整。
taskmanager.heap.size:设定 TaskManager 堆内存大小,默认值为 1024M,如果在 Yarn 的集群中,TaskManager 取决于 Yarn 分配给 TaskManager Container 的内存大小,且 Yarn 环境下一般会减掉一部分内存用于 Container 的容错。
taskmanager.jvm-exit-on-oom:设定 TaskManager 是否会因为 JVM 发生内存溢出而停止,默认为 false,当 TaskManager 发生内存溢出时,也不会导致 TaskManager 停止。
taskmanager.memory.size:设定 TaskManager 内存大小,默认为 0,如果不设定该值将会使用taskmanager.memory.fraction
作为内存分配依据。
taskmanager.memory.fraction:设定 TaskManager 堆中去除 Network Buffers 内存后的内存分配比例。该内存主要用于 TaskManager 任务排序、缓存中间结果等操作。例如,如果设定为 0.8,则代表 TaskManager 保留 80%内存用于中间结果数据的缓存,剩下 20%的内存用于创建用户定义函数中的数据对象存储。注意,该参数只有在taskmanager.memory.size
不设定的情况下才生效。
taskmanager.memory.off-heap:设置是否开启堆外内存供 Managed Memory 或者 Network Buffers 使用。
taskmanager.memory.preallocate:设置是否在启动 TaskManager 过程中直接分配 TaskManager 管理内存。
taskmanager.numberOfTaskSlots:每个 TaskManager 分配的 slot 数量。
Flink 的网络缓存优化
Flink 将 JVM 堆内存切分为三个部分,其中一部分为 Network Buffers 内存。Network Buffers 内存是 Flink 数据交互层的关键内存资源,主要目的是缓存分布式数据处理过程中的输入数据。
通常情况下,比较大的 Network Buffers 意味着更高的吞吐量。如果系统出现“Insufficient number of network buffers”的错误,一般是因为 Network Buffers 配置过低导致,因此,在这种情况下需要适当调整 TaskManager 上 Network Buffers 的内存大小,以使得系统能够达到相对较高的吞吐量。
目前 Flink 能够调整 Network Buffer 内存大小的方式有两种:一种是通过直接指定 Network Buffers 内存数量的方式,另外一种是通过配置内存比例的方式。
设定 Network Buffer 内存数量(过时)
直接设定 Nework Buffer 数量需要通过如下公式计算得出:
NetworkBuffersNum = total-degree-of-parallelism \* intra-node-parallelism * n
其中total-degree-of-parallelism
表示每个 TaskManager 的总并发数量,intra-node-parallelism
表示每个 TaskManager 输入数据源的并发数量,n 表示在预估计算过程中 Repar-titioning 或 Broadcasting 操作并行的数量。intra-node-parallelism
通常情况下与 Task-Manager 的所占有的 CPU 数一致,且 Repartitioning 和 Broadcating 一般下不会超过 4 个并发。可以将计算公式转化如下:
NetworkBuffersNum = <slots-per-TM>^2 \* < TMs>* 4
其中 slots-per-TM 是每个 TaskManager 上分配的 slots 数量,TMs 是 TaskManager 的总数量。对于一个含有 20 个 TaskManager,每个 TaskManager 含有 8 个 Slot 的集群来说,总共需要的 Network Buffer 数量为 8^2*204=5120 个,因此集群中配置 Network Buffer 内存的大小约为 160M 较为合适。
计算完 Network Buffer 数量后,可以通过添加如下两个参数对 Network Buffer 内存进行配置。其中 segment-size 为每个 Network Buffer 的内存大小,默认为 32KB,一般不需要修改,通过设定 numberOfBuffers 参数以达到计算出的内存大小要求。
设定 Network Buffer 内存比例(推荐)
从 1.3 版本开始,Flink 就提供了通过指定内存比例的方式设置 Network Buffer 内存大小。
taskmanager.network.memory.fraction:JVM 中用于 Network Buffers 的内存比例。
taskmanager.network.memory.min:最小的 Network Buffers 内存大小,默认为 64MB。
taskmanager.network.memory.max:最大的 Network Buffers 内存大小,默认 1GB。
taskmanager.memory.segment-size:内存管理器和 Network 栈使用的 Buffer 大小,默认为 32KB。
结语
感谢你耐心地读到这里,在我们结束这篇博客的同时,我鼓励你继续探索和实践 Flink 的无尽可能性。无论你是初学者还是专业人士,Flink 都有许多值得挖掘的深度和广度。这就像一场数据处理的冒险,充满了挑战与机遇。无论你走到哪一步,都记得享受过程,因为每一个问题的解决都代表着新的认知和成长。
再次感谢你的阅读,希望这篇文章能够带给你收获以及深入的思考,期待你在 Flink 的学习旅程中取得更大的进步。
评论