Flink State 和 Fault Tolerance(二)
Checkpoint
Flink 中的 State 在上一篇中介绍过,Flink 中的每个方法或算子都能够是有状态的,为了使 State 容错,需要为 State 创建 checkpoint(状态检查点)。Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义
Checkpoint 机制在 Flink 容错机制/容错文档 中有更详细介绍。
Checkpoint 机制的先决条件:
一个持久化的,能够在一定时间范围内重放记录的数据源。例如,持久化消息队列(Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub)或文件系统(HDFS,S3,GFS,NFS,Ceph)
存放 State 的持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...
启用和配置
默认情况下 Checkpoint 是不启用的。通过 StreamExecutionEnvironment
对象调用 enableCheckpointing(n)
启用 Checkpoint,其中 n
是以毫秒为单位的 Checkpoint 间隔。
Checkpoint 的配置项包括:
Exactly-once 或 At-least-once:Checkpoint 支持两种模式,可以选择向
enableCheckpointing(long interval, CheckpointingMode mode)
方法中传入使用模式。对于大多数应用来说,Exactly-once 是优选的。At-least-once 可能在某些要求超低延迟(几毫秒)的应用程序使用。Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则丢弃正在进行的 checkpoint。
Checkpoint 最小间隔时间(毫秒):定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果设置为 5000,表示在上一个 checkpoint 完成后的至少 5 秒后才会启动下一个 checkpoint,不论 checkpoint 的持续时间和间隔是多少。即 checkpoint 间隔永远不会小于此参数。是为了保证 checkpoint 之间能够完成一定量的数据处理工作。
配置“checkpoints 之间的最小时间”相比配置“checkpoint 间隔(启用 checkpoint 时传入的
n
)”通常更容易。因为 checkpoint 耗时有时会明显比平时更长,“checkpoints 之间的最小时间”更不容易受到影响(例如,目标存储系统临时性的响应缓慢)这个值还意味着并发 checkpoint 的数量是一个
Checkpoint 并发数:默认情况下,当一个 checkpoint 处于运行状态时,系统不会触发另一个 checkpoint。确保整个拓扑结构不会花费太多时间用于 checkpoint。该设置可以设置多个重叠的 checkpoint,特点的场景可能会需要。
当设置“checkpoints 间的最小时间”时,不能使用此配置。
Externalized checkpoint:可以配置周期存储 checkpoint 到外部系统中。Checkpoint 信息写入外部持久存储,在作业失败时不会自动清除,因此作业失败时可以用来恢复。
更多的细节请看 Externalized checkpoints 的部署文档。
Checkpoint 出错时,使 Task 失败或者继续进行 Task:决定了如果在 checkpoint 过程中发生错误,当前 Task 是否将失败或继续执行。默认会任务失败。或者禁用它时,这个任务将会简单的把 checkpoint 错误信息报告给 checkpoint coordinator 并继续运行。
优先从 checkpoint 恢复:该属性确定 job 是否在最新的 checkpoint 回退,即使有更近的 savepoint 可用,这可以潜在地减少恢复时间(checkpoint 恢复比 savepoint 恢复更快)。
选择 State backend
Flink 的 checkpointing 机制会将 timer 以及 stateful 的 operator 进行快照,然后存储下来,包括连接器(connectors),窗口(windows)以及任何用户自定义的状态。Checkpoint 存储在哪里取决于所配置的 State Backend(比如 JobManager 内存、文件系统、数据库)。
默认情况下,状态存储在 TaskManager 内存中,checkpoint 保存在 JobManager 的内存中。为了合适地持久化大体量状态, Flink 支持各种各样的途径去存储 checkpoint 状态到其他的 state backends 上。通过 StreamExecutionEnvironment.setStateBackend(…)
来配置所选的 state backends。后文有更详细的介绍。
迭代任务中使用
Flink 目前仅为没有迭代的作业提供处理保证。在迭代作业上启用 checkpoint 会导致异常。为了强制对迭代程序执行 checkpoing,需要设置一个特殊标志:env.enableCheckpointing(interval, force = true)
。
在失败期间,处在循环边界的记录(以及与相关的 State 变化)将丢失。
State backend(1.13 以前的版本)
Flink 提供了多种 state backends,支持不通的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml
指定的选项,也可以在每个作业中设置来覆盖默认选项:
Available State Backends
Flink 自带了以下几种开箱即用的 state backend:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
在没有配置的情况下,系统默认使用 MemoryStateBackend
MemoryStateBackend
使用 MemoryStateBackend,数据以 Java 对象的形式存储在堆中, 在 checkpoint 时,对 State 做一次快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager,同时 JobManager 也将快照信息存储在堆内存中。。
MemoryStateBackend 的限制:
单个 State 的大小默认限制为 5MB,可以在 MemoryStateBackend 的构造函数中增加。
不论如何配置,State 大小都无法大于
akka.framesize
(JobManager 和 TaskManager 之间发送的最大消息的大小)JobManager 必须有足够的内存大小,存放聚合后的状态
MemoryStateBackend 适用场景:
本地开发和调试
只持有很小的 State,由每次只处理一条记录的函数(Map、FlatMap、Filter...),Kafka Consumer 仅仅需要非常小的状态;
建议同时将 managed memory 设为 0,以保证将最大限度的内存分配给 JVM 上的用户代码。
FsStateBackend
FsStateBackend 需要配置一个文件系统的 URL,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。
FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中(高可用模式下,将其写入到 Checkpoint 的元数据文件中)。
FsStateBackend 适用场景:
状态比较大、窗口时间比较长、key/value 比较大的 Job。
所有高可用的场景。
建议同时将 managed memory 设为 0,以保证将最大限度的内存分配给 JVM 上的用户代码。
RocksDBStateBackend
RocksDBStateBackend 需要配置一个文件系统的 URL 来,如 "hdfs://namenode/flink/checkpoint" 或 "file:///data/flink/checkpoints"。
RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 默认将数据存储在 TaskManager 的数据目录下。CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 Checkpoint 的元数据文件中)。
RocksDBStateBackend 的限制:
由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上,所以每个 key 和 value 最大支持 2^31 字节。RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。
RocksDB 自身在支持较大 value 时候有一些问题。
RocksDBStateBackend 与 FsStateBackend 同样适用以下场景:
状态比较大、窗口时间比较长、key/value 比较大的 Job。
所有高可用的场景。
目前唯一支持增量 checkpoint。
与前两者相比(处理状态下的 State 还是保存在内存中),使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制, RocksDBStateBackend 允许存储非常大的状态。这也意味着使用 RocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的方式的效率要低很多。
请同时参考 Task Executor 内存配置 中关于 RocksDBStateBackend 的建议。
可以使用一些 RocksDB 的本地指标(metrics),默认是关闭的。能在这里找到关于 RocksDB 本地指标的文档。
Note. 每个 Slot 的 RocksDB 的总内存量也是有界限的。
性能比较
Flink 支持 Standalone 和 on Yarn 的集群部署模式,以 Windowed Word Count 处理为例测试三种 State backends 在不通集群部署上的性能差异(来源:美团 Flink _Benchmark)
Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。
不同 State backend 吞吐量对比
说明:
使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
Standalone 和 OnYarn 的总体差异不大,使用 FileSystem 和 Memory 时 OnYarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。
不同 State backend 延迟对比
说明:
使用 FileSystem 和 Memory 时延迟基本一致且较低。
使用 RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。
设置 State Backend
如果没有明确指定,将使用 jobmanager 做为默认的 State backend。如果希望为集群上的所有作业建立不同的默认 State Backend,可以在 flink-conf.yaml
设置。 每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置。
设置每个 Job 的 State Backend
StreamExecutionEnvironment
可以对每个 Job 的 State Backend 进行设置,如下所示:
如果使用 RocksDBStateBackend
,或者需要在作业中通过编程方式动态配置 RocksDBStateBackend
,必须添加以下依赖到 Flink 项目中。
设置默认的(全局的) State Backend
在 flink-conf.yaml
可以通过 state.backend
设置默认的 State Backend。
可选值包括 jobmanager(MemoryStateBackend)、filesystem(FsStateBackend)、rocksdb(RocksDBStateBackend),或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: RocksDBStateBackend 对应为 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
。
state.checkpoints.dir
选项指定了所有 State Backend 写 Checkpoint 数据和写元数据文件的目录。在这里找到关于 Checkpoint 目录结构的详细信息。
配置文件的部分示例如下所示:
RocksDB State Backend 更多细节
增量快照
RocksDBStateBackend 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
一个增量快照是基于(通常多个)前序快照构建的。由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。
和基于全量快照的恢复时间相比,如果网络带宽是瓶颈,那么基于增量快照恢复可能会消耗更多时间,因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大;而当 CPU 或者 IO 是瓶颈的时候,基于增量快照恢复会更快,因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表,而是可以直接基于 sst 文件加载。
虽然状态数据量很大时推荐使用增量快照,但这并不是默认的快照机制,需要通过下述配置手动开启该功能:
在
flink-conf.yaml
中设置:state.backend.incremental: true
或者在代码中配置(来覆盖默认配置):
RocksDBStateBackend backend = new RocksDBStateBackend(filebackend, true);
需要注意的是,一旦启用了增量快照,网页上展示的 Checkpointed Data Size
只代表增量上传的数据量,而不是一次快照的完整数据量。
内存管理
Flink 致力于控制整个进程的内存消耗,以确保 Flink 任务管理器(TaskManager)有良好的内存使用,从而既不会在容器(Docker/Kubernetes,Yarn 等)环境中由于内存超用被杀掉,也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降,致使性能下降。
为了达到上述目标,Flink 默认将 RocksDB 的可用内存配置为任务管理器的单 Slot(per-slot)托管内存量。这将为大多数应用程序提供良好的开箱即用体验,即大多数应用程序不需要调整 RocksDB 配置,简单的增加 Flink 的托管内存即可改善内存相关性能问题。
当然,也可以选择不使用 Flink 自带的内存管理,而是手动为 RocksDB 的每个列族(ColumnFamily)分配内存(每个算子的每个 state 都对应一个列族)。这为专业用户提供了对 RocksDB 进行更细粒度控制的途径,同时也意味着用户需要自行保证总内存消耗不会超过(尤其是容器)环境的限制。请参阅 large state tuning 了解有关大状态数据性能调优的一些指导原则。
RocksDB 使用托管内存
这个功能默认打开,并且可以通过 state.backend.rocksdb.memory.managed
配置项控制。
Flink 并不直接控制 RocksDB 的本地内存分配,而是通过配置 RocksDB 来确保其使用的内存正好与 Flink 的托管内存预算相同。这是在 Task Slot(per-slot)级别上完成的(托管内存以 Task Slot 为粒度计算)。
为了设置 RocksDB 实例的总内存使用量,Flink 对同一个任务槽上的所有 RocksDB 实例使用共享的 cache 以及 write buffer manager。 共享 cache 将对 RocksDB 中内存消耗的三个主要来源(块缓存、索引和 bloom 过滤器、MemTables)设置上限。
Flink 还提供了两个参数来控制写路径(MemTable)和读路径(索引及过滤器,读缓存)之间的内存分配。当发现 RocksDB 由于缺少写缓冲内存(频繁刷新)或读缓存未命中而性能不佳时,可以使用这些参数调整读写间的内存分配。
state.backend.rocksdb.memory.write-buffer-ratio
,默认值0.5
,即 50% 的给定内存会分配给写缓冲区使用。state.backend.rocksdb.memory.high-prio-pool-ratio
,默认值0.1
,即 10% 的 block cache 内存会优先分配给索引及过滤器。强烈建议不要将此值设置为零,以防止索引和过滤器被频繁踢出缓存而导致性能问题。此外,默认将 L0 级的过滤器和索引将被固定到缓存中以提高性能,更多详细信息请参阅 RocksDB 文档。
注意. 上述机制开启时将覆盖用户在 PredefinedOptions 和 RocksDBOptionsFactory 中对 block cache 和 write buffer 进行的配置。
注意. 仅面向专业用户:若要手动控制内存,可以将 state.backend.rocksdb.memory.managed
设置为 false
,并通过 ColumnFamilyOptions 配置 RocksDB。 或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过 state.backend.rocksdb.memory.fixed-per-slot
选项)。 注意在这两种情况下,用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。
计时器(内存 vs. RocksDB)
计时器(Timer,Flink Streaming API 提供的用于感知并利用处理时间/事件时间变化的机制)用于安排稍后的操作(基于事件时间或处理时间),例如触发窗口或回调 ProcessFunction
。
当选择 RocksDBStateBackend 时,默认情况下计时器也存储在 RocksDB 中。这是一种健壮且可扩展的方式,允许应用程序使用很多个计时器。另一方面,在 RocksDB 中维护计时器会有一定的成本,因此 Flink 也提供了将计时器存储在 JVM 堆上而使用 RocksDB 存储其他状态的选项。当计时器数量较少时,基于堆的计时器可以有更好的性能。
可以通过将 state.backend.rocksdb.timer-service.factory
配置项设置为 heap
(而不是默认的 rocksdb
)来将计时器存储在堆上。
注意. 在 RocksDBStateBackend 中使用基于堆的计时器的组合当前不支持计时器状态的异步快照。其他状态(如 keyed state)可以被异步快照。
开启 RocksDB 原生监控指标
可以选择使用 Flink 的监控指标系统来汇报 RocksDB 的原生指标,并且可以选择性的指定特定指标进行汇报。
注意. 启用 RocksDB 的原生指标可能会对应用程序的性能产生负面影响。
列族级别的预定义选项
注意. 在引入 RocksDB 使用托管内存 功能后,此机制应限于在专家调优或故障处理中使用。
使用预定义选项,用户可以在每个 RocksDB 列族上应用一些预定义的配置,例如配置内存使用、线程、Compaction 设置等。目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
有两种方法可以选择要应用的预定义选项:
通过
state.backend.rocksdb.predefined-options
配置项将选项名称设置进flink-conf.yaml
。通过程序设置:
RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
。
该选项的默认值是 DEFAULT
,对应 PredefinedOptions.DEFAULT
。
通过 RocksDBOptionsFactory 配置 RocksDB 选项
注意. 在引入 RocksDB 使用托管内存 功能后,此机制应限于在专家调优或故障处理中使用。
可以通过配置一个 RocksDBOptionsFactory
来手动控制 RocksDB 的选项。此机制可以对列族的设置进行细粒度控制,例如内存使用、线程、Compaction 设置等。目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
有两种方法可以将 RocksDBOptionsFactory
传递给 RocksDBStateBackend:
通过
state.backend.rocksdb.options-factory
选项将工厂实现类的名称设置到flink-conf.yaml
。通过程序设置,例如
RocksDBStateBackend.setRocksDBOptions(new MyOptionsFactory());
。
通过程序设置的 RocksDBOptionsFactory
将覆盖 flink-conf.yaml
配置文件的设置,且 RocksDBOptionsFactory
设置的优先级高于预定义选项(PredefinedOptions
)。
注意. RocksDB 是一个本地库,直接从进程分配内存, 而不是从 JVM 分配内存。分配给 RocksDB 的任何内存都必须被考虑在内,通常需要将这部分内存从任务管理器(TaskManager
)的 JVM 堆中减去。不这样做可能会导致 JVM 进程由于分配的内存超过申请值而被 YARN/Mesos 等资源管理框架终止。
State backend(1.13 版本)
Available State Backends
Flink 提供的开箱即用的 State Backends:
HashMapStateBackend
EmbeddedRocksDBStateBackend
在没有配置的情况下,系统默认使用 HashMapStateBackend
HashMapStateBackend
HashMapStateBackend 在将数据作为对象保存在 Java 堆上。Key/value 状态和窗口操作符持有 hash table 存储值、触发器等。
HashMapStateBackend 适用场景:
状态比较大、窗口时间比较长、key/value 比较大的 Job。
所有高可用的场景。
建议将 managed memory 设置为 0。这将确保为 JVM 上的用户代码分配最大的内存量。
EmbeddedRocksDBStateBackend
EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,默认将数据存储在 TaskManager 的数据目录下。
EmbeddedRocksDBStateBackend 通常是异步的。
EmbeddedRocksDBStateBackend 的介绍参考上文 RocksDBStateBackend。
State Backend 的选择
在决定 HashMapStateBackend 和 RocksDB 时,需要在性能和可伸缩性之间进行选择。
HashMapStateBackend 非常快,因为状态访问和更新都对 Java 堆上的对象进行操作;但是,状态大小受集群内可用内存的限制。另一方面,RocksDB 可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的。但是,状态访问和更新都需要序列化并可能从磁盘读取数据,这会导致平均性能比内存状态后端慢一个数量级。
Flink 1.13 开始,统一了 savepoint 的二进制格式,这意味着可以获取一个 savepoint,然后使用不同的 state backend 进行恢复。如果想切换 state backend,应该首先升级的 Flink 版本,然后创建一个 savepoint,之后,可以用不同的 state backend 还原它。
设置 State Backend
与之前版本设置方式类似,
Job 中设置
全局设置
state.backend 的可选值发生变化:hashmap(HashMapStateBackend),rocksdb(EmbeddedRocksDBStateBackend),或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名
Migrating from Legacy Backends
从 Flink 1.13 开始,社区修改了 state backend 公共类,以帮助用户更好地理解 local state 存储和 checkpoint 存储的分离。此更改不会影响 Flink 的 state backend 或 checkpointing 的运行时实现或特性。用户可以迁移现有应用程序以使用新的 API,而不会丢失任何状态或一致性。
MemoryStateBackend
MemoryStateBackend 等同于使用 HashMapStateBackend 和 JobManagerCheckpointStorage
flink-conf.yaml
配置
代码设置
FsStateBackend
FsStateBackend 等同于使用 HashMapStateBackend 和 FileSystemCheckpointStorage
flink-conf.yaml
配置
代码设置
RocksDBStateBackend
RocksDBStateBackend 等同于使用 EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage.
flink-conf.yaml
配置
代码设置
主要引用
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/
https://flink.xskoo.com/dev/stream/state/checkpointing.html
版权声明: 本文为 InfoQ 作者【Alex🐒】的原创文章。
原文链接:【http://xie.infoq.cn/article/0f95865c6e9a0ba5a5da3e18b】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论