写点什么

与外部系统的读写交互 (八)

发布于: 2 小时前
与外部系统的读写交互(八)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


数据可以存储在许多不同形式的系统中,比如文件系统、对象存储、关系数据库系统、键值存储、搜索引擎、事件日志、消息队列等等。不同形式的系统为不同的访问模式设计的,适用于不同的特定场景。因此,今天的数据基础设施通常由多种多样的存储系统组成。在往架构中添加新组件之前,我们应该考虑一个问题:“新组件加入之后与原有组件的协作情况会有怎样的影响”

引入数据处理引擎之前要做好选型,以 Apache Flink 为例,因为它不包括存储层,而是依赖于外部存储系统来摄取和持久化数据。因此,对于像 Flink 这样的数据处理引擎来说,为了方便于外部系统进行数据读写交互,需要提供一个完备的连接器库以及实现自定义连接器的 API,这两者是非常重要的。但是,对于一个希望在发生故障时能够实现一致性保证的流处理器来说,仅仅能够向外部数据存储中读取或写入数据是不够的。

在本章中,我们将讨论源和接收端连接器如何影响 Flink 流应用程序的一致性保证,并介绍 Flink 用于读写数据的最常见的连接器。你将了解如何实现自定义源和接收端连接器,以及如何实现向外部数据存储发送异步读写请求的方法。

应用的一致性保障 

在“检查点、保存点和状态恢复”章节中,你学到了 Flink 的检查点和定期接受应用程序状态的一致检查点的恢复机制。如果出现故障,应用程序的状态将从最新完成的检查点恢复并继续处理数据。然而,能够将应用的状态重置为一致点并不足以实现对应用的一致性管理保证。相反,应用程序的源和接收端连接器需要与 Flink 的检查点和恢复机制集成,并提供某些属性,以便能够提供有意义的保证。

为了给应用程序提供精确一次的状态一致性保证,应用程序的每个源连接器都需要能够将其读取位置设置为以前的检查点位置。在采取检查点时,source operator 将保存其读取位置,并在故障恢复期间基于这些位置进行恢复。支持读取位置检查点的源连接器的示例是基于文件的源,这些源将读取 offset 存储在文件的字节流中,或者在 Kafka 源中,即分区主题。如果应用程序从源连接器获取数据,但无法存储和重置读取位置,则在发生故障时,应用程序可能会出现数据丢失,并且只能提供最多一次的一致性保证。

Flink 的检查点和恢复机制以及可重新设置的源连接器这几个机制组合起来能够保证应用程序不会丢失任何数据。但是,应用程序可能会两次输出结果,因为在最后一个成功的检查点之后发出的所有结果(在恢复的情况下,应用程序退回到该检查点)将再次发出。因此,可重新设置的源和 Flink 的恢复机制不足以提供端到端的精确一次保证,即使应用程序状态是精确一次的。

一个旨在提供端到端精确一次保证的应用程序需要特殊的接收器连接器,有两种技术可以应用于不同的情况以实现精确一次的一致性保证:幂等写和事务性写。

幂等性写 

幂等运算可以执行多次,但是最终的结果是一样的。例如,重复地将相同的键值对插入到 hashmap 中是幂等操作,因为第一个插入操作将键值添加到 map 中,并且所有后续插入操作都不会更改 map,因为它已经包含了键值对。另一方面,追加操作不是幂等操作,因为多次追加一个元素会导致结果多次变化。幂等写操作对于流应用程序来说很有趣,因为它们可以多次执行而不改变结果。因此,它们可以在一定程度上减轻由 Flink 的检查点机制引起的重放结果的影响。

值得注意的是,依赖幂等性的接收器来实现精确一次的应用程序必须保证重放时覆盖以前写的结果。例如,如果应用程序有一个接收器,它要将数据更新到键值存储中,则必须确保它能准确地计算用于更新的键值。此外,从 sink 系统读取数据的应用程序可能会在应用程序恢复期间观察到意外的结果。当重放开始时,先前发出的结果可能被先前的结果覆盖。因此,一个使用恢复应用程序的输出的应用程序,可能会看到时间上的跳跃,例如,读取比以前更小的计数。此外,在重放过程中,流应用程序的整体结果将处于不一致的状态,因为部分结果将被覆盖,而另一部分则没有。一旦重放完成,应用程序超过了先前失败的点,结果将再次保持一致。

事务性写 

实现端到端一致性的第二种方法是基于事务写,这里的构思是只将这些结果写入外部接收系统。这个方法确保端到端精确一次,因为在出现故障时,应用程序将重置到最后一个检查点,并且在该检查点之后没有向接收系统发送任何结果,只在完成检查点后才开始写数据,事务方法就不会受到幂等写重放不一致的影响,但是,它增加了延迟,因为只有在检查点完成后结果才是可见的。

Flink 提供了两个功能模块来实现事务性的接收端连接器,一个通用的 write-ahead-log (WAL)接收器和一个 two-phase-commit(2PC)接收器。WAL sink 将所有结果记录写入应用程序状态,并在接收到完成检查点的通知后将它们发送到 sink 系统。由于接收器将记录缓存在状态后端存储中,所以 WAL 接收器可以用于任何类型的接收器系统。然而,它并不能提供精确一次保证,相反会增加应用程序的状态大小,所以接收系统必须处理峰值的写入模式。

与 WAL 不同的是,2PC sink 需要一个提供事务支持或公开构建块以模拟事务的接收器系统。对于每个检查点,接收器启动一个事务并将所有接收到的记录附加到事务中,将它们写入接收器系统而不提交它们。当它收到一个检查点完成的通知时,它提交事务并实现结果持久化。该机制依赖于 sink 从在完成检查点之前打开的故障中恢复后提交事务的能力。

2PC 协议依赖 Flink 现有的检查点机制,检查点 barriers 是启动新事务的通知,所有关于 operator 其单个检查点成功的通知是提交投票,而通知检查点成功的 JobManager 消息是提交事务的指令。与 WAL sink 相比,2PC sink 能够基于 sink 系统和 sink 的实现做到精确的一次输出。此外,一个 2PC sink 不断写记录到 sink 系统,不会出现 WAL sink 那种峰值的写入模式。

表 8-1 显示了在最佳情况下可以实现的不同类型的 source 和 sink 连接器的端到端一致性保证;根据 sink 的实现,实际的一致性可能会较差。

 

内置连接器 

Apache Flink 提供连接器,用于与各种存储系统进行数据读写交互。消息队列和事件日志(如 Apache Kafka、Kinesis 或 RabbitMQ)是读取数据流的常见源。在批处理为主的环境中,数据流也常常通过监控文件目录来摄取文件进行处理。

在 sink 端,数据流往往写入到消息队列,用于后续的事件流处理应用,写入文件系统归档或用于离线分析或批处理应用程序,或插入键值存储或关系数据库系统,如 Cassandra,ElasticSearch 或 MySQL,使得数据可搜索和可查询,也可以用于仪表盘数据展示。

不好的一点是,除了用于 RDBMS 的 JDBC 外,大多数存储系统都没有标准接口,相反,每个系统都有自己的带有专用协议的连接器类库。因此,像 Flink 这样的处理系统需要维护几个专用连接器,以便能够从事件中读取事件并将事件写入最常用的消息队列、事件日志、文件系统、键值存储和数据库系统。

Flink 为 Apache Kafka、Kinesis、RabbitMQ、Apache Nifi、各种文件系统、Cassandra、ElasticSearch 和 JDBC 提供连接器。此外,Apache Bahir 项目还为 ActiveMQ、Akka、Flume、Netty 和 Redis 提供了额外的 Flink 连接器。

为了在应用程序中使用提供的连接器,需要将其依赖项添加到项目的构建文件中。我们在“引入外部和 Flink 依赖项”章节中解释了如何添加连接器依赖项。

在下一节中,我们将讨论 Apache Kafka、基于文件的源和 sink 以及 Apache Cassandra 的连接器。这些是使用最广泛的连接器,你可以在 Apache Flink 或 Apache Bahir 的文档中找到关于其他连接器的更多信息。

Apache Kafka 数据源连接器 

Apache Kafka 是一个分布式流平台。它的核心是一个分布式发布-订阅消息系统,广泛用于接收和分发事件流。在深入研究 Flink 的 Kafka 连接器之前,我们简要地解释一下 Kafka 的主要概念。

Kafka 将事件流组织为所谓的主题(Topic)。主题是一个事件日志,它保证事件按写入的顺序读取,为了扩展主题的写和读,可以将主题划分为分布在集群中的分区,顺序保证仅限于分区,kafka 在从不同分区读取时不提供顺序保证。Kafka 分区中的读取位置称为 offset。

Flink 为所有常见的 Kafka 版本提供源连接器。通过 Kafka 0.11,客户端库的 API 得到了改进,并添加了新的特性。例如,Kafka 0.10 增加了对记录时间戳的支持。自从 1.0 发布以来,API 一直保持稳定。Flink 提供了一个通用的 Kafka 连接器,适用于 0.11 以后的所有 Kafka 版本。Flink 还为 Kafka 的 0.8、0.9、0.10 和 0.11 版本提供了特定于版本的连接器。对于本节的其余部分,我们将重点讨论通用连接器,而对于特定于版本的连接器,建议参考 Flink 的官方文档。

将 Flink Kafka 连接器的依赖项添加到 Maven 配置文件中,如下图所示:

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-connector-kafka_2.12</artifactId>

   <version>1.7.1</version>

</dependency>

Flink Kafka 连接器并行地接收事件流,每个并行源任务可以从一个或多个分区读取数据。任务跟踪每个分区的当前读取 offset,并将其包含到检查点数据中。从失败中恢复时,将恢复 offset,并且源实例将继续从检查点 offset 读取数据。Flink Kafka 连接器不依赖 Kafka 自己的 offset 跟踪机制,该机制基于所谓的消费者组。图 8-1 显示了对源实例的分区分配。

创建一个 Kafka 源连接器,如示例 8-1 所示。

val properties = new Properties()

properties.setProperty("bootstrap.servers", "localhost:9092")

properties.setProperty("group.id", "test")

val stream: DataStream[String] = env.addSource(

new FlinkKafkaConsumer[String](

"topic",

new SimpleStringSchema(),

properties)) 

构造函数有三个参数。第一个参数定义要读取的主题。可以是单个主题、主题列表,也可以是匹配所有要读取的主题的正则表达式。当从多个主题读取时,Kafka 连接器将所有主题的所有分区都视为相同的,并将它们的事件多路复用到单个流中。

第二个参数是 DeserializationSchema 或 KeyedDeserializationSchema。Kafka 消息存储为原始字节消息,需要反序列化为 Java 或 Scala 对象。在例 8-1 中使用的 SimpleStringSchema 是一个内置的 DeserializationSchema,它只是将字节数组反序列化为字符串。此外,Flink 还为 Apache Avro 和基于文本的 JSON 编码提供了实现。

DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,因此你可以实现自定义的反序列化逻辑。

第三个参数是一个 Properties 对象,它配置用于连接和读取 Kafka 的 Kafka 客户端。一个最基本的属性配置包含两个属性,"bootstrap.servers" 和"group.id"。有关其他配置属性,请查阅 Kafka 官方文档。为了获取事件时间戳并生成水位线,可以通过调用 FlinkKafkaConsumer.assignTimestampsAndWatermark()方法向 Kafka 消费者提供一个 AssignerWithPeriodicWatermark 或 AssignerWithPunctuatedWatermark。将分配器应用到每个分区,从而利用每个分区的顺序保证,并且源实例根据水位线传播协议合并分区水位线(请查阅“水位线传播和事件时间”)。

请注意,如果一个分区处于不活动状态,则源实例的水位线将不起作用。因此,一个不活动的分区会导致整个应用程序停顿,因为应用程序的水位线不可用。

从 0.10.0 版本开始,Kafka 支持消息时间戳。当从 Kafka 版本 0.10 或更高版本读取消息时,如果应用程序以事件时间模式运行,消费者将自动提取消息时间戳作为事件时间戳。在这种情况下,仍然需要生成水位线,并且应该应用 AssignerWithPeriodicWatermark 或 AssignerWithPunctuatedWatermark 来分发之前分配的 Kafka 时间戳。

还有一些需要注意的配置选项,例如可以配置最初读取 Topic 分区的起始位置。有效的选项是:

 对于一个消费者组而言,kafka 通过 group.id 知道最后一次的消费位置,这也是默认配置:

FlinkKafkaConsumer.setStartFromGroupOffsets()

 从每个分区的最开始位置消费:

FlinkKafkaConsumer.setStartFromEarliest()

 从每个分区的最新位置消费:

FlinkKafkaConsumer.setStartFromLatest()

 消费大于指定时间戳的所有记录(基于 Kafka 0.10 或更高版本):

FlinkKafkaConsumer.setStartFromTimestamp(long)

 使用 map 对象,指定每个分区的消费起始位置:

FlinkKafkaConsumer.setStartFromSpecificOffsets(Map)

 

注意,这种配置只影响第一次读位置。在进行恢复或从保存点开始时,应用程序将从存储在检查点或保存点中的 offset 开始读取。

可以将 Flink Kafka 消费者配置为自动发现与正则表达式匹配的新 Topic 或添加到 Topic 中的新分区。这些特性在默认情况下是禁用的,可以通过向 Properties 对象添加具有非负数的参数 flink.partitiondiscovery.interval-millis 来启用。

 

Apache Kafka 接收端连接器 

Flink 为 0.8 以后的所有 Kafka 版本提供 sink 连接器。从 Kafka 0.11,客户端的 API 得到了改进,并添加了新的特性,比如 Kafka 0.10 支持记录时间戳,Kafka 0.11 支持事务性写。自发布 1.0 以来,API 一直保持稳定。Flink 提供了一个通用的 Kafka 连接器,适用于 0.11 以后的所有 Kafka 版本。Flink 还提供了针对 Kafka 0.8、0.9、0.10 和 0.11 版本的特定于版本的连接器。对于本节的其余部分,我们将重点介绍通用连接器,并向你介绍 Flink 的文档,以获得特定于版本的连接器。Flink 的通用 Kafka 连接器的依赖项被添加到 Maven 项目中,如下图所示:

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-connector-kafka_2.12</artifactId>

   <version>1.7.1</version>

</dependency>

将 Kafka sink 添加到 DataStream 应用程序中,如例 8-2 所示

val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer[String](

               "localhost:9092", // broker list

               "topic", // target topic

               new SimpleStringSchema) // serialization schema

stream.addSink(myProducer)

例 8-2 中使用的构造函数接收三个参数。第一个参数是一个逗号分隔的 Kafka broker 地址字符串。第二个是写入数据的 topic 的名称,最后一个是 SerializationSchema,它将 sink 的输入类型(例 8-2 中的字符串)转换为字节数组。SerializationSchema 是我们在 Kafka 源连接器部分讨论的 DeserializationSchema 的对应版本。

FlinkKafkaProducer 提供了更多具有不同参数组合的构造函数,如下:

 与 Kafka 源连接器类似,可以为 Kafka 客户端传递一个 Properties 对象来提供定制选项。在使用 Properties 时,必须将 brokers 列表作为“bootstrap.servers”属性。查看 Kafka 文档以获得完整的参数列表。

 你可以指定一个 FlinkKafkaPartitioner 来控制记录如何映射到 Kafka 分区。我们将在本节后面更深入地讨论这个特性。

 你还可以指定 KeyedSerializationSchema,而不是使用 SerializationSchema 将记录转换为字节数组,KeyedSerializationSchema 将记录序列化为两个字节数组—一个用于键,另一个用于 Kafka 消息的值。此外,KeyedSerializationSchema 还提供了更多的功能,比如覆盖目标主题以写入多个主题。

kafka sink 至少一次的保证

Flink 的 Kafka sink 提供的一致性保证取决于它的配置。Kafka sink 在以下条件下提供至少一次的保证:

 启用了 Flink 的检查点,应用程序的所有 sources 都可以重新设置。

 如果写操作不成功,sink 连接器将抛出异常,导致应用程序失败并恢复。这是默认的行为。通过将 retries 属性设置为大于 0 的值(默认值),可以将 Kafka 客户端配置为在写操作失败之前进行重试。你还可以通过在接收器对象上调用 setLogFailuresOnly(true)来将接收器配置为只记录写故障。注意,这将使应用程序的输出保证无效。

 sink 连接器等待 Kafka 在完成其检查点之前确认输出记录。这是默认的行为。通过调用 sink 对象上的 setFlushOnCheckpoint(false)禁用这种等待。但是,这也将禁用任何输出保证。

kafka sink 精确一次的保证

Kafka 0.11 引入了对事务性写的支持。由于这个特性,Flink 的 Kafka 接收器也能够提供精确的一次输出保证,只要接收器和 Kafka 配置正确。同样,Flink 应用程序必须启用检查点并从可重置的源消费。此外,FlinkKafkaProducer 提供了一个语义参数的构造函数,该参数负责调控接收器提供的一致性保证,如下:

 Semantic.NONE,它不提供任何保证,记录可能丢失或多次写入。

 Semantic.AT_LEAST_ONCE,它保证没有写操作丢失,但是可能会重复。这是默认设置。

 Semantic.EXACTLY_ONCE,它构建在 Kafka 的事务上,将每个记录精确地写入一次。

当使用 Kafka 接收器以精确一次模式运行 Flink 应用程序时,需要考虑一些事情,这有助于大致了解 Kafka 如何处理事务。简而言之,Kafka 的事务将所有消息添加到分区日志中,并将打开的事务标记为未提交。一旦事务被提交,标记就被更改为已提交。从 topic 读取数据的消费者可以配置一个隔离级别(通过 isolation.level 属性)。声明它是否可以读取未提交的消息(默认 read_uncommitted)。如果消费者被配置为 read_committed,那么一旦它遇到未提交的消息,它就停止从一个分区消费,并在提交消息时继续使用。因此,打开的事务可能会阻止消费者读取分区消息并带来显著的延迟。Kafka 通过在超时后拒绝和关闭事务来防止这种情况,使用 transaction.timeout.ms 的属性进行配置超时时间。

在 Flink 的 Kafka sink 上下文中,由于恢复周期过长而超时的事务会导致数据丢失,所以正确配置事务超时属性是非常重要的。默认情况下,Flink Kafka sink 设置 transaction.timeout.ms 为一小时。这意味着你可能需要调整 kafka 本身的 transaction.max.timeout.ms 属性,默认设置为 15 分钟。此外,提交消息的可见性取决于 Flink 应用程序的检查点间隔。请查阅 Flink 文档,了解在启用精确一次一致性时的其他相关案例。

                   检查 kafka 集群的配置

Kafka 集群的默认配置仍然会导致数据丢失,即使在确认写操作之后也是如此。你应该仔细修改 Kafka 设置的配置,特别注意以下参数:

ack

log.flush.interval.messages

log.flush.interval.ms

log.flush。*

我们建议你参考 Kafka 文档,以获得关于它的配置参数的详细信息和适用配置的指导原则。

自定义分区和写入消息时间戳

当将消息写入 Kafka topic 时,Flink Kafka sink 任务可以选择写入 topic 的哪个分区。FlinkKafkaPartitioner 可以在 Flink Kafka sink 的一些构造函数中定义。如果没有指定,默认的分区器将每个 sink 任务映射到一个 Kafka 分区,由同一个 sink 任务发出的所有记录都被写到同一个分区,如果任务多于分区,单个分区可能包含多个 sink 任务的记录。如果分区的数量大于子任务的数量,则默认配置将导致空分区,事件时间模式下的 Flink 应用程序,消费此时的 topic 是,可能会出现问题。

通过提供一个自定义 FlinkKafkaPartitioner,你可以控制如何将记录路由到 topic 分区。例如,可以根据记录的 key 属性创建分区器,或者创建循环分区器以实现均匀分布。还可以根据消息 key 将分区委托给 Kafka。这需要一个 KeyedSerializationSchema 来提取消息 key,并使用 null 配置 FlinkKafkaPartitioner 参数来禁用默认分区器。

最后,可以将 Flink 的 Kafka sink 配置为写入消息时间戳,这是 Kafka 0.10 所支持的。通过在 sink 对象上调用 setWriteTimestampToKafka(true),可以将记录的事件时间戳写入 Kafka。

文件系统数据源连接器 

Filesystems 通常用于以成本低效益高的方式存储海量数据。在大数据体系架构中,它们通常充当批处理应用程序的数据源和数据接收器。结合 Apache Parquet 或 Apache ORC 这些高级文件格式,文件系统可以有效地为分析查询引擎,如 Apache Hive、Apache Impala 或 Presto 等提供服务。因此,文件系统通常用于“连接”流和批处理应用程序。

Apache Flink 提供了一个可重置的源连接器,以摄入文件中的数据作为流。文件系统源是 flinkstreaming- java 模块的一部分,因此,你不需要添加任何其他依赖项来使用此功能。Flink 支持不同类型的文件系统,比如本地文件系统(包括 NFS 或 SAN 共享、Hadoop HDFS、Amazon S3 和 OpenStack Swift FS)。请查阅“文件系统配置”以了解如何在 Flink 中配置文件系统。示例 8-3 显示了如何通过按行读取文本文件来读取流。

val lineReader = new TextInputFormat(null)

val lineStream: DataStream[String] = env.readFile[String](

lineReader, // The FileInputFormat

"hdfs:///path/to/my/data", // The path to read

FileProcessingMode.PROCESS_CONTINUOUSLY, // The processing mode

30000L) // The monitoring interval in ms

StreamExecutionEnvironment.readFile()方法的参数为:

 FileInputFormat 负责读取文件内容。我们将在本节的后面讨论这个接口的细节。例 8-3 中的 TextInputFormat 的 null 参数定义了单独设置的路径。

 应该读取的路径。如果路径引用一个文件,则读取单个文件,如果它引用一个目录,FileInputFormat 将扫描该目录以读取文件。

 读取路径的模式。主要有两种,分别是 PROCESS_ONCE 或 PROCESS_CONTINUOUSLY。在 PROCESS_ONCE 模式中,当作业启动并读取所有匹配的文件时,读取路径将被扫描一次。在 PROCESS_CONTINUOUSLY 中,将定期扫描路径(初始扫描后),连续读取新文件和修改过的文件。

 周期性扫描路径的时间间隔,单位为毫秒。在 PROCESS_ONCE 模式中忽略该参数。

FileInputFormat 是一种专门用于从文件系统中读取文件的 InputFormat。FileInputFormat 分两个步骤读取文件。第一步,它会扫描文件系统路径,并为所有匹配的文件创建所谓的文件切割分片。文件切割分片定义文件上的范围,通常用起始 offset 和长度定义。在将一个大文件分成多个分段之后,可以将这些分段分配给多个 reader 任务来并行地读取文件。根据文件的编码,可能只需要生成一个分割来读取整个文件。FileInputFormat 的第二步是接收输入分割,读取分割定义的文件范围,并返回所有相应的记录。

DataStream 应用程序中使用的 FileInputFormat 还应该实现 CheckpointableInputFormat 接口,该接口定义了检查点的方法,并在文件分割中重置 InputFormat 的当前读取位置。如果 FileInputFormat 没有实现 CheckpointableInputFormat 接口,则文件系统源连接器仅在启用检查点时至少提供一次保证,因为输入格式将从上次执行完整检查点时处理的分割开始读取。

在 1.7 版本中,Flink 提供了一些扩展 FileInputFormat 和实现 CheckpointableInputFormat 的类。TextInputFormat 按行读取文本文件(按换行字符分隔),CsvInputFormat 的子类按逗号分隔值读取文件,AvroInputFormat 按 avro 编码记录读取文件。

在 PROCESS_CONTINUOUSLY 模式下,文件系统源连接器根据修改时间戳识别新文件。这意味着如果一个文件被修改,它将被完全重新处理,因为修改时间戳发生了变化,包括由于追加内容而引起的修改,因此持续读取文件的一种常见技术是将它们写入临时目录,并在完成后自动将它们移到被监控的目录中。当一个文件被完全读取并且完成了一个检查点时,就可以从目录中删除它。如果读取最终一致的列表操作,如 S3 的文件存储,也有影响。由于文件可能不会按照其修改时间戳的顺序出现,因此文件系统源连接器可能会忽略它们。

值得注意的是,在 PROCESS_ONCE 模式中,在扫描文件系统路径并创建所有的分片之后,不会采取任何检查点。

如果你想在基于事件时间应用程序中使用文件系统源连接器,生成水位线具体一定的挑战性,因为输入拆分是在一个进程中生成的,并且按照文件的修改时间戳分发给所有并行 reader 任务。 为了生成合适的水印,你需要对包含在任务稍后处理的拆分中的记录的最小时间戳进行推理。

 

文件系统接收端连接器 

将流写入文件是一个常见的需求,比如说,为离线分析处理准备低延迟的数据。因为大多数应用程序只有在文件完成写入,以及流应用程序长时间运行后才能读取文件,所以 streaming sink 连接器通常会将其输出分块到多个文件中。此外,通常会将记录组织到所谓的 bucket 中,这样消费应用程序可以更好地控制读取哪些数据。

与文件系统源连接器一样,Flink 的 StreamingFileSink 连接器也包含在 flink-streaming-java 模块中。因此,不需要向构建文件添加依赖项。

StreamingFileSink 为应用程序提供端到端的精确一次保证,因为应用程序配置了精确一次检查点,并且在出现故障时重置所有源。我们将在本节后面更详细地讨论恢复机制。示例 8-4 展示了如何使用最基本的配置创建 StreamingFileSink 并将其附加到流中。

val input: DataStream[String] = …val sink: StreamingFileSink[String] = StreamingFileSink.forRowFormat(   new Path("/base/path"),   new SimpleStringEncoder[String]).build()


input.addSink(sink)

当 StreamingFileSink 接收到记录时,将记录分配给一个 bucket。bucket 是基本路径的子目录,在示例 8-4 中使用 StreamingFileSink 构建器配置了“/base/path”。

bucket 是由 BucketAssigner 选择的,这是一个公共接口,并为每个记录返回一个 BucketId,该 BucketId 决定了记录将被写入的目录。可以使用 withBucketAssigner()方法在构建器上配置 BucketAssigner()。如果没有显式指定 BucketAssigner,则使用 DateTimeBucketAssigner,根据记录写入时的处理时间将记录分配到每小时的 bucket。

每个 bucket 目录包含多个分片文件,这些文件由 StreamingFileSink 的多个并行实例并发生成。此外,每个并行实例将其输出分割成多个分片文件。分片文件的路径格式如下:

[base-path]/[bucket-path]/part-[task-idx]-[id]

例如,给定一个“/johndoe/demo”的基本路径和一个 part 前缀“part”,这个路径“/johndoe/demo/2018-07-22-17/part-4-8”指向由第五个(下标从 0 开始)sink 任务写入 bucket“2018-07-22-17”的 8 个文件,即:2018 年 7 月 22 日下午 5 点。

            提交文件的 id 可能不是连续的

非连续文件 id(提交文件名称中的最后一个数字)不表示数据丢失。StreamingFileSink 只是增加文件 id。当丢弃挂起的文件时,它不会重用它们的 id。

RollingPolicy 确定任务何时创建新分片文件。可以使用构建器上的 withRollingPolicy()方法来配置 RollingPolicy。默认情况下,StreamingFileSink 使用 DefaultRollingPolicy,该策略配置为当分片文件超过 128 MB 或超过 60 秒时滚动生成新分片文件。还可以配置非活动时间间隔,在此之后将滚动生成分片文件。

StreamingFileSink 支持两种将记录写入分片文件的模式:row 编码和 buik 编码。在 row 编码模式中,每个记录都单独编码并附加到一个分片文件中,在 bulk 编码中,记录被分批收集和写入的。Apache Parquet 以列格式组织和压缩记录,是一种需要 bulk 编码的文件格式。

例 8-4 通过提供一个将单个记录写入分片文件的 Encoder,使用 row 编码创建 StreamingFileSink。在例 8-4 中,我们使用了 SimpleStringEncoder,它调用了记录的 toString()方法,并将记录的字符串写入文件。Encoder 是一个简单的接口和单一方法,非常容易实现。

例 8-5 所示,创建了一个 bulk 编码的 StreamingFileSink。

val input: DataStream[String] = …

val sink: StreamingFileSink[String] = StreamingFileSink.forBulkFormat(

   new Path("/base/path"),

   ParquetAvroWriters.forSpecificRecord(classOf[AvroPojo])

).build()

 

input.addSink(sink)

bulk 编码模式下的 StreamingFileSink 需要 BulkWriter.Factory。在例 8-5 中,我们对 Avro 文件使用了 Parquet writer。请注意,Parquet writer 包含在 flink-parquet 模块中,该模块需要作为依赖项添加。像往常一样,BulkWriter.Factory 是一个可以实现自定义文件格式的接口,如 Apache Orc。

bulk 编码模式下的 StreamingFileSink 不能选择 RollingPolicy。bulk 编码格式只能与 OnCheckpointRollingPolicy 相结合,OnCheckpointRollingPolicy 在每个检查点上滚动生成分片文件。

StreamingFileSink 提供了精确的一次输出保证。StreamingFileSink 通过一个提交协议来实现这一点,该协议将文件移动到不同的 stages,包括处理中、挂起状态和完成状态,基于 Flink 的检查点机制。当 sink 写入文件时,文件处于处理中状态。当 RollingPolicy 决定滚动文件时,将关闭该文件并通过重命名将其移动到挂起状态。当下一个检查点完成时,挂起的文件将移动到完成状态(再次通过重命名)。

             挂起的文件可能永远不会被提交

在某些情况下,永远不会提交挂起文件。StreamingFileSink 确保这不会导致数据丢失。但是,这些文件不会自动清除。

在手动删除一个挂起文件之前,你需要检查它是在延迟还是即将提交。找到具有相同任务索引和更大 ID 的提交文件后,可以安全地删除挂起文件。

在失败的情况下,sink 任务需要将当前正在处理的文件重置为最近一次成功检查点处的 offset。这是通过关闭当前正在处理的文件并丢弃文件末尾的无效部分来实现的,例如,通过使用文件系统的 truncate 操作。

            STREAMINGFILESINK 需要启用检查点

如果应用程序没有启用检查点,那么 StreamingFileSink 将永远不会将文件从挂起状态移动到完成状态。

 

Apache Cassandra 接收端连接器 

Apache Cassandra 是一个流行的、可伸缩的和高可用的列存储数据库系统。Cassandra 将数据集建模为由多个类型的列组成的行的表,一个或多个列必须定义为(复合)主键。每行都有的主键作为唯一标识。在其他 API 中,Cassandra 查询语言(CQL)是 Cassandra 的特性,是一种类似于 SQL 的语言,用于读写数据、创建、修改和删除数据库对象,如键空间和表。

Flink 提供了一个接收端连接器来将数据流写入 Cassandra。 Cassandra 的数据模型是基于主键的,所有对 Cassandra 的写入都是基于 upsert 语义。 结合 checkpointing、resetable source 和确定性应用程序逻辑,可以保证最终输出的一致性,因为结果在恢复过程中被重置为以前的版本,这意味着消费者可能会读取比以前更旧的结果。 此外,多个键的值可能不同步。

为了防止恢复过程中的时间不一致,并为具有不确定性应用程序逻辑的应用程序提供精确一次的输出保证,Flink 的 Cassandra 连接器可以配置为 WAL。我们将在本节后面更详细地讨论 WAL 模式。以下代码显示了使用 Cassandra 接收器连接器而需要添加到应用程序的构建文件中的依赖关系:

<dependency>

  <groupId>org.apache.flink</groupId>

    <artifactId>flink-connector-cassandra_2.12</artifactId>

  <version>1.7.1</version>

</dependency> 

为了说明 Cassandra 接收端连接器的使用,我们使用了 Cassandra 表的简单示例,它包含关于传感器读数的数据,由传感器 Id 和温度两列组成。

CREATE KEYSPACE IF NOT EXISTS example

WITH replication = {'class': 'SimpleStrategy', 'replication_factor':

'1'};

CREATE TABLE IF NOT EXISTS example.sensors (

sensorId VARCHAR,

temperature FLOAT,

PRIMARY KEY(sensorId)

); 

Flink 提供了不同的接收器实现将不同数据类型的数据流写入 Cassandra 。Flink 的 Java Tuple 和 Row 类型以及 scala 的内置 Tuple 和 case 类的处理方式与用户定义的 POJO 类型不同。我们分别讨论这两种情况。示例 8-7 展示了如何创建一个接收器,它将 Tuple、case 类或 Row 的数据流写入 Cassandra 表中。在这个例子中,将 DataStream[(String,Float))写入“传感器”表。

val readings: DataStream[(String, Float)] = ???

val sinkBuilder: CassandraSinkBuilder[(String, Float)] =

CassandraSink.addSink(readings)

sinkBuilder

.setHost("localhost")

.setQuery(

"INSERT INTO example.sensors(sensorId, temperature) VALUES (?, ?);")

.build() 

Cassandra sink 是调用 Cassandra Sink.add Sink()方法获得的构建器创建和配置的,添加应该发出的数据流对象, 该方法为数据流的数据类型返回正确的构造器。 在示例 8-7 中,它返回用于处理 Scala Tuple 的 Cassandra 接收器的构造器。

用于 Tuple、case 类和 Row 的 Cassandra 接收器构建器需要遵循 CQL  INSERT 查询的规范。使用 CassandraSinkBuilder.setQuery()方法配置查询。 在执行期间,sink 将准备好的语句进行注册,并将 Tuple、case 类或 Row 的字段转换为已准备好的语句的参数。 字段根据其位置映射到对应的参数中。

由于 POJO 字段没有顺序,因此需要对它们进行不同的处理。 示例 8-8 演示了如何为类型传感器读取的 POJO 配置 Cassandra 接收器。

val readings: DataStream[SensorReading] = ??? CassandraSink.addSink(readings)

.setHost("localhost")

.build()

如示例 8-8 所示,我们不指定 INSERT 查询。 相反,POJO 交给 Cassandra 的对象映射器,它自动将 POJO 字段映射到 Cassandra 表的字段。 为了使其生效,POJO 类及其字段需要进行注释,并为所有字段提供 setter 和 getter 方法,如示例 8-9 所示。 默认构造函数是 Flink 在讨论支持的数据类型时在“支持的数据类型”中提到的。

@Table(keyspace = "example", name = "sensors")

 class SensorReadings(

 @Column(name = "sensorId") var id: String,

 @Column(name = "temperature") var temp: Float) {

 def this() = { this("", 0.0) }

 def setId(id: String): Unit = this.id = id

 def getId: String = id

 def setTemp(temp: Float): Unit = this.temp = temp

 def getTemp: Float = temp

 }

除了图 8-7 和 8-8 中的配置选项外,Cassandra 接收器器还提供了一些配置接收器连接器的方法:

 setClusterBuilder(ClusterBuilder):集群构造器构建一个 Cassandra 集群,用于管理与 Cassandra 的连接。 它可以配置一个或多个主机名和端口;定义负载均衡、重试和重新连接策略;并提供访问凭据。

 setHost(String,[Int]):此方法是配置单个地址的主机名和端口的简单集群构造器的快捷方式。 如果没有配置端口,则使用 Cassandra 的默认端口 9042。

 setQuery(String):这指定 CQL INSERT 查询将 Tuple、case 类或 Row 写入 Cassandra。

 setMapperOptions(MapperOptions):这为 Cassandra 的对象映射器提供了选项,例如一致性、生命周期和空字段处理的配置。 如果接收器发出 Tuple、case 类或 Row,则忽略选项。

 enableWriteAheadLog([CheckpointCommitter]):这使得 WAL 能够在非确定性应用程序逻辑的情况下提供精确一次的输出保证。检查点比较器用于在外部数据存储中存储有关已完成的检查点的信息。如果未配置 CheckpointCommitter,则信息将写入特定的 Cassandra 表。

带有 WAL 的 Cassandra 接收器连接器是基于 Flink 的 GenericWriteAheadSink operator 实现的。 这个 operator 是如何工作的,包括 CheckpointCommitter 的作用,以及它提供了哪些一致性保证,在“事务 Sink 连接器”中有更详细的描述”

 

实现自定义数据源函数 

DataStream API 提供了两个接口来实现源连接器和相应的 RichFunction 抽象类:

 SourceFunction 和 RichSourceFunction 可用于定义非并行的源连接器,使用单个任务运行的源。

 ParallelSourceFunction 和 RichParallelSourceFunction 可用于定义与多个并行任务实例一起运行的源连接器。

除了非并行和并行的区别之外,这两个接口是相同的。RichSourceFunction 和 RichParallelSourceFunction 的子类可以覆盖 open()和 close()方法,并访问 Runtime Context,其中提供并行任务实例的数量和当前实例的索引等。

SourceFunction 和 ParallelSourceFunction 定义了两个方法:

 void run(SourceContext<T> ctx)

 void cancel()

run()方法执行读取或接收记录并将其摄入到 Flink 应用程序中,是实际的工作方法,根据接收数据的系统,数据可以推送或拉取。run()方法由 Flink 调用一次,并在指定的源线程中运行,通常在一个无限循环(无界流)中读取或接收数据并发出记录。可以在某个时间点显式地取消任务,或者在有界流的情况下,当输入被完全消费完也会终止任务。

当 Flink 调用 cancel()方法时,应用程序会被取消和关闭。为了优雅的关闭,在单独的线程中运行的 run()方法应该在调用 cancel()方法后立即终止。示例 8-10 显示了一个简单的源函数,其计数范围从 0 到 Long.MaxValue。

class CountSource extends SourceFunction[Long] {

   var isRunning: Boolean = true

   override def run(ctx: SourceFunction.SourceContext[Long]) = {

       var cnt: Long = -1

       while (isRunning && cnt < Long.MaxValue) {

           cnt += 1

           ctx.collect(cnt)

      }

  }

   override def cancel() = isRunning = false

}

可重置的数据源函数 

在本章的前面,我们解释了 Flink 只能为使用源连接器的应用程序提供精确一致性保证,这些应用程序可以重放输出数据。如果提供数据的外部系统暴露 API 来检索和重置读取 offset,则源函数可以重放其输出数据。这类系统的示例提供文件流 offset 的文件系统将文件流移动到特定位置,或者使用 Apache Kafka 的 seek 方法,后者为主题的每个分区提供 offset,可以设置分区的读取位置。相反的是从 socket 读取数据的源连接器,它会立即丢弃已传输的数据。

支持输出回放的源函数需要与 Flink 的检查点机制集成,并且必须在生成检查点时必须持久化当前的读取位置。当应用程序从保存点启动或从故障中恢复时,读取 offset 将从最新的检查点或保存点检索。如果应用程序在没有现有状态的情况下启动,则读取 offset 必须设置为默认值。复位源函数需要实现 CheckpointedFunction 接口,存储读取 offset 和所有相关的元数据信息,如文件路径或分区 ID,在 operator list state 或 operator union list state 中,取决于在重新扩展应用的程序中 offsets 应该如何分配到并行的 task 实例上。有关 operator list state 和 union list state 的分配的详细信息,请查阅“缩放有状态算子(Scaling Stateful Operators)”章节。

此外,还有非常重要的一点是确保在单独的线程中运行的 SourceFunction.run()方法不会提前读取 offset 并在采取检查点时发出数据;换句话说,当调用 CheckpointedFunction.snapshotState()方法时,这是通过控制 run()中的代码来实现的,run()将读取位置往前移,并在一个块中发出记录,该块同步于锁对象上,该对象是从 SourceContext.getCheckpointLock()方法获得的。例 8-11 使例 8-10 的 CountSource 重置。

class ResettableCountSource extends SourceFunction[Long] withCheckpointedFunction {

   var isRunning: Boolean = true

   var cnt: Long = _

   var offsetState: ListState[Long] = _

   override def run(ctx: SourceFunction.SourceContext[Long]) = {

       while (isRunning && cnt < Long.MaxValue) {

 // synchronize data emission and checkpoints

           ctx.getCheckpointLock.synchronized {

               cnt += 1

               ctx.collect(cnt)

          }

      }

  }

   override def cancel() = isRunning = false

   override def snapshotState(snapshotCtx:FunctionSnapshotContext): Unit = {

  // remove previous cnt

       offsetState.clear()

// add current cnt     

 offsetState.add(cnt) 

}

   override def initializeState(initCtx: FunctionInitializationContext): Unit = {       val desc = new ListStateDescriptor[Long]      offsetState =initCtx.getOperatorStateStore.getListState(desc)

 // initialize cnt variable

val it = offsetState.get()

       cnt = if (null == it || !it.iterator().hasNext) {

           -1L

      } else {

           it.iterator().next()

      }

  }

}

数据源函数、时间戳及水位线 

源函数的另一个重要方面是时间戳和水位线。正如在“事件时间处理”和“分配时间戳和生成水位线”中指出的那样,DataStream API 提供了两个选项来分配时间戳和生成水位线。时间戳和水位线可以由指定的 TimestampAssigner 分配和生成(详细信息请查阅“分配时间戳和生成水位线”),或者由源函数分配和生成。

源函数分配时间戳并通过其 SourceContext 对象发出水位线。SourceContext 提供了以下方法:

 def collectWithTimestamp(T record, longtimestamp): Unit

 def emitWatermark(Watermark watermark):Unit

collectWithTimestamp()输出具有相关时间戳的记录,emitWatermark()输出提供的水位线。

如果源函数的一个并行实例消费了多个流分区的记录,如 Kafka 主题的分区,那么除了不需要另外的算子外,在源函数中分配时间戳和生成水位线也是非常有用的。通常,外部系统(如 Kafka)只保证流分区中的消息顺序。给定一个并行度为 2 的源函数算子,从 Kafka 主题的 6 个分区读取数据,源函数的每个并行实例将从 3 个 Kafka 主题分区读取记录。结果是源函数的每个实例多路复用三个流分区的记录来输出它们。多路复用分区数据很可能会在事件时间戳方面增加更多的无序性,这样一来下游时间戳分配器可能会产生比预期更多的延迟记录。

为了避免这种现象,源函数可以独立地为每个流分区生成水位线,并且始终将其分区的最低水位线作为水位线,这样一来就不会输出不必要的延迟记录。

源函数必须处理的另一个问题是实例变得空闲并且不再输出数据的实例,通常有出现问题,因为它可能使得程序的水位线不再向前推移,从而导致应用程序陷入停滞状态。由于水位线应该是数据驱动的,所以如果水位线生成器(集成在源函数或时间戳分配程序中)没有接收到输入记录,将不会发出新的水位线。如果你查看一下 Flink 是如何传播和更新水位线的(请查阅“水位线传播和事件时间”),你就会发现,一旦应用程序涉及到一个 shuffle 操作(keyBy()、rebalance()等),那么一个不提前使用水位线的算子就可以使应用程序所有的水位线停滞不前。

Flink 提供了一种机制,通过将源函数标记为临时空闲状态来避免这种情况。当处于空闲状态时,Flink 的水位线传播机制将忽略空闲流分区。一旦数据源再次开始发出记录,它就会被自动设置为活跃状态。源函数可以通过调用 SourceContext.markAsTemporarilyIdle()方法来决定何时将自己标记为空闲状态。

实现自定义接收端函数 

在 Flink 的 DataStream API 中,任何算子或函数都可以将数据发送到外部系统或其他应用程序,数据流最终不必流到接收器算子中。例如,你可以实现一个 FlatMapFunction,通过 HTTP POST 调用而不是通过其收集器来输出每个传入的记录。尽管如此,DataStream API 提供了一个专用的 SinkFunction 接口和一个相应的 RichSinkFunction 抽象类。SinkFunction 接口提供了一个单一的方法:

void invoke(IN value, Context ctx)

SinkFunction 的上下文对象提供了对当前处理时间、当前水位线以及记录的时间戳的访问。

例 8-12 显示了一个简单的 SinkFunction 将传感器读数写入 socket。注意,在启动程序之前,你需要启动一个监听 socket 的进程。否则,由于会出现 socket 连接异常,程序会因 ConnectException 异常而失败。在 Linux 上运行命令 nc -l localhost 9191 来监听 localhost:9191。

val readings: DataStream[SensorReading] = ???

// write the sensor readings to a socket

readings.addSink(new SimpleSocketSink("localhost", 9191))

// set parallelism to 1 because only one thread can write to a socket

.setParallelism(1)

// -----

class SimpleSocketSink(val host: String, val port: Int)

extends RichSinkFunction[SensorReading] {

   var socket: Socket = _

   var writer: PrintStream = _

   override def open(config: Configuration): Unit = {

   // open socket and writer

       socket = new Socket(InetAddress.getByName(host), port)

       writer = new PrintStream(socket.getOutputStream)

  }

   override def invoke(

       value: SensorReading,

       ctx: SinkFunction.Context[_]): Unit = {

       // write sensor reading to socket

writer.println(value.toString)

       writer.flush()

  }

   override def close(): Unit = {

       // close writer and socket

  writer.close()

       socket.close()

  }

}

如上所述,应用程序的端到端精确一致性保证取决于其 sink 连接器的属性。为了实现端到端的精确一次语义,应用程序需要幂等性或事务性接收端连接器。例 8-12 中的 SinkFunction 既不执行幂等写功能,也不提供事务性写支持。由于 socket 仅提供追加的特性,因此无法执行幂等写操作。此外套接字没有内置的事务支持,所以只能使用 Flink 的通用 WAL sink 完成事务写。在接下来的部分中,你将了解如何实现幂等性或事务性接收端连接器。

幂等性接收端连接器 

对于许多应用程序,SinkFunction 接口足以实现幂等接收器连接器。需要满足以下两点:

1. 结果数据具有一个确定(复合)key,可以对该 key 执行幂等更新。对于计算每个传感器和每分钟的平均温度的应用程序,确定 key 可以是传感器的 ID 和每分钟的时间戳。确定 key 对于确保在恢复的情况下正确地覆盖所有写入是很重要的。

2. 外部系统支持每个 key 的更新,比如关系数据库系统或键值存储。

 

示例 8-13 说明了如何实现和使用向 JDBC 数据库写入的幂等 SinkFunction,在本例中是嵌入式 Apache Derby 数据库。

val readings: DataStream[SensorReading] = ???

// write the sensor readings to a Derby table

readings.addSink(new DerbyUpsertSink)

// -----

class DerbyUpsertSink extends RichSinkFunction[SensorReading] {

var conn: Connection = _

var insertStmt: PreparedStatement = _

var updateStmt: PreparedStatement = _

override def open(parameters: Configuration): Unit = {

   // connect to embedded in-memory Derby

   conn = DriverManager.getConnection("jdbc:derby:memory:flinkExample",newProperties())

   // prepare insert and update statements

   insertStmt = conn.prepareStatement("INSERT INTO Temperatures (sensor, temp) VALUES   (?, ?)")

   updateStmt = conn.prepareStatement("UPDATE Temperatures SET temp = ? WHERE sensor = ?")

}

override def invoke(r: SensorReading, context: Context[_]):Unit = {

   // set parameters for update statement and execute it

   updateStmt.setDouble(1, r.temperature)

   updateStmt.setString(2, r.id)

   updateStmt.execute()

   // execute insert statement if update statement did not update any row

   if (updateStmt.getUpdateCount == 0) {

       // set parameters for insert statement

       insertStmt.setString(1, r.id)

       insertStmt.setDouble(2, r.temperature)

       // execute insert statement

       insertStmt.execute()

  }

}

override def close(): Unit = {

   insertStmt.close()

   updateStmt.close()

   conn.close()

}

}

由于 Apache Derby 不提供内置的 UPSERT 语句,因此示例接收器首先尝试更新行,如果没有具有给定键的行,则插入新行。当未启用 WAL 时,Cassandra sink 连接器遵循相同的方法。

事务性接收端连接器 

当幂等接收器连接器不适合时,无论是应用程序输出的特性,或者所需接收器系统的属性,还是更严格的一致性要求,事务性接收器连接器都可以作为另一种选择方案。如前所述,事务性接收端连接器需要与 Flink 的检查点机制集成,因为它们只能在检查点成功完成时将数据提交到外部系统。

为了简化事务性接收器的实现,Flink 的 DataStream API 提供了两个模板,可以扩展它们来实现自定义接收算子。两个模板都实现了 CheckpointListener 接口来接收来自 JobManager 关于完成的检查点的通知(有关接口的详细信息,请查阅“接收关于完成的检查点的通知”):

l GenericWriteAheadSink 模板收集每个检查点的所有输出记录,并将它们存储在 sink 任务的算子状态。在失败的情况下,状态被检查并恢复。当任务收到检查点完成通知时,它将完成的检查点的记录写入外部系统。带有 WAL- enabled 的 Cassandra sink 连接器实现了这个接口。

l TwoPhaseCommitSinkFunction 模板利用了外部接收器系统的事务特性,对于每个检查点,它启动一个新事务,并在当前事务的上下文中将所有记录写入接收器。接收器在接收到相应检查点的完成通知时提交事务。

在下面,我们描述了接口及其一致性保证。

GENERICWRITEAHEADSINK

GenericWriteAheadSink 通过改进一致性属性简化了 sink 算子的实现。该算子与 Flink 的检查点机制集成,目的是将每条记录精确一次地写入外部系统。但是,你应该知道存在这样的失败场景,即预写日志接收器输出的记录不止一次。因此,一个 GenericWriteAheadSink 无法提供精确一次保证,只能提供至少一次的保证。我们将在本节后面更详细地讨论这些场景。

GenericWriteAheadSink 的工作方式是将所有接收到的记录附加到由检查点分割的预写日志中。每次 sink 算子接收到一个检查点 barrier 时,它就会启动一个新分片,并将所有后续记录附加到新分片中。存储 WAL 并作为算子状态进行检查。由于日志将被恢复,因此在失败的情况下不会丢失任何记录。

当 GenericWriteAheadSink 接收到关于完成的检查点的通知时,它会输出存储与已完成检查点的 segment 中的所有记录。根据 sink 算子的具体实现,可以将记录写入任何类型的存储或消息系统,当所有记录都成功输出后,必须在内部提交相应的检查点。

检查点分两个步骤提交。首先,接收器持续存储已提交的检查点信息,然后从 WAL 中删除记录。将提交信息存储在 Flink 的应用程序状态中,因为它不是持久性的,并且在出现故障时将被重置。相反,GenericWriteAheadSink 依赖于一个名为 CheckpointCommitter 的可插入组件来存储和查找关于外部持久存储中提交的检查点的信息。例如,Cassandra sink 连接器默认使用一个向 Cassandra 写入的 CheckpointCommitter。

由于 GenericWriteAheadSink 的内置逻辑,实现一个利用 WAL 的 sink 并不困难。扩展 GenericWriteAheadSink 的算子需要提供三个构造函数参数:

一个 CheckpointCommitter,见前面章节介绍。

 一个 TypeSerializer 用于序列化输入记录。

 传递给 CheckpointCommitter 的作业 ID,以标识跨应用程序重新启动的提交信息

此外,write-ahead 运算符需要实现一个单一的方法:

boolean sendValues(Iterable<IN> values, long chkpntId, long timestamp)

GenericWriteAheadSink 调用 sendValues()方法将完成的检查点的记录写入外部存储系统。该方法接收一个 Iterable(包括检查点的所有记录)、一个检查点的 ID 和一个生成检查点的时间戳。如果所有写操作都成功,则该方法必须返回 true;如果写操作失败,则返回 false。

示例 8-14 展示了一个写到标准输出的 WriteAhead sink 实现。它使用 FileCheckpointCommitter,我们在这里不讨论它。你可以在包含该书示例的代码仓库中查找它的实现。

注意

GenericWriteAheadSink 不实现 SinkFunction 接口。因此,不能使用 DataStream.addSink()添加扩展 GenericWriteAheadSink 的 sink,而是使用 DataStream.transform()方法附加它。

val readings: DataStream[SensorReading] = ???

// write the sensor readings to the standard out via a writeahead log

readings.transform("WriteAheadSink", new SocketWriteAheadSink)

// -----

class StdOutWriteAheadSink extends GenericWriteAheadSink[SensorReading](

// CheckpointCommitter that commits checkpoints to the local filesystem

new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")),

// Serializer for records

createTypeInformation[SensorReading].createSerializer(new ExecutionConfig),

// Random JobID used by the CheckpointCommitter

UUID.randomUUID.toString) {

override def sendValues(

   readings: Iterable[SensorReading],

   checkpointId: Long,

   timestamp: Long): Boolean = {

   for (r <- readings.asScala) {

  // write record to standard out

  println(r)

  }

  true

  }

}

示例代码库包含一个应用程序,该应用程序在发生故障时定期进行故障恢复,以演示 StdOutWriteAheadSink 和一个常规的 DataStream.print() sink 的行为。

如前所述,GenericWriteAheadSink 不能提供精确一次保证。有两种故障情况会导致记录被输出不止一次:

 当任务运行 sendValues()方法时,程序失败。如果外部 sink 系统不能原子地写入多个记录,要么全部写入,要么全部失败,而部分记录可能被写入。由于检查点尚未提交,所以在恢复期间 sink 将再次写入所有记录。

 所有记录都正确写入,sendValues()方法返回 true;但是,在调用 CheckpointCommitter 或 CheckpointCommitter 未能提交检查点之前,程序会失败。在恢复期间,所有尚未提交的检查点记录将被重新写入。

TWOPHASECOMMITSINKFUNCTION

Flink 提供了 TwoPhaseCommitSinkFunction 接口,以简化 sink 函数的实现,这些 sink 函数提供端到端的精确一次保证。但是,2PC sink 函数是否提供这种保证取决于实现细节。我们从一个问题开始讨论这个接口:“2PC 协议是不是代价太大了?”

通常,2PC 是确保分布式系统一致性的方法,代价相对来说比较大。但是,在 Flink 上下文中,协议对于每个检查点只运行一次。此外,TwoPhaseCommitSinkFunction 协议利用了 Flink 的常规检查点机制,因此增加的开销很小。TwoPhaseCommitSinkFunction 的工作原理与 WAL sink 非常相似,但它不会收集 Flink 应用状态下的记录;相反,它将它们以开放事务的形式写入外部接收器系统。

TwoPhaseCommitSinkFunction 实现以下协议。在 sink 任务发出第一个记录之前,它在外部 sink 系统上启动一个事务。所有随后收到的记录都是在事务的上下文中写入的。当 JobManager 启动一个检查点并在应用程序的源中注入 barriers 时,2PC 协议的投票阶段就开始了。当算子接收到 barrier 时,它会保持其状态,并在完成之后向 JobManager 发送确认消息。当 sink 任务接收到 barrier 时,它将持久化该状态,准备提交当前事务,并在 JobManager 上确认检查点。JobManager 的确认消息类似于 2PC 协议的提交投票。sink 任务还不能提交事务,因为不能保证作业的所有任务都将完成其检查点。sink 任务还为在下一个检查点 barrier 之前到达的所有记录启动一个新事务。

当 JobManager 从所有任务实例接收到成功的检查点通知时,它会向所有的任务发送检查点完成的通知,此通知对应于 2PC 协议的提交命令。当接收器任务接收到通知时,它将提交以前检查点的所有打开的事务。sink 任务一旦确认其检查点,就必须能够提交相应的事务,即使在出现故障的情况下也是如此。如果无法提交事务,则接收器将丢失数据。当所有 sink 任务提交它们的事务时,2PC 协议的迭代就算成功了。

我们来总结一下外部 sink 系统的要求:

外部 sink 系统必须提供事务支持,或者 sink 必须能够模拟外部系统上的事务。因此,sink 应该能够向 sink 系统写入数据,但是写入的数据在提交之前不能对外公开。

 在检查点间隔期间,事务必须开启并接受写操作。

 事务必须等到接收到检查点完成通知时,再提交。在恢复周期的情况下,可能需要一些时间。如果 sink 系统关闭事务,未提交的数据将丢失。

 处理一旦失败,sink 必须能够恢复事务。一些 sink 系统提供一个事务 ID 可用于提交或中止一个开启的事务。

 提交一个事务必须是一个幂等操作,sink 或外部系统应该能够做到:一个事务已经提交或重复提交,没有影响。

通过一个具体的例子,可以更容易地理解 sink 系统的协议和需求。例 8-15 显示了一个 TwoPhaseCommitSinkFunction,它只向文件系统写一次(精确一次)。实际上,这是前面讨论的 BucketingFileSink 的简化版本。

class TransactionalFileSink(val targetPath: String, valtempPath: String)

extends TwoPhaseCommitSinkFunction[(String, Double),String, Void](

createTypeInformation[String].createSerializer(new ExecutionConfig),

createTypeInformation[Void].createSerializer(new ExecutionConfig)) {

var transactionWriter: BufferedWriter = _

// Creates a temporary file for a transaction into which the records are written.

override def beginTransaction(): String = {

   // path of transaction file is built from current time and task index

   val timeNow =LocalDateTime.now(ZoneId.of("UTC")).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)

   val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask

   val transactionFile = s"$timeNow-$taskIdx"

   // create transaction file and writer

   val tFilePath = Paths.get(s"$tempPath/$transactionFile")

   Files.createFile(tFilePath)

   this.transactionWriter = Files.newBufferedWriter(tFilePath)

   println(s"Creating Transaction File: $tFilePath")

   // name of transaction file is returned to later identify the transaction

   transactionFile

}

/** Write record into the current transaction file. */

override def invoke(

   transaction: String,

   value: (String, Double),

   context: Context[_]): Unit = {

   transactionWriter.write(value.toString)

   transactionWriter.write('\n')

}

/** Flush and close the current transaction file. */

override def preCommit(transaction: String): Unit = {

   transactionWriter.flush()

   transactionWriter.close()

}

/** Commit a transaction by moving the precommitted transaction file

* to the target directory.

*/

override def commit(transaction: String): Unit = {

   val tFilePath = Paths.get(s"$tempPath/$transaction")

   // check if the file exists to ensure that the commit is idempotent

   if (Files.exists(tFilePath)) {

       val cFilePath = Paths.get(s"$targetPath/$transaction")

       Files.move(tFilePath, cFilePath)

  }

}

/** Aborts a transaction by deleting the transaction file. */

override def abort(transaction: String): Unit = {

val tFilePath = Paths.get(s"$tempPath/$transaction")

   if (Files.exists(tFilePath)) {

  Files.delete(tFilePath)

  }

}

}

TwoPhaseCommitSinkFunction[IN, TXN, CONTEXT]有三个类型参数:

 IN 指定输入记录的类型。在例 8-15 中,这是一个带有 String 和 Double 的 Tuple2。

 TXN 定义了一个事务标识符,可用于在失败后识别和恢复事务。在例 8-15 中,这是一个包含事务文件名称的字符串。

 CONTEXT 定义了一个可选的自定义上下文。例 8-15 中的 TransactionalFileSink 不需要上下文,因此将类型设置为 Void。

TwoPhaseCommitSinkFunction 的构造函数需要两个 TypeSerializer,一个用于 TXN 类型,另一个用于 CONTEXT 类型。

最后,TwoPhaseCommitSinkFunction 定义了五个需要实现的功能:

 beginTransaction(): TXN 启动一个新的事务并返回事务标识符。例 8-15 中的 TransactionalFileSink 创建一个新的事务文件,并将其名称作为标识符返回。

 invoke(txn: TXN, value: IN, context: Context[_]): Unit  将一个值写入当前事务。示例 8-15 中的 sink 将该值作为字符串追加到事务文件。

 preCommit(txn: TXN): Unit 预提交一个事务。预提交事务可能不会收到进一步的写操作。例 8-15 中的实现刷新并关闭事务文件。

 commit(txn: TXN): Unit 提交一个事务。此操作必须是幂等的,如果此方法被调用两次,不能将记录写入输出系统两次。在例 8-15 中,我们检查事务文件是否仍然存在,并将其移动到目标目录(如果存在的话)。

 abort(txn: TXN): Unit 中止一个事务。对于一个事务,此方法也可能被调用两次。例 8-15 中的 TransactionalFileSink 检查事务文件是否仍然存在,如果仍然存在,则删除它。

正如你所看到的,该接口的实现并不太复杂。然而,实现的复杂性和一致性保证取决于 sink 系统的特性和功能。例如,Flink 的 Kafka 构造器实现了 TwoPhaseCommitSinkFunction 接口。如前所述,如果由于超时而回滚事务,连接器可能会丢失数据。因此,即使它实现了 TwoPhaseCommitSinkFunction 接口,也不能提供精确一次保证。

异步访问外部系统 

除了提取或发送数据流之外,另一个需要与外部存储系统交互的常见用例是通过在远程数据库中查找信息来丰富数据流。最为众所周知的例子就是雅虎流处理基准测试,它基于一系列广告点击,产生丰富的信息,存储在键值存储中。

对于这些用例,最直接的方法是实现一个 MapFunction,它查询每条处理记录的数据存储,等待查询返回结果,丰富记录,并输出结果。虽然这种方法很容易实现,但它存在一个缺陷:对外部数据存储的请求都会增加明显的延迟(请求/响应涉及两条网络消息),而 MapFunction 会花费大部分时间等待查询结果。

Apache Flink 提供 AsyncFunction 来减少远程 I/O 调用的延迟。AsyncFunction 同时发送多个查询并异步处理它们的结果。可以将其配置为保留记录的顺序(请求返回的顺序可能与发送它们的顺序不同),或者按照查询结果的顺序返回结果,以进一步减少延迟。该函数还与 Flink 的检查点机制适当地集成在一起,检查当前等待响应的输入记录,并在恢复的情况下重复查询。此外,AsyncFunction 可以基于事件时间进行正确处理,即使结果是无序的,水位线也不会被记录覆盖。

为了利用 AsyncFunction,外部系统应该提供一个支持异步调用的客户端,大多数系统都是如此。如果系统只提供同步客户端,则可以创建线程来发送请求并处理它们。AsyncFunction 的接口如下图所示:

trait AsyncFunction[IN, OUT] extends Function {

   def asyncInvoke(input: IN, resultFuture:ResultFuture[OUT]): Unit

}

函数的类型参数定义其输入和输出类型。为每个使用两个参数的输入记录调用 asyncInvoke()方法。第一个参数是输入记录,第二个参数是回调对象,返回函数或异常结果。在示例 8-16 中,我们展示了如何在 DataStream 上应用 AsyncFunction。

val readings: DataStream[SensorReading] = ???

val sensorLocations: DataStream[(String, String)] =AsyncDataStream.orderedWait(

   readings,

   new DerbyAsyncFunction,

   5,

   TimeUnit.SECONDS, // timeout requests after 5 seconds

   100) // at most 100 concurrent requests

应用 AsyncFunction 的异步算子通过 AsyncDataStream 对象配置,该对象提供了两个静态方法:orderedWait() 和 unorderedWait()。这两个方法是重载方法的,使用不同的参数组合。orderedWait()应用一个异步算子,它按照输入记录的顺序发出结果,而 unorderWait()算子只确保水位线和检查点 barrier 保持对齐。其他参数指定记录的异步调用何时超时,以及启动多少并发请求。示例 8-17 显示了 DerbyAsyncFunction,它通过 JDBC 接口查询嵌入式 Derby 数据库。

class DerbyAsyncFunction extends AsyncFunction[SensorReading, (String, String)] {

// caching execution context used to handle the query threads

private lazy val cachingPoolExecCtx =ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

// direct execution context to forward result future to callback object

private lazy val directExecCtx =

ExecutionContext.fromExecutor(org.apache.flink.runtime.concurrent.Executors.directExecutor())

/**

* Executes JDBC query in a thread and handles the resulting Future

* with an asynchronous callback.

*/

override def asyncInvoke(

   reading: SensorReading,

   resultFuture: ResultFuture[(String, String)]): Unit = {

val sensor = reading.id

// get room from Derby table as Future

val room: Future[String] = Future {

// Creating a new connection and statement for each record.

// Note: This is NOT best practice!

// Connections and prepared statements should be cached.

val conn = DriverManager.getConnection("jdbc:derby:memory:flinkExample",newProperties())

val query = conn.createStatement()

// submit query and wait for result; this is a synchronous call

val result = query.executeQuery(s"SELECT room FROM SensorLocations WHERE sensor ='$sensor'")

// get room if there is one

val room = if (result.next()) {

result.getString(1)

} else {

"UNKNOWN ROOM"

}

// close resultset, statement, and connection

result.close()

query.close()

conn.close()

// return room

room

}(cachingPoolExecCtx)

// apply result handling callback on the room future

room.onComplete {

   case Success(r) => resultFuture.complete(Seq((sensor,r)))

   case Failure(e) => resultFuture.completeExceptionally(e)

}(directExecCtx)

}

}

示例 8-17 中的 DerbyAsyncFunction 的 asyncInvoke()方法将阻塞的 JDBC 查询在 Future 中,该查询是通过 CachedThreadPool 执行的。为了示例的简洁,我们为每个记录创建一个新的 JDBC 连接,当然这是非常低效的,实际应用中应该避免。Future[String]保存 JDBC 查询的结果。

最后,我们对 Future 应用 onComplete()回调方法,并将结果(或可能的异常)传递给 ResultFuture 处理程序。与 JDBC 查询 Future 相反,onComplete()回调方法由 DirectExecutor 处理,因为将结果传递给 ResultFuture 是一个轻量级操作,不需要专门的线程。注意,所有操作都是以非阻塞方式完成的。

需要指出的是,AsyncFunction 实例顺序调用其每个输入记录的,函数实例不是以多线程方式调用的。因此,asyncInvoke()方法应该通过启动异步请求并将结果转发给 ResultFuture 的回调方法来处理结果并快速返回,必须避免的常见反模式包括:

l 发送一个阻塞 asyncInvoke()方法的请求。

l 发送异步请求,但在 asyncInvoke()方法中等待请求完成。

 

总结

在本章中,你了解了 Flink DataStream 应用程序如何与外部系统进行读写交互,包括数据的读取和写入,以及应用程序实现不同端到端一致性保证的条件。不仅如此,我们介绍了 Flink 最常用的内置源和 sink 连接器,同时也代表着不同类型的存储系统,如消息队列、文件系统和键值存储。

随后,我们向你展示了如何实现自定义源和 sink 连接器,包括 WAL 和 2PC 接收端连接器,并提供了详细的例子。最后,你了解了 Flink 的 AsyncFunction,它可以通过异步执行和处理请求来显著提高与外部系统交互的性能。

1、精确一次状态一致性是端到端精确一次一致性的要求,但并不相同。

2、我们在“通用写提前槽”中详细讨论了 WAL 槽的一致性保证。

3、有关时间戳分配人接口的详细信息,请参阅第 6 章。

4、输入格式是该链接用于在数据集 API 中定义数据源的接口。

5、与 SQL 插入语句相比,CQL 插入语句的行为类似于升级查询——它们使用相同的主键覆盖现有的行。

6、在第 5 章中讨论了富函数。

7、通常使用富接收函数接口,因为接收函数通常需要在富函数中设置与外部系统的连接。open()方法。有关富功能界面的详情,请参见第 5 章。

8、如果确认消息丢失,则任务可能需要提交多个事务。

9、详见“Apache Kafka 接收端连接器”。

10、API 提供了一个具有各自静态方法的异步数据流类。

 

发布于: 2 小时前阅读数: 7
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
与外部系统的读写交互(八)