写点什么

流处理引擎 Flink: 编程 - 程序结构

作者:正向成长
  • 2021 年 12 月 04 日
  • 本文字数:4505 字

    阅读完需:约 15 分钟

流处理引擎Flink:编程 - 程序结构

Flink 是目前最流行的流处理引擎,里面有很多优秀的设计思想,计划从编程实现、设计思想和源码解读三个角度来学习 Flink,主要结合第四范式张利兵的《Flink 原理、实战与性能优化》的内容来学习 Flink 的编程。


本文主要主要从了解 Flink 编程整体思想的角度了解一个 Flink 程序的结构,主要从

  1. Flink 编程的接口分层与抽象

  2. Flink 程序实现的步骤,以 WordCount 为例进行实现


Flink 接口抽象

Flink 根据数据集类型的不同将核心数据处理接口分为两大类:支持批计算的接口 DataSet API 和支持流计算的接口 DataStream API。同时,Flink 将数据处理接口抽象成四层,用户可以根据需要选择任意一层抽象接口来开发 Flink 应用[1]。

  • SQL API

提供了统一的 SQL API 完成对批计算和流计算的处理,目前 SQL API 也是社区重点发展的接口层


  • Table API

Table API 将内存中的 DataStream 和 DataSet 数据集在原有的基础之上增加 Schema 信息,将数据类型统一抽象成表结构,然后通过 Table API 提供的接口处理对应的数据集。SQL API 则可以直接查询 Table API 中注册表中的数据表。Table API 构建在 DataStream 和 DataSet 之上的同时,提供了大量面向领域语言的编程接口,例如 GroupByKey、Join 等操作符,提供给用户一种更加友好的处理数据集的方式。除此之外,Table API 在转换为 DataStream 和 DataSet 的数据处理过程中,也应用了大量的优化规则对处理逻辑进行了优化。同时 Table API 中的 Table 可以和 DataStream 及 DataSet 之间进行相互转换。


  • DataStream /DataSet API

DataStream API 和 DataSet API 主要面向具有开发经验的用户,用户可以使用 DataStream API 处理无界流数据,使用 DataSet API 处理批量数据。DataStream API 和 DataSet API 接口同时提供了各种数据处理接口,例如 map,filter、oins、aggregations、window 等,接口 do 提供了 Java、Scala 和 Python 等多种语言 SDK。


  • Stateful Stream Processing API

该层是 Flink 中处理 Stateful Stream 最底层的接口,用户可以使用 Stateful Stream Process 接口操作状态、时间等底层数据。使用 Stream Process API 接口开发应用的灵活性非常强,可以实现非常复杂的流式计算逻辑,但是相对用户使用成本也比较高,一般企业在使用 Flink 进行二次开发或深度封装的时候会用到这层接口。


Flink 程序结构

一个完整的 Flink 程序包括五步:

  1. 创建 Flink 执行环境

  2. 创建并加载数据集

  3. 对数据进行转换操作

  4. 指定结算结果的输出位置

  5. 调用 execute 触发程序执行

以 WordCount 流处理为例进行了代码实现,相关实现上传到了GitHub。下面来了解一下这几个步骤:

创建执行环境

不同的运行环境决定了应用的类型,StreamExecutionEnvironment是用来做流式数据处理环境,ExecutionEnvironment是批量数据处理环境。

创建流式数据处理环境

// 设定Flink运行环境,如果在本地启动则创建本地环境,如果是在集群上启动,则创建集群环境StreamExecutionEnvironment.getExecutionEnvironment
//指定并行度创建本地执行环境StreamExecutionEnvironment.createLocalEnvironment(5)
/* 指定远程JobManager IP和RPC端口以及运行程序所在jar包及其依赖包 本地相当于一个客户端 创建与集群的JobManager建立RPC连接,将指定的Jar包拷贝到JobManager节点,然后运行在远程集群*/StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
复制代码

创建批处理环境

开发批量应用需要获取 Execution-Environment 来构建批量应用开发环境,如以下代码实例通过调用ExecutionEnvironment的静态方法来获取批计算环境。

//设定Flink运行环境,如果在本地启动则创建本地环境,如果是在集群上启动,则创建集群环境ExecutionEnvironment.getExecutionEnvironment
//指定并行度创建本地执行环境ExecutionEnvironment.createLocalEnvironment(5)
//指定远程JobManagerIP和RPC端口以及运行程序所在jar包及其依赖包ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
复制代码

初始化数据

创建完可执行环境需要将数据导入 Flink 系统进行数据的初始化,并将外部数据转换为DataStream<T>DataSet<T>数据集。

  • readTextFile()方法读取file:///pathfile路径中的数据并转换成DataStream<String>数据集。

执行转换操作

对数据集进行各种转换操作。Flink 中的 Transformation 操作都是通过不同的 Operator 来实现,每个 Operator 内部通过实现 Function 接口完成数据处理逻辑定义。DataStream API 和 DataSet API 提供大量转换算子。

  • map(也可以通过MapFunction函数实现)

  • flatMap(可以通过FlatMapFunction函数实现)

  • filter算子

  • keyBy算子

Function 的实现可以通过多种方式来实现,以 flatMap 为例子来实现。flatMap转换算子可以直接采用 Flink 的转换算子,也可以通过FlatMapFunction来实现,该函数有这个函数有两个泛型TOT是输入,O是输出,需要对FlatMap函数进行重写。

public interface FlatMapFunction<T, O> extends Function, Serializable {  void flatMap(T value, Collector<O> out) throws Exception;}
复制代码

也可以通过更高级的 RichFunction 来实现,

public abstract class RichFlatMapFunction<IN,OUT> extends AbstractRichFunction			implements FlatMapFunction<IN,OUT> {    abstract voidflatMap(IN value, Collector<OUT> out)}
复制代码

以 WordCount 为例,对 FlatMap 算子进行重写。

1. 通过创建 Class 实现 Funciton 接口

以 WordCount 为例,实现将一行数据转换为小写,按照空格切分。

class MyFlatMapFunction extends FlatMapFunction[String, String] {  override def flatMap(t : String, out : Collector[String]) : Unit = {    t.toLowerCase.split(" ").foreach(out.collect)  }}
复制代码

之后将原实现修改下面就可以。

    val counts: DataStream[(String, Int)] = text			.flatMap(new MyFlatMapFunction)                  // 实现2:采用创建Class实现Funciton接口                                           // 实现3: 采用创建匿名类实现Funciton接口      .filter(_.nonEmpty)      .map((_, 1))      .keyBy(value => value._1)      .sum(1)
复制代码

2. 通过创建匿名类实现 Funciton 接口

    val counts: DataStream[(String, Int)] = text        .flatMap(new FlatMapFunction[String, String] {          override def flatMap(t : String, out : Collector[String]) : Unit = {            t.toLowerCase.split(" ").foreach(out.collect)          }        })      .filter(_.nonEmpty)      .map((_, 1))      .keyBy(value => value._1)      .sum(1)
复制代码

3. 通过实现 RichFunciton 接口

Flink 中提供了 RichFunction 接口,主要用于比较高级的数据处理场景,RichFunction 接口中有 open、close、getRuntimeContext 和 setRuntimeContext 等方法来获取状态,缓存等系统内部数据。和FlatMapFunction相似,RichFunction 子类中也有RichFlatMapFunction

class MyRichFlatMapFunction extends RichFlatMapFunction[String, String] {  override def flatMap(in: String, out: Collector[String]): Unit = {    in.toLowerCase.split(" ").foreach(out.collect)  }}
复制代码

FlatMapFunction类似,也可以采用匿名函数实现。


分区 Key 指定

在 DataStream 数据经过不同的算子转换过程中,某些算子需要根据指定的 key 进行转换,常见的有joincoGroupgroupBy类算子,需要先将 DataStream 或 DataSet 数据集转换成对应的KeyedStreamGroupedDataSet,主要目的是将相同 key 值的数据路由到相同的 Pipeline 中,然后进行下一步的计算操作。需要注意的是,在 Flink 中这种操作并不是真正意义上将数据集转换成 Key-Value 结构,而是一种虚拟的 key,目的仅仅是帮助后面的基于 Key 的算子使用,分区 Key 可以通过两种方式指定:


1. 根据字段位置指定

在 DataStream API 中通过 keyBy()方法将 DataStream 数据集根据指定的 key 转换成重新分区的 KeyedStream,如以下代码所示,对数据集按照相同 key 进行 sum()聚合操作。

val dataStream : DataStream[(String, Int)] = env.fromElements(("a", 1), ("c", 2))// 根据第一个字段重新分区,然后对第二个字段进行求和运算val result = dataStream.keyBy(0).sum(1)
复制代码

在 DataSet API 中,如果对数据根据某一条件聚合数据,对数据进行聚合时候,也需要对数据进行重新分区。如以下代码所示,使用 DataSet API 对数据集根据第一个字段作为 GroupBy 的 key,然后对第二个字段进行求最大值运算。

val dataSet = env.fromElements(("hello", 1), ("flink", 3))// 根据第一个字段进行数据重分区,求取相同key值下第二个字段的最大值val groupedDataSet:GroupedDataSet[(String,Int)] = dataSet.groupBy(0)groupedDataSet.max(1)
复制代码

2. 根据字段名称指定

KeyBy 和 GroupBy 的 Key 也可以根据字段的名称来指定。使用字段名称需要 DataStream 中的数据结构类型必须是 Tuple 类或者 POJOs 类的。

val personDataSet = env.fromElements(new Persion("Alex", 18), new Persion("Peter", 43))//指定name字段名称来确定groupby字段personDataSet.groupBy("name").max(1)
复制代码

如果程序中使用 Tuple 数据类型,通常情况下字段名称从 1 开始计算,字段位置索引从 0 开始计算,下面实现等价:

val personDataStream = env.fromElements(("Alex", 18),("Peter", 43))//通过名称指定第一个字段名称personDataStream.keyBy("_1")
//通过位置指定第一个字段personDataStream.keyBy(0)
复制代码

如果程序中使用 Tuple 数据类型,通常情况下字段名称从 1 开始计算,字段位置索引从 0 开始


3. 通过 Key 选择器指定

通过定义 Key Selector 来选择数据集中的 Key,如下代码所示,定义 KeySelector,然后复写 getKey 方法,从 Person 对象中获取 name 为指定的 Key。


以 WordCount 的 keyBy 为例:

val counts: DataStream[(String, Int)] = text    .flatMap(_.toLowerCase.split(" "))    .filter(_.nonEmpty)    .map((_, 1))    .keyBy(new KeySelector[(String, Int), String]() {               // 方式3: 通过Key选择器指定        override def getKey(value: (String, Int) ): String = value._1      })    .sum(1)
复制代码


输出结果

Flink DataStreamDataSet接口中定义了基本的数据输出方法,例如

  • 基于文件输出writeAsText()

  • 基于控制台输出print()

  • Flink 在系统中定义了大量的 Connector,方便用户和外部系统交互,用户可以直接通过调用 addSink()添加输出系统定义的 DataSink 类算子,这样就能将数据输出到外部系统。


程序触发

所有的计算逻辑全部操作定义好之后,需要调用 ExecutionEnvironment 的 execute()方法来触发应用程序的执行,其中 execute()方法返回的结果类型为 JobExecutionResult,里面包含了程序执行的时间和累加器等指标。需要注意的是,execute 方法调用会因为应用的类型有所不同,DataStream 流式应用需要显性地指定 execute()方法运行程序,如果不调用则 Flink 流式程序不会执行,但对于 DataSet API 输出算子中已经包含对 execute()方法的调用,则不需要显性调用 execute()方法,否则会出现程序异常。

//调用StreamExecutionEnvironment的execute方法执行流式应用程序env.execute("App Name");
复制代码



参考资料

  1. 张利兵的《Flink 原理、实战与性能优化》

用户头像

正向成长

关注

正向成长 2018.08.06 加入

想要坚定地做大规模数据处理(流数据方向),希望结合结合批处理的传统处理方式,以及之后流批混合处理方向进行学习和记录。

评论

发布
暂无评论
流处理引擎Flink:编程 - 程序结构