流处理引擎 Flink: 编程 - 程序结构
Flink 是目前最流行的流处理引擎,里面有很多优秀的设计思想,计划从编程实现、设计思想和源码解读三个角度来学习 Flink,主要结合第四范式张利兵的《Flink 原理、实战与性能优化》的内容来学习 Flink 的编程。
本文主要主要从了解 Flink 编程整体思想的角度了解一个 Flink 程序的结构,主要从
Flink 编程的接口分层与抽象
Flink 程序实现的步骤,以 WordCount 为例进行实现
Flink 接口抽象
Flink 根据数据集类型的不同将核心数据处理接口分为两大类:支持批计算的接口 DataSet API 和支持流计算的接口 DataStream API。同时,Flink 将数据处理接口抽象成四层,用户可以根据需要选择任意一层抽象接口来开发 Flink 应用[1]。
SQL API
提供了统一的 SQL API 完成对批计算和流计算的处理,目前 SQL API 也是社区重点发展的接口层
Table API
Table API 将内存中的 DataStream 和 DataSet 数据集在原有的基础之上增加 Schema 信息,将数据类型统一抽象成表结构,然后通过 Table API 提供的接口处理对应的数据集。SQL API 则可以直接查询 Table API 中注册表中的数据表。Table API 构建在 DataStream 和 DataSet 之上的同时,提供了大量面向领域语言的编程接口,例如 GroupByKey、Join 等操作符,提供给用户一种更加友好的处理数据集的方式。除此之外,Table API 在转换为 DataStream 和 DataSet 的数据处理过程中,也应用了大量的优化规则对处理逻辑进行了优化。同时 Table API 中的 Table 可以和 DataStream 及 DataSet 之间进行相互转换。
DataStream /DataSet API
DataStream API 和 DataSet API 主要面向具有开发经验的用户,用户可以使用 DataStream API 处理无界流数据,使用 DataSet API 处理批量数据。DataStream API 和 DataSet API 接口同时提供了各种数据处理接口,例如 map,filter、oins、aggregations、window 等,接口 do 提供了 Java、Scala 和 Python 等多种语言 SDK。
Stateful Stream Processing API
该层是 Flink 中处理 Stateful Stream 最底层的接口,用户可以使用 Stateful Stream Process 接口操作状态、时间等底层数据。使用 Stream Process API 接口开发应用的灵活性非常强,可以实现非常复杂的流式计算逻辑,但是相对用户使用成本也比较高,一般企业在使用 Flink 进行二次开发或深度封装的时候会用到这层接口。
Flink 程序结构
一个完整的 Flink 程序包括五步:
创建 Flink 执行环境
创建并加载数据集
对数据进行转换操作
指定结算结果的输出位置
调用 execute 触发程序执行
以 WordCount 流处理为例进行了代码实现,相关实现上传到了GitHub。下面来了解一下这几个步骤:
创建执行环境
不同的运行环境决定了应用的类型,StreamExecutionEnvironment
是用来做流式数据处理环境,ExecutionEnvironment
是批量数据处理环境。
创建流式数据处理环境
创建批处理环境
开发批量应用需要获取 Execution-Environment 来构建批量应用开发环境,如以下代码实例通过调用ExecutionEnvironment
的静态方法来获取批计算环境。
初始化数据
创建完可执行环境需要将数据导入 Flink 系统进行数据的初始化,并将外部数据转换为DataStream<T>
或DataSet<T>
数据集。
readTextFile()
方法读取file:///pathfile
路径中的数据并转换成DataStream<String>
数据集。
执行转换操作
对数据集进行各种转换操作。Flink 中的 Transformation 操作都是通过不同的 Operator 来实现,每个 Operator 内部通过实现 Function 接口完成数据处理逻辑定义。DataStream API 和 DataSet API 提供大量转换算子。
map
(也可以通过MapFunction
函数实现)flatMap
(可以通过FlatMapFunction
函数实现)filter
算子keyBy
算子
Function 的实现可以通过多种方式来实现,以 flatMap 为例子来实现。flatMap
转换算子可以直接采用 Flink 的转换算子,也可以通过FlatMapFunction
来实现,该函数有这个函数有两个泛型T
和O
,T
是输入,O
是输出,需要对FlatMap
函数进行重写。
也可以通过更高级的 RichFunction 来实现,
以 WordCount 为例,对 FlatMap 算子进行重写。
1. 通过创建 Class 实现 Funciton 接口
以 WordCount 为例,实现将一行数据转换为小写,按照空格切分。
之后将原实现修改下面就可以。
2. 通过创建匿名类实现 Funciton 接口
3. 通过实现 RichFunciton 接口
Flink 中提供了 RichFunction 接口,主要用于比较高级的数据处理场景,RichFunction 接口中有 open、close、getRuntimeContext 和 setRuntimeContext 等方法来获取状态,缓存等系统内部数据。和FlatMapFunction
相似,RichFunction 子类中也有RichFlatMapFunction
。
和FlatMapFunction
类似,也可以采用匿名函数实现。
分区 Key 指定
在 DataStream 数据经过不同的算子转换过程中,某些算子需要根据指定的 key 进行转换,常见的有join
、coGroup
、groupBy
类算子,需要先将 DataStream 或 DataSet 数据集转换成对应的KeyedStream
和GroupedDataSet
,主要目的是将相同 key 值的数据路由到相同的 Pipeline 中,然后进行下一步的计算操作。需要注意的是,在 Flink 中这种操作并不是真正意义上将数据集转换成 Key-Value 结构,而是一种虚拟的 key,目的仅仅是帮助后面的基于 Key 的算子使用,分区 Key 可以通过两种方式指定:
1. 根据字段位置指定
在 DataStream API 中通过 keyBy()方法将 DataStream 数据集根据指定的 key 转换成重新分区的 KeyedStream,如以下代码所示,对数据集按照相同 key 进行 sum()聚合操作。
在 DataSet API 中,如果对数据根据某一条件聚合数据,对数据进行聚合时候,也需要对数据进行重新分区。如以下代码所示,使用 DataSet API 对数据集根据第一个字段作为 GroupBy 的 key,然后对第二个字段进行求最大值运算。
2. 根据字段名称指定
KeyBy 和 GroupBy 的 Key 也可以根据字段的名称来指定。使用字段名称需要 DataStream 中的数据结构类型必须是 Tuple 类或者 POJOs 类的。
如果程序中使用 Tuple 数据类型,通常情况下字段名称从 1 开始计算,字段位置索引从 0 开始计算,下面实现等价:
如果程序中使用 Tuple 数据类型,通常情况下字段名称从 1 开始计算,字段位置索引从 0 开始
3. 通过 Key 选择器指定
通过定义 Key Selector 来选择数据集中的 Key,如下代码所示,定义 KeySelector,然后复写 getKey 方法,从 Person 对象中获取 name 为指定的 Key。
以 WordCount 的 keyBy 为例:
输出结果
Flink DataStream
和DataSet
接口中定义了基本的数据输出方法,例如
基于文件输出
writeAsText()
,基于控制台输出
print()
Flink 在系统中定义了大量的 Connector,方便用户和外部系统交互,用户可以直接通过调用 addSink()添加输出系统定义的 DataSink 类算子,这样就能将数据输出到外部系统。
程序触发
所有的计算逻辑全部操作定义好之后,需要调用 ExecutionEnvironment 的 execute()方法来触发应用程序的执行,其中 execute()方法返回的结果类型为 JobExecutionResult,里面包含了程序执行的时间和累加器等指标。需要注意的是,execute 方法调用会因为应用的类型有所不同,DataStream 流式应用需要显性地指定 execute()方法运行程序,如果不调用则 Flink 流式程序不会执行,但对于 DataSet API 输出算子中已经包含对 execute()方法的调用,则不需要显性调用 execute()方法,否则会出现程序异常。
参考资料
张利兵的《Flink 原理、实战与性能优化》
评论