Flink 的 DataStream API(v1_7)(五)
写在前面:
大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。
业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。
想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。
内推信息
如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。
免费学习资料
如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!
学习交流群
如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。
本章介绍了 Flink 的 DataStream API 的基础知识。我们展示了一个标准的 Flink 流应用程序的结构和组件,以及讨论了 Flink 的类型系统和支持的数据类型,并给出了数据操作和分区转换操作。下一章我们将讨论窗口算子操作(windows operator)、基于时间的转换(time-based transformations)、有状态算子(stateful operators)和连接器(connectors)。在阅读本章之后,你将收获如何实现具有基本功能的流处理应用程序。我们的示例代码使用的是比较简洁的 Scala 编程语言,但是 Java API 基本上是类似的。我们还在 GitHub 仓库中提供了基于 Java 和 Scala 实现的完整示例应用程序(https://github.com/streaming-with-flink/)。
Hello,Flink!
让我们从一个简单的例子开始,初步了解使用 DataStream API 编写流应用程序是什么样的体验。 我们将使用此示例以此来认识一个 Flink 程序的基本结构,并介绍 DataStream API 的一些重要特性。 我们的这个示例应用程序会模拟从多个传感器摄取温度测量的数据流。
首先,让我们简单看一下用来表示传感器读数的数据类型:
case class SensorReading(
id: String,
timestamp: Long,
temperature: Double)
示例 5-1 中的程序将传感器测量温度从华氏温度转换为摄氏温度,并计算每个传感器每 5 秒的平均温度。
// Scala object that defines the DataStream program in the main() method.
object AverageSensorReadings {
// main() defines and executes the DataStream program
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// create a DataStream[SensorReading] from a stream source
val sensorData: DataStream[SensorReading] = env
// ingest sensor readings with a SensorSource SourceFunction
.addSource(new SensorSource)
// assign timestamps and watermarks (required for event time)
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
val avgTemp: DataStream[SensorReading] = sensorData
// convert Fahrenheit to Celsius with an inline lambda function
.map( r => {
val celsius = (r.temperature - 32) * (5.0 / 9.0)
SensorReading(r.id, r.timestamp, celsius)
} )
// organize readings by sensor id
.keyBy(_.id)
// group readings in 5 second tumbling windows
.timeWindow(Time.seconds(5))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager)
// print result stream to standard out
avgTemp.print()
// execute application
env.execute("Compute average sensor temperature")
}
}
或许你已经注意到 Flink 程序是用常规的 Scala 或 Java 方法定义并提交执行的, 最常见的做法是在静态 main()方法中完成逻辑定义。 在我们的示例中,我们定义了一个 AverageSensorReadings 对象,其中 AverageSensorReadings 这个类的 main()方法包含了应用程序大部分的实现逻辑。
一个典型的 Flink 流处理应用程序的结构包括以下几个部分:
1. 设置执行环境。
2. 从数据源读取一个或多个流。
3. 应用流式转换来实现应用程序逻辑。
4. 可以选择将结果输出到一个或者多个数据接收器(data sinks)。
5. 执行程序(run)。
现在我们再详细看看这些部分的内容。
设置执行环境
Flink 应用程序需要做的第一件事就是设置它的执行环境。执行环境决定程序是在本地机器上运行还是在集群上运行。在 DataStream API 中,应用程序的执行环境由 StreamExecutionEnvironment 进行设置。在我们的示例中,我们通过调用 StreamExecutionEnvironment.getExecutionEnvironment()来设置执行环境。此方法返回本地或远程环境,具体取决于调用该方法的上下文。如果从连接到远程集群的客户端调用该方法,则返回远程执行环境,否则返回一个本地环境。
你也可以明确地创建本地或远程执行环境,如下所示:
// create a local stream execution environment
val localEnv: StreamExecutionEnvironment.createLocalEnvironment()
// create a remote stream execution environment
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
"host", // hostname of JobManager
1234, // port of JobManager process
"path/to/jarFile.jar") // JAR file to ship to the JobManager
接着,我们使用 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)来设置我们的程序使用的事件时间语义。执行环境允许设置更多的配置项,比如设置程序并行度和是否开启容错机制配置等。
读取输入流
一旦设置好了执行环境,就可以执行一些实际工作并开始处理流了。StreamExecutionEnvironment 提供了一些方法用于创建数据流,并将数据流注入应用程序形成流的数据源(source)。数据流可以从消息队列、文件等源获取,也可以动态生成。
在我们的例子中,我们使用:
val sensorData: DataStream[SensorReading] =
env.addSource(new SensorSource)
连接到传感器测量的数据源,并创建一个 SensorReading 类型的初始 DataStream。Flink 支持许多种数据类型,我们将在下一节中对此进行描述。这里,我们使用 Scala case 类作为我们之前定义的数据字段和类型。传感器读数包含传感器 id、表示测量时间的时间戳和测量的温度。以下两个方法通过调用 setParallelism(4)将输入数据源配置并行度为 4,并分配时间戳和生成水位线,时间戳和水位线是使用 assignTimestampsAndWatermarks(new SensorTimeAssigner)处理事件所需要用到的。在这里,我们暂时不关注 SensorTimeAssigner 的实现细节,留到后面再做解析。
应用转换(apply transformations)
一旦我们使用到了 DataStream,就可以对其应用转换操作,Flink 提供了不同类型的转换操作。一些转换操作可以生成新的 DataStream(可能是不同类型),值得一提的是转换操作不会修改 DataStream 的记录,而是通过分区或分组对其进行重新处理。应用程序的逻辑由链式转换共同组成的。
在我们的示例中,我们首先应用 map()方法进行转换,它将每个传感器读数的温度转换为摄氏温度。然后,我们使用 keyBy()方法根据传感器 id 对传感器读数进行分区转换。随后,我们还执行一个 timeWindow()转换操作,它将每个传感器 id 分区的传感器数据进行分组,并设置了长度为 5 秒的滚动窗口。
val avgTemp: DataStream[SensorReading] = sensorData
.map( r => {
val celsius = (r.temperature - 32) * (5.0 / 9.0)
SensorReading(r.id, r.timestamp, celsius)
} )
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.apply(new TemperatureAverager)
窗口转换操作在“窗口算子操作”中有详细的描述。最后,我们应用了用户自定义函数( UDF )来计算每个窗口的平均温度。我们将在本章的后一部分讨论用户自定义函数的实现。
输出结果
流应用程序通常将其结果发送到一些外部系统,如 Apache Kafka、文件系统或数据库。Flink 提供了一个维护良好的流接收器( stream sinks )集合,可用于将数据写入不同的系统,也可以实现自己的流接收器。除此之外,还有一些应用程序不输出结果,而是将结果保存在内部,通过 Flink 的可查询状态特性( queryable state feature )提供结果。
在我们的示例中,结果是一个 DataStream[SensorReading],包含每个传感器的每 5 秒平均测量温度。通过调用 print()方法将结果流写入标准输出。
avgTemp.print()
请注意,流式接收器的选择会影响应用程序的端到端一致性,即,应用程序的结果是具有至少一次语义(at-least once semantics )还是具有精确一次语义( exactly-once semantics )。应用程序的端到端一致性取决于所选流接收器与 Flink 检查点算法的集成。我们将在“应用程序一致性保证(Application Consistency Guarantees)”这一章节中更详细地讨论这个主题。
执行
当应用程序被完全定义好之后,可以通过调用 StreamExecutionEnvironment.execute()方法来执行它,通常这是程序的最后一步,我们例子中的最后调用的方法也是如此:
env.execute("Compute average sensor temperature")
Flink 程序是惰性执行的。也就是说,到目前为止,所有创建流源和转换的方法都不会触发任何数据处理。相反,只是在执行环境构建了一个执行计划,包括了从执行环境中创建数据源,并包括应用于这些数据源的所有转换操作,只有在调用 execute()方法时,系统才会触发程序的执行。举个形象的例子:建房子,我们首先会先画出房子的蓝图,等到材料和工人以及器械到位之后,然后才开始建房子。那再这个过程中“蓝图”就相当于 Flink 构建的应用程序逻辑,开始建房子就相当于执行 execute()方法,所以如果最终没有执行 execute()方法,那么 Flink 程序实际上是没有被提交运行的。
构造的计划被转换成 JobGraph 并提交给 JobManager 执行。根据执行环境的类型,JobManager 作为本地线程(本地执行环境)启动,或者 JobGraph 被发送到远程 JobManager。如果 JobManager 是远程运行的,则 JobGraph 必须与一个 JAR 文件一起提供,该 JAR 文件包含应用程序的所有类和所需的依赖项。
转换操作(Transformations)
在本节中,我们将概述 DataStream API 的基本转换。与时间相关的算子操作,如窗口操作和其他特殊的转换操作将在后面的章节中描述。流转换操作可以应用于一个或多个流,并将它们转换为一个或多个输出流。编写 DataStream API 程序本质上可以归结为组合这些转换操作,构建数据流图,实现应用程序逻辑。
大多数流转换操作基于用户自定义的函数。这些函数封装了用户应用程序逻辑,并定义了如何将输入流的元素转换为输出流的元素。函数,如下面的 MapFunction,被定义为实现特定逻辑转换的函数接口的类:
class MyMapFunction extends MapFunction[Int, Int] {
override def map(value: Int): Int = value + 1
}
函数接口定义了需要由用户实现的转换方法,如上面示例中的 map()方法。
大多数函数接口被设计成 SAM(单个抽象方法)接口,可以实现为 Java8 lambda 函数。Scala DataStream API 还内置了对 lambda 函数的支持,在介绍 DataStream API 的转换时,我们展示了所有函数类的接口,但是为了简单起见,我们主要使用 lambda 函数而不是代码示例中的函数类。
DataStream API 提供了最常见的数据转换操作。如果你熟悉批处理 API、函数式编程语言或 SQL,你会发现书中涉及的 API 概念非常容易掌握。我们将 DataStream API 的转换分为四类:
1. 基本转换:对单个事件的转换。
2. KeyedStream 转换:应用于 key 上下文中事件的转换。
3. 多流转换:将多个流合并到一个流中,或将一个流拆分为多个流的转换。
4. 分区转换:重新组织流事件数据的转换。
基本转换(Basic Transformations)
基本转换处理单个事件,这意味着每个输出记录都是从单个输入记录生成的。简单的值转换、记录的分割或记录的过滤都是常见的基本函数。我们会解释它们的语义并给出代码示例。
Map
map 转换是通过调用 DataStream.map()方法来指定的,并生成一个新的 DataStream。它将每个传入事件传递给一个用户自定义的 mapper,该 mapper 仅返回一个输出事件(可能是不同类型的输出事件)。图 5-1 显示了将每个正方形转换成圆形的 map 转换。
MapFunction 的泛型是输入和输出事件的类型,可以使用 MapFunction 接口指定。它定义了 map()方法,将一个输入事件转换成一个输出事件:
// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
> map(T): O
下面是一个简单的 mapper 函数,它提取了输入流中每个“SensorReading”的第一个字段(id):
val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new MyMapFunction)
class MyMapFunction extends MapFunction[SensorReading, String] {
override def map(r: SensorReading): String = r.id
}
如果是使用 Scala API 或 Java8 时,mapper 函数也可以表示为 lambda 函数:
val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(r => r.id)
Filter
filter 转换通过判断每个事件的布尔条件来删除或转发流中的事件。如果返回值为 true,则保留输入事件并将其转发到输出,如果返回值为 false,则过滤掉事件。通过调用 DataStream.filter()方法来指定 filter 转换,并生成与输入 DataStream 类型相同的新 DataStream。图 5-2 显示了只保留白色方块的筛选操作。
布尔条件可以使用 FilterFunction 接口或 lambda 函数作为函数实现。FilterFunction 接口的泛型是输入流的类型,它定义了 filter()方法,该方法通过输入事件调用,并返回一个布尔值:
// T: the type of elements
FilterFunction[T]
> filter(T): Boolean
下面的示例显示了一个过滤器,它将所有温度低于 25 华氏度的传感器测量值都过滤掉了:
val readings: DataStream[SensorReadings] = ...
val filteredSensors = readings
.filter( r => r.temperature >= 25 )
FlatMap
flatMap 转换类似于 map,但它可以为每个传入事件生成零、一个或多个输出事件。实际上,flatMap 转换是 filter 和 map 的泛化,可以用来实现这两个操作。图 5-3 显示了一个基于传入事件的颜色来区分其输出的 flatMap 操作。如果输入是白色方块,则输出事件未修改。黑色方块被复制,灰色方块被过滤掉。
flatMap 转换对每个传入事件应用一个函数。对应的 FlatMapFunction 定义了 flatMap()方法,它可以将 0、1 或多个事件作为结果传递给 Collector 对象:
// T: the type of input elements
// O: the type of output elements
FlatMapFunction[T, O]
> flatMap(T, Collector[O]): Unit
这个例子展示了数据处理教程中常见的 flatMap 转换。该函数应用于一个"句子"流,按空格字符进行切割,并将每个产生的单词作为单独的记录发出:
val sentences: DataStream[String] = ...
val words: DataStream[String] = sentences
.flatMap(id => id.split(" "))
KeyedStream 转换(KeyedStream Transformations)
许多应用程序的一个常见需求是处理一组事件,这组事件有共有的属性。DataStream API 提供了 KeyedStream 的抽象,KeyedStream 是一个 DataStream,它在逻辑上被划分为具有相同 key 的事件的子流,按 key 做分组。
在当前处理过的事件的 key 的上下文中,对 KeyedStream 进行读写状态的有状态转换。这意味着具有相同 key 的所有事件访问相同的状态,因此可以一起处理。
注意:
请注意,必须小心使用有状态转换和 key 聚合。如果 key 域持续增长(例如,因为键是唯一的事务 id),则必须清除不再活动的 key 的状态,以避免内存问题。参考“实现有状态函数”,其中详细讨论了有状态函数。
可以使用前面看到的 map、flatMap 和 filter 转换来处理 KeyedStream。接着,我们将使用 keyBy 转换将 DataStream 转换为 KeyedStream,并使用诸如滚动聚合和 reduce 之类的 key 转换。
keyBy
keyBy 转换通过指定 key 将 DataStream 转换为 KeyedStream。基于该 key,流的事件被分配到不同分区,因此,所有具有相同 key 的事件都由相同任务转换操作处理。具有不同 key 值的事件可以由同一任务处理,但任务函数的 key 值状态始终在当前事件的 key 值范围内访问。
将输入事件的颜色作为 key,图 5-4 将黑色事件分配给一个分区,将所有其他事件分配给另一个分区。
keyBy()方法接收一个参数,该参数指定分组的 key(或多个 key)并返回一个 KeyedStream。有不同的方法来指定 key。我们在“定义 key 和引用字段”中介绍了它们。下面的代码声明 id 字段作为传感器读取记录流的 key:
val readings: DataStream[SensorReading] = ...
val keyed: KeyedStream[SensorReading, String] = readings
.keyBy(r => r.id)
lambda 函数 r => r.id,表示提取传感器读数记录 r 的 id 字段。
Rolling aggregations(滚动聚合)
滚动聚合转换应用于 KeyedStream,并生成聚合的数据流,如 sum、minimum 和 maximum。滚动聚合操作为每个 key 保留一个聚合值。对于每个传入事件,操作更新相应的聚合值,并发出带有更新值的事件。滚动聚合不需要用户自定义的函数,但是接收一个参数,该参数指定计算聚合的字段。DataStream API 提供了以下滚动聚合方法:
sum()指定字段上输入流的滚动和。
min()指定字段上输入流的滚动最小值。
max()指定字段上输入流的滚动最大值。
minBy()输入流的滚动最小值,它返回迄今为止观察到的值最小的事件。
maxBy()输入流的滚动最大值,它返回迄今为止观察到的值最大的事件。
不可以同时组合多个滚动聚合方法,一次只能计算一个滚动聚合函数。
参考下面的示例,在第一个字段作为 key 的 Tuple3[Int,Int, Int]流,基于第二个字段滚动求和:
val inputStream: DataStream[(Int, Int, Int)] = env.fromElements(
(1, 2, 2), (2, 3, 1), (2, 2, 4), (1, 5, 3))
val resultStream: DataStream[(Int, Int, Int)] = inputStream
.keyBy(0) // key on first field of the tuple
.sum(1) // sum the second field of the tuple in place
在本例中,元组输入流由第一个字段 key,滚动和由第二个字段计算。示例的输出是(1、2、2),然后是(1、7、2),键值为“1”,然后是(2、3、1),然后是(2、5、1),键值为“2”。“第一个字段是 key,第二个字段是和,第三个字段没有定义。
只在有界的 key 范围上使用滚动聚合
滚动聚合操作为处理的每个 key 保持一个状态。由于这种状态永远不会删除,所以你应该只对具有有界 key 范围的流,应用滚动聚合函数。
Reduce
reduce 变换是滚动聚合的泛化。它在 KeyedStream 上应用 ReduceFunction,它将每个传入事件与当前的 reduce 值组合起来,并生成一个 DataStream。reduce 转换不会改变流的类型。输出流的类型与输入流的类型相同。
可以使用实现 ReduceFunction 接口的类来指定该函数。ReduceFunction 定义了 reduce()方法,它接受两个输入事件并返回一个相同类型的事件:
// T: the element type
ReduceFunction[T]
> reduce(T, T): T
在下面的例子中,流的 key 为语言,输出结果是每种语言的单词列表:
val inputStream: DataStream[(String, List[String])] =
env.fromElements(("en", List("tea")), ("fr", List("vin")), ("en",List("cake")))
val resultStream: DataStream[(String, List[String])] =
inputStream
.keyBy(0)
.reduce((x, y) => (x._1, x._2 ::: y._2))
lambda reduce 函数传递第一个元组字段(key 字段)并连接第二个元组字段的 List[String]值。
只在有界的 key 范围上使用 ROLLING REDUCE
ROLLING REDUCE 操作为每个被处理的 key 保持一个状态。由于这种状态永远不会被清除,所以你应该只对具有有界 key 范围的流,应用一个滚动 reduce 操作。
多流转换(multiStream Transformations)
许多应用程序需要读取多个流,并且需要对这些流进行关联处理,或者分割成多个流,以便将不同的逻辑应用于不同的子流。我们将讨论处理多个输入流或产生多个输出流的 DataStream API 转换。
Union
union()方法合并两个或多个相同类型的 DataStream,并产生一个类型相同的新 DataStream。随后对所有输入流的元素进行转换处理。图 5-5 显示了将黑色和灰色事件合并到单个输出流中的 union 操作。
事件以 FIFO 方式合并,不会按照特定的事件顺序去生成。而且,union 操作不执行重复消除。每个输入事件都被发送给下一个操作。
下面展示了如何将“SensorReading”类型的三个流合并成一个流:
val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream
.union(tokyoStream, rioStream)
Connect, coMap, and coFlatMap
合并两个流的事件是流处理中非常常见的需求。假设这样一个应用程序,它监控着森林区域的动态,并在有火灾高风险时输出警报。应用程序接收你之前看到的温度传感器读数流和另外的烟雾浓度测量流。当温度超过给定的阈值且烟雾浓度很高时,应用程序发出火灾警报。
DataStream API 提供了连接转换来支持这样的用例。DataStream.connect()方法接收一个 DataStream 并返回一个 ConnectedStreams 对象,该对象表示两个连接的流:
// first stream
val first: DataStream[Int] = ...
// second stream
val second: DataStream[String] = ...
// connect streams
val connected: ConnectedStreams[Int, String] = first.connect(second)
ConnectedStreams 对象提供了 map()和 flatMap()方法,它们分别将 CoMapFunction 和 CoFlatMapFunction 作为参数。你还可以将 CoProcessFunction 应用于 ConnectedStreams。我们将在后面的章节中讨论 CoProcessFunction。
这两个函数的类型由第一个和第二个输入流类型,以及输出流类型确定,并定义了两个方法,每个方法对应一个输入。调用 map1()和 flatMap1()处理第一个输入的事件,调用 map2()和 flatMap2()处理第二个输入的事件:
// IN1: the type of the first input stream
// IN2: the type of the second input stream
// OUT: the type of the output elements
CoMapFunction[IN1, IN2, OUT]
> map1(IN1): OUT
> map2(IN2): OUT
// IN1: the type of the first input stream
// IN2: the type of the second input stream
// OUT: the type of the output elements
CoFlatMapFunction[IN1, IN2, OUT]
> flatMap1(IN1, Collector[OUT]): Unit
> flatMap2(IN2, Collector[OUT]): Unit
函数不能选择要读取哪个 CONNECTEDSTREAMS
无法控制 CoMapFunction 或 CoFlatMapFunction 这两个方法调用的顺序。相反,只要事件通过相应的输入到达,就会立即调用方法。
两个流的联合处理通常需要两个流的事件根据某个条件来确定路由,由相同操作的并行实例进行处理。默认情况下,connect()不会在两个流的事件之间建立关系,因此这两个流的事件被随机分配给操作实例。这种行为产生不确定的结果,通常是不受欢迎的。为了在 ConnectedStreams 上实现确定性转换,可以将 connect()与 keyBy()或 broadcast()组合使用。我们首先演示 keyBy()案例:
val one: DataStream[(Int, Long)] = ...
val two: DataStream[(Int, String)] = ...
// keyBy two connected streams
val keyedConnect1: ConnectedStreams[(Int, Long), (Int, String)] = one
.connect(two)
.keyBy(0, 0) // key both input streams on first attribute
// alternative: connect two keyed streams
val keyedConnect2: ConnectedStreams[(Int, Long), (Int, String)] =
one.keyBy(0)
.connect(two.keyBy(0))
无论你是 keyBy() ConnectedStreams 还是 connect()两个 KeyedStreams, connect()转换都将使用相同的 key 将来自这两个流的所有事件路由到相同的操作的实例上。注意,这两个流的 key 应该引用相同的实体类,就像 SQL 查询中的连接谓词一样。应用于已连接的 key 类型流的操作可以访问 key 状态。
下一个例子展示了如何连接(非 key)数据流与广播流:
val first: DataStream[(Int, Long)] = ...
val second: DataStream[(Int, String)] = ...
// connect streams with broadcast
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
// broadcast second input stream
.connect(second.broadcast())
将广播流的所有事件复制并发送给后续处理函数的所有并行操作实例。非广播流的事件只是简单地转发。因此,可以关联处理两个输入流的元素。
注意
可以使用广播状态连接 key 类型流和广播流。Broadcast state 是 Broadcast ()-connect()转换的改进版本。它还支持连接 key 类型流和广播流,并将广播事件存储在托管状态。这允许你实现通过数据流动态配置的操作(例如,添加或删除过滤规则或更新机器学习模型)。在“使用连接广播状态”中详细讨论了广播状态。
Split and select
分割是关联转换的逆转换。它将输入流划分为与输入流相同类型的两个或多个输出流。可以将每个传入事件路由到零、一个或多个输出流。因此,split 还可以用于过滤或复制事件。图 5-6 显示了一个 split 操作,它将所有白色事件路由到一个单独的流中。
split()方法接收一个 OutputSelector,该选择器定义如何将流元素分配给指定的输出。OutputSelector 定义了为每个输入事件调用的 select()方法,并返回 java.lang.Iterable[String]。为记录返回的字符串值记录路由到指定的输出流。
// IN: the type of the split elements
OutputSelector[IN]
> select(IN): Iterable[String]
DataStream.split()方法返回一个 SplitStream,它提供一个 select()方法,通过指定输出名称从 SplitStream 中选择一个或多个流。
5 - 2 示例 将一个 Tuple 数据流拆分为具有较大数字的流和具有较小数字的流。
val inputStream: DataStream[(Int, String)] = ...
val splitted: SplitStream[(Int, String)] = inputStream
.split(t => if (t._1 > 1000) Seq("large") else Seq("small"))
val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("small", "large")
注意
分割转换的一个限制是所有输出流的类型都必须与输入类型相同。在“发送到边输出”中,我们给出了处理函数的边输出特性,它可以从一个函数发出多个不同类型的流。
分区转换(Distribution Transformations)
分区转换对应于我们在“数据交换策略”中介绍的数据交换策略。这些操作定义如何将事件分配给任务。在使用 DataStream API 构建应用程序时,系统会根据操作语义和设置的并行度自动选择数据分区策略并将数据路由到正确的目标。有时,根据应用程序的优先级采用不同的分区策略或者自定义分区器是有必要的或可取的。例如,如果我们知道 DataStream 的并行分区的负载是倾斜的,那么我们可能希望重新平衡数据,以便均匀地分配后续操作的计算负载。或者,应用程序逻辑可能要求操作的所有任务接收相同的数据,或者要求按照自定义策略分发事件。在本节中,我们将介绍 DataStream 方法,这些方法使用户能够控制分区策略或定义自己的分区策略。
注意
注意,keyBy()不同于本节中讨论的分区转换。本节中的所有转换都产生一个 DataStream,而 keyBy()则产生一个 KeyedStream,可以在这个 KeyedStream 上应用具有 key 状态访问权的转换。
Random(随机)
随机数据交换策略由 DataStream.shuffle()方法实现。该方法按照均匀分布将记录随机分配给后续操作的并行任务。
Round-Robin(轮询)
rebalance()方法对输入流进行分区,以便以轮询方式将事件均匀地分配给后续任务。图 5-7 说明了轮询转换。
Rescale(并行度扩展)
rescale()方法以循环方式分发事件,但只分发给后续任务的一个子集。实际上,rescale 分区策略提供了一种方法,可以在发送方和接收方任务数量不同时执行轻量级负载再平衡。如果接收方任务的数量是发送方任务数量的倍数,或者相反,则 rescale 转换更有效。
rebalance()和 rescale()的根本区别在于任务连接的形成方式。rebalance()会在所有发送任务与所有接收任务之间创建通信通道,而 rescale()则只创建从每个任务到下游操作的某些任务的通道。rescale 分布转换的连接模式如图 5-7 所示。
Broadcast
broadcast()方法复制输入数据流,以便将所有事件广播给下游操作的所有并行任务。
Global
global()方法将输入数据流的所有事件发送到下游操作的第一个并行任务。必须谨慎使用这种分区策略,因为将所有事件路由到同一任务可能会影响应用程序性能。
Custom
当预定义的分区策略都不合适时,可以使用 partitionCustom()方法定义自己的分区策略。此方法接收一个 Partitioner 对象,该对象实现分区逻辑和要分区流的字段或键位置。下面的例子分割了一个整数流,这样所有的负数都被发送到第一个任务,所有其他的数都被发送到一个随机任务:
val numbers: DataStream[(Int)] = ...
numbers.partitionCustom(myPartitioner, 0)
object myPartitioner extends Partitioner[Int] {
val r = scala.util.Random
override def partition(key: Int, numPartitions: Int): Int = {
if (key < 0) 0 else r.nextInt(numPartitions)
}
}
设置并行度
Flink 应用程序在分布式环境(如计算机集群)中并行执行。当将 DataStream 程序提交给 JobManager 执行时,系统将创建一个数据流图,并为执行操作做好准备。每个操作被并行化为一个或多个任务。每个任务将处理操作输入流的一个子集。操作的并行任务数称为操作的并行度。它决定了操作的处理任务的数量,以及可以处理多少数据。
操作的并行度可以在执行环境级别或每个算子操作级别控制。默认情况下,应用程序的所有操作的并行度设置为应用程序执行环境的并行度。环境的并行度(以及所有操作的默认并行度)是基于应用程序启动的上下文自动初始化的。如果应用程序在本地执行环境中运行,则将并行度设置为与 CPU 内核的数量匹配。在将应用程序提交到正在运行的 Flink 集群时,除非通过提交客户端明确地指定了环境并行度,否则将环境并行度设置为集群的默认并行度(有关详细信息,请参阅“运行和管理流应用程序”)。
通常,定义操作的并行度相对于环境的默认并行度是一个好主意,这使你可以通过提交客户端调整应用程序的并行度,从而轻松地扩展应用程序,你可以得到环境的默认并行度,如下例所示:
val env: StreamExecutionEnvironment.getExecutionEnvironment
// get default parallelism as configured in the cluster config or
// explicitly specified via the submission client.
val defaultP = env.env.getParallelism
你也可以覆盖环境的默认并行度,此时无法通过提交客户端控制你的应用程序的并行度(代码设置的并行度优先级最高):
val env: StreamExecutionEnvironment.getExecutionEnvironment
// set parallelism of the environment
env.setParallelism(32)
操作的默认并行度可以通过显式指定来覆盖。在下面的例子中,源操作将以环境的默认并行度执行,map 转换的任务数是源操作的两倍,而 sink 操作总是由 2 个并行任务执行:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// get default parallelism
val defaultP = env.getParallelism
// the source runs with the default parallelism
val result: = env.addSource(new CustomSource)
// the map parallelism is set to double the default parallelism
.map(new MyMapper).setParallelism(defaultP * 2)
// the print sink parallelism is fixed to 2
.print().setParallelism(2)
当你通过提交客户端提交应用程序并指定并行度为 16 时,source 程序将以并行度为 16 运行,mapper 程序将以 32 个任务运行,sink 将以 2 个任务运行。如果你在本地环境中运行应用程序—或者示例,从你的 IDE 上运行一台有 8 个核心的机器,source 程序将运行 8 个任务,mapper 将运行 16 个任务,sink 将运行 2 个任务。
类型
Flink DataStream 应用程序处理表示为数据对象的事件流。调用的 DataSteam 函数接收数据对象进行处理,并输出数据对象。在内部,Flink 需要能够处理这些对象。需要对它们进行序列化和反序列化,以便通过网络进行数据传输,或者将它们写入状态后端、检查点和保存点,或从状态后端、检查点和保存点进行读取。为了有效地做到这一点,Flink 需要应用程序处理的数据类型的详细信息。Flink 使用类型信息的概念来表示数据类型,并为每种数据类型生成特定的序列化器、反序列化器和比较器。
Flink 还具有一个类型提取系统,它可以分析函数的输入和返回类型,以自动获得类型信息,从而获得序列化器和反序列化器。然而,在某些情况下,例如 lambda 函数或泛型类型,需要显式地提供类型信息以使应用程序工作或提高其性能。
在本节中,我们将讨论 Flink 支持的类型,如何为数据类型创建类型信息,以及如果 Flink 的类型系统不能自动推断函数的返回类型,如何使用提示来帮助它。
支持的数据类型
Flink 支持 Java 和 Scala 中可用的所有常见数据类型。最广泛使用的类型可分为以下几类:
基本类型
Java 和 Scala 元组
Scala case 类
pojo,包括 Apache Avro 生成的类
一些特殊的类型
没有经过特殊处理的类型被视为泛型类型,并使用 Kryo 序列化框架进行序列化。
只使用 KRYO 作为备用解决方案
注意,如果可能,应该避免使用 Kryo。因为 Kryo 是一种通用的序列化器,所以它通常不是很有效。Flink 提供配置选项,通过预先将类注册到 Kryo 来提高效率。而且,Kryo 没有提供一个好的迁移路径来演化数据类型。
让我们看看每种数据类型。
基本类型
支持所有 Java 和 Scala 基本类型,如 Int(或 Java 的整数)、String 和 Double。下面是一个处理 Long 值数据流并递增每个元素的例子:
val numbers: DataStream[Long] = env.fromElements(1L, 2L,3L, 4L)
numbers.map( n => n + 1)
Java 和 Scala 元组
元组是由固定数量的类型化字段组成的复合数据类型。
Scala DataStream API 使用常规的 Scala 元组。下面的示例过滤一个包含两个字段的元组数据流:
// DataStream of Tuple2[String, Integer] for Person(name, age)
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),("Sarah", 23))
// filter for persons of age > 18
persons.filter(p => p._2 > 18)
Flink 提供了 Java 元组的有效实现。Flink 的 Java 元组最多可以有 25 个字段,每个字段的长度作为一个单独的类实现----tuple1、Tuple2、... 、Tuple25。元组类是强类型的。
我们可以在 Java DataStream API 中重写过滤示例,如下:
// DataStream of Tuple2<String, Integer> for Person(name, age)
DataStream<Tuple2<String, Integer>> persons = env.fromElements(
Tuple2.of("Adam", 17),
Tuple2.of("Sarah", 23));
// filter for persons of age > 18
persons.filter(p -> p.f1 > 18);
Tuple 字段可以通过其 public 字段的名称(如前面所示,f0、f1、f2 等)或使用 getField(int pos)(其中索引从 0 开始)进行访问:
Tuple2<String, Integer> personTuple = Tuple2.of("Alex", "42");
Integer age = personTuple.getField(1); // age = 42
与 Scala 相比,Flink 的 Java 元组是可变的,因此可以重新分配字段的值。函数可以重用 Java 元组,以减少垃圾收集器的压力。下面的示例展示了如何更新 Java 元组的字段:
personTuple.f1 = 42; // set the 2nd field to 42
personTuple.setField(43, 1); // set the 2nd field to 43
Scala case classes
Flink 支持 Scala case classes。Case class 字段是按名称访问的。在下面,我们定义了一个 case 类 Person:,包括 name 和 age 两个字段。至于元组,我们根据 age 过滤数据流:
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23))
// filter for persons with age > 18
persons.filter(p => p.age > 18)
POJOs
Flink 分析不属于上述分类的类型时,会检查是否可以将其标识为 POJO 类型并进行处理。Flink 接受一个类作为 POJO,如果它满足以下条件:
这是一个 public 类。
它有一个没有任何参数的 public 构造函数——默认构造函数。
所有字段都是 public 的,或者可以通过 getter 和 setter 访问。getter 和 setter 函数必须遵循默认的命名方案,即 Y 类型的字段 x,对应 Y getX()和 setX(Y x)。
所有字段类型都具有 Flink 支持的类型。
例如,下列 Java 类将被 Flink 标识为 POJO。
public class Person {
// both fields are public
public String name;
public int age;
// default constructor is present
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
avro 生成的类由 Flink 自动识别并处理为 pojo。
Arrays, Lists, Maps, Enums,以及其他特殊类型
Flink 支持多种特殊用途的类型,如基本数组类型和对象数组类型;Java 的 ArrayList、HashMap 和 Enum 类型;以及 Hadoop Writable 类型。此外,它还提供了有关 Scala 的 Either、Option 和 Try 类型的信息,以及 Either 类型的 Flink Java 版本。
为数据类型创建类型信息
Flink 类型系统的中心类是 TypeInformation。它为系统提供了生成序列化器和比较器所需的必要信息。例如,当你通过某个 key join 或 group 时,TypeInformation 允许 Flink 执行语义检查,检查作为 key 使用的字段是否有效。
当应用程序被提交执行时,Flink 的类型系统尝试为 Flink 框架处理的每个数据类型,自动派生 TypeInformation。类型提取器分析所有函数的泛型类型和返回类型,以获得相应的 TypeInformation 对象。因此,你可以暂时使用 Flink,而不必担心数据类型的 TypeInformation。然而,有时类型提取器会失败,或者你可能希望定义自己的类型并告诉 Flink 如何有效地处理它们。在这种情况下,需要为特定的数据类型生成 TypeInformation。
Flink 使用静态方法为 Java 和 Scala 提供了两个实用程序类来生成 TypeInformation。对于 Java, helper 类是 org.apache.flink.api.common.typeinfo.Types,如下例所示:
// TypeInformation for primitive types
TypeInformation<Integer> intType = Types.INT;
// TypeInformation for Java Tuples
TypeInformation<Tuple2<Long, String>> tupleType =
Types.TUPLE(Types.LONG, Types.STRING);
// TypeInformation for POJOs
TypeInformation<Person> personType =
Types.POJO(Person.class);
Scala API 的 TypeInformation 的 helper 类是 org.apache.flink.api.scala.typeutils.Types,它的使用如下所示:
// TypeInformation for primitive types
val stringType: TypeInformation[String] = Types.STRING
// TypeInformation for Scala Tuples
val tupleType: TypeInformation[(Int, Long)] = Types.TUPLE[(Int, Long)]
// TypeInformation for case classes
val caseClassType: TypeInformation[Person] = Types.CASE_CLASS[Person]
SCALA API 中的类型信息
在 Scala API 中,Flink 使用 Scala 编译器宏在编译时为所有数据类型生成 TypeInformation 对象。要访问 createTypeInformation 宏函数,请确保始终将以下 import 语句添加到你的 Scala 应用程序中:
import org.apache.flink.streaming.api.scala._
显式提供类型信息
在大多数情况下,Flink 可以自动推断类型并生成正确的 TypeInformation。Flink 的类型提取器利用反射并分析函数签名和子类信息,以获得用户自定义函数的正确输出类型。但是,有时无法提取必要的信息(例如,因为 Java 会擦除泛型类型信息)。而且,在某些情况下,Flink 可能不会选择生成最有效的序列化器和反序列化器的 TypeInformation。因此,你可能需要显式地为 Flink 应用程序中使用的某些数据类型提供 TypeInformation 对象。
提供 TypeInformation 有两种方法。首先,可以通过实现 ResultTypeQueryable 接口来扩展函数类,显式地提供其返回类型的 TypeInformation。下面的示例显示了一个提供返回类型的 MapFunction:
class Tuple2ToPersonMapper extends MapFunction[(String, Int), Person]
with ResultTypeQueryable[Person] {
override def map(v: (String, Int)): Person = Person(v._1, v._2)
// provide the TypeInformation for the output data type
override def getProducedType: TypeInformation[Person] =
Types.CASE_CLASS[Person]
}
在 Java DataStream API 中,你还可以使用 returns()方法在定义数据流时显式地指定操作的返回类型,如下所示:
DataStream<Tuple2<String, Integer>> tuples = ...
DataStream<Person> persons = tuples
.map(t -> new Person(t.f0, t.f1))
// provide TypeInformation for the map lambda function's
return type.returns(Types.POJO(Person.class));
定义 key 和引用字段
在前一节中你看到的一些转换需要规范输入流的类型或字段引用。在 Flink 中,key 不像在使用键值对的系统中那样需要在输入类型中预定义。相反,key 被定义为输入数据上的函数。因此,没有必要定义数据类型来保存键和值,这避免了大量的样板代码。
在下面,我们将讨论引用字段和定义数据类型上的 key 的不同方法。
字段位置
如果数据类型是 tuple,则可以通过简单地使用相应 tuple 元素的字段位置来定义 key。下面的示例按输入元组的第二个字段定义输入流的 key:
val input: DataStream[(Int, String, Long)] = ...
val keyed = input.keyBy(1)
还可以定义由多个元组字段组成的组合键。在本例中,位置以列表的形式提供,一个接一个。我们可以使用第二个和第三个字段定义输入流的 key,如下:
val keyed2 = input.keyBy(1, 2)
字段表达式
定义 key 和选择字段的另一种方法是使用基于字符串的字段表达式。字段表达式适用于元组、POJO 和 case 类。它们还支持选择嵌套字段。在本章的介绍示例中,我们定义了以下 case 类:
case class SensorReading(
id: String,
timestamp: Long,
temperature: Double)
要使用传感器 ID 作为数据流的 key,我们可以将字段名 ID 传递给 keyBy()函数:
val sensorStream: DataStream[SensorReading] = ...
val keyedSensors = sensorStream.keyBy("id")
POJO 或 case 类字段是通过它们的字段名来选择的,就像上面的例子一样。元组字段通过它们的字段名(Scala 元组使用 1-offset, Java 元组使用 0-offset)或它们的 0-offset 字段索引来引用:
val input: DataStream[(Int, String, Long)] = ...
val keyed1 = input.keyBy("2") // key by 3rd field
val keyed2 = input.keyBy("_1") // key by 1st field
DataStream<Tuple3<Integer, String, Long>> javaInput = ...
javaInput.keyBy("f2") // key Java tuple by 3rd field
POJO 和元组中的嵌套字段选择,使用点号(. )标识嵌套层次。有以下 case 类:
case class Address(
address: String,
zip: String
country: String)
case class Person(
name: String,
birthday: (Int, Int, Int), // year, month, day
address: Address)
如果我们想引用一个人的邮政编码(zip),我们可以使用一个字段表达式:
val persons: DataStream[Person] = ...
persons.keyBy("address.zip") // key by nested POJO field
也可以在混合类型上嵌套表达式。下面的表达式访问嵌套在 POJO 中的元组字段:
persons.keyBy("birthday._1") // key by field of nested tuple
可以使用通配符字段表达式“_”(下划线字符),匹配完整的数据类型:
persons.keyBy("birthday._") // key by all fields of nested tuple
key 选择器
指定 key 的第三个选项是 KeySelector 函数。KeySelector 函数从输入事件中提取一个 key:
// T: the type of input elements
// KEY: the type of the key
KeySelector[IN, KEY]
> getKey(IN): KEY
下面这个例子实际上在 keyBy()方法中使用了一个简单的 KeySelector 函数:
val sensorData: DataStream[SensorReading] = ...
val byId: KeyedStream[SensorReading, String] = sensorData.keyBy(r => r.id)
KeySelector 函数接收输入项并返回 key。key 不一定是输入事件的字段,也可以通过任意计算得到。在下面的代码中,KeySelector 函数返回元组字段的最大值作为 key:
val input : DataStream[(Int, Int)] = ...
val keyedStream = input.keyBy(value => math.max(value._1,value._2))
与字段位置和字段表达式相比,KeySelector 函数的一个优点是由于 KeySelector 类的泛型类型,所以生成的 key 是强类型的。
实现函数
到目前为止,你已经在本章的代码示例中看到了用户自定义函数的作用。在本节中,我们将更详细地解释在 DataStream API 中定义和参数化函数的多种方法。
函数类
Flink 将用户自定义函数(如 MapFunction、FilterFunction 和 ProcessFunction)的所有接口公开为接口或抽象类。
函数是通过实现接口或扩展抽象类来实现的。在下面的例子中,我们实现了一个 FilterFunction,它对包含单词“flink”的字符串进行过滤:
class FlinkFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
然后,函数类的一个实例可以作为参数传递给 filter 转换:
val flinkTweets = tweets.filter(new FlinkFilter)
函数也可以实现为匿名类:
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
})
函数可以通过它们的构造函数接收参数。我们可以参数化上面的例子,并将字符串“flink”作为参数传递给 KeywordFilter 构造函数,如下所示:
val tweets: DataStream[String] = ???
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}
当一个程序被提交执行时,所有的函数对象都使用 Java 序列化进行序列化,并传送到其相应操作的所有并行任务中。因此,在反序列化对象之后,所有配置值都将保留。
函数必须是可 JAVA 序列化的
Flink 使用 Java 序列化序列化所有函数对象,将它们发送到 worker 进程。用户函数中包含的所有内容都必须是可序列化的。如果你的函数需要一个非序列化对象实例,你可以将其实现为一个富函数,并在 open()方法中初始化非序列化字段,或者覆盖 Java 序列化和反序列化方法。
Lambda 函数
大多数 DataStream API 方法接受 lambda 函数。Lambda 函数可用于 Scala 和 Java,当不需要访问状态和配置等高级操作时,它提供了一种简单而简洁的方式来实现应用程序逻辑。下面的例子展示了一个 lambda 函数,它过滤包含单词“flink”的 tweet:
val tweets: DataStream[String] = ...
// a filter lambda function that checks if tweets contains the word "flink"
val flinkTweets = tweets.filter(_.contains("flink"))
富函数(Rich Functions)
通常需要在处理第一条数据记录之前初始化一个函数,或者检索关于执行它的上下文的信息。DataStream API 提供了 Rich Function,这类函数公开的功能比目前讨论的常规函数更多。
所有 DataStream API 转换函数都有 Rich 函数版本,你可以在使用常规函数或 lambda 函数的地方使用它们。Rich 函数可以参数化,就像普通的函数类一样。Rich 函数的名称以 rich 开头,然后是转换名称----RichMapFunction、RichFlatMapFunction 等等。
当使用一个 Rich 函数时,你可以实现两个附加的方法到函数的生命周期:
l open()方法是富函数的初始化方法。在调用 filter 或 map 之类的转换方法之前,对每个任务调用一次。open()通常用于只需要完成一次的设置工作。请注意,Configuration 参数仅用于 DataSet API,而不用于 DataStream API。因此,它应该被忽略。
l close()方法是函数的终结方法,它在转换方法的最后一次调用之后,针对每个任务调用一次。因此,它通常用于清理和释放资源。
另外,getRuntimeContext()方法提供对函数的 RuntimeContext 的访问。RuntimeContext 可用于检索诸如函数的并行度、子任务索引和执行该函数的任务名称等信息。此外,它还包括获取分区状态的方法。在“实现有状态函数”中详细讨论了 Flink 中的有状态流处理。下面的示例代码展示了如何使用 RichFlatMapFunction 的方法。例 5-3 展示了 RichFLatMapFunction 的方法
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// do some initialization
// e.g., establish a connection to an external system
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
// subtasks are 0-indexed
if(in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
// do some more processing
}
override def close(): Unit = {
// do some cleanup, e.g., close connections to external systems
}
}
包括外部和 Flink 依赖项
在实现 Flink 应用程序时,添加外部依赖项是一个常见的需求。有许多流行的库,例如 Apache Commons 或谷歌 Guava,用于不同的场景。此外,大多数 Flink 应用程序依赖于一个或多个从外部系统(如 Apache Kafka、文件系统或 Apache Cassandra)获取数据或向外部系统发送数据的 Flink 连接器。有些应用程序还利用了 Flink 的特定领域的库,如表 API、SQL 或 CEP 库。因此,大多数 Flink 应用程序不仅依赖于 Flink 的 DataStream API 依赖项和 Java SDK,而且还依赖于额外的第三方和 Flink 内部依赖项。
当应用程序执行时,它的所有依赖项必须对应用程序可用。默认情况下,只有核心 API 依赖项(DataStream 和 DataSet API)由 Flink 集群加载。应用程序需要的所有其他依赖项必须显式提供。
这样做的原因是为了保持默认依赖项的数量较低。大多数连接器和库依赖于一个或多个库,这些库通常有几个附加的传递依赖项。通常,这包括经常使用的库,如 Apache Commons 或谷歌的 Guava。许多问题源于同一库的不同版本之间的不兼容性,这些不兼容性来自不同的连接器或直接来自用户应用程序。
有两种方法可以确保应用程序在执行时可以使用所有依赖项:
1. 将所有依赖项打包到应用程序 JAR 文件中。这将产生一个自包含的、但通常相当大的应用程序 JAR 文件。
2. 可以将依赖项的 JAR 文件添加到 Flink 设置的./lib 文件夹中。在这种情况下,当 Flink 进程启动时,依赖项被加载到类路径中。像这样添加到类路径的依赖项对在 Flink 设置上运行的所有应用程序都是可用的。
构建一个所谓的胖 JAR 文件是处理应用程序依赖项的首选方法。我们在“一个构建 Flink Maven 项目”中介绍的 Flink Maven 原型生成 Maven 项目,这些项目被配置为生成包含所有所需依赖项的应用程序胖 jar。默认情况下,包含在 Flink 进程的类路径中的依赖项将自动排除在 JAR 文件之外。生成的 Maven 项目的 pom.xml 文件包含解释如何添加附加依赖项的注释。
总结
在本章中,我们介绍了 Flink 的 DataStream API 的基础知识,还研究了 Flink 程序的结构,并学习了如何结合数据和分区转换来构建流应用程序,此外我们还研究了 Flink 支持的数据类型以及通过不同的方式指定 key 和使用用户自定义函数的方法。如果你回头再读一遍以上的样例,你会对 Flink 的 DataStream API 有一个更好的理解。在第 6 章中,我们会学一些更有意思的内容,我们将学习如何使用窗口算子操作和时间语义来丰富我们的程序。
1. Flink 提供了基于时间的进行流之间关联的算子操作,我们将在第六章中进行详细讨论,本章讨论的是一些更加通用的连接转化操作和函数。
你还可以应用 CoProcessFunction 到 ConnectedStreams 上,同样我们也会在第六章讨论 CoProcessFunction。
有关键控状态(keyed state)的详细信息,将在第八章展开讲解。
Flink 还旨在将自己的外部依赖尽量控制到最小,并在用户应用程序中隐藏其中的大部分依赖(包括传递的依赖),以防止版本冲突。
版权声明: 本文为 InfoQ 作者【数据与智能】的原创文章。
原文链接:【http://xie.infoq.cn/article/21c24d51da0241c2213590f61】。文章转载请联系作者。
评论