大数据 -126 - Flink 一文搞懂有状态计算:State Backend 工作原理与性能差异详解 核心原理与作用
点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 10 月 13 日更新到:Java-147 深入浅出 MongoDB 分页查询详解:skip() + limit() + sort() 实现高效分页、性能优化与 WriteConcern 写入机制全解析 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解
章节内容
上节我们完成了如下的内容:
Flink 广播状态
基本概念、代码案例、测试结果
状态存储
Flink 的一个重要特性就是有状态计算(stateful processing),Flink 提供了简单易用的 API 来存储和获取状态,但是我们还是要理解 API 背后的原理,才能更好的使用。
State 存储方式
Flink 为 State 提供了三种开箱即用的后端存储方式(state backend):
Memory State Backend
File System (FS)State Backend
RocksDB State Backend
MemoryStateBackend
MemoryStateBackend 将工作状态数据保存在 TaskManager 的 Java 内存中。Key/Value 状态和 Window 算子使用哈希表存储数值和触发器。进行快照时(CheckPointing),生成的快照数据将和 CheckPoint ACK 消息一起发送给 JobManager,JobManager 将收到的所有快照数据保存在 Java 内存中。MemoryStateBackend 现在被默认配置成异步的,这样避免阻塞主线程的 pipline 处理,MemoryStateBackend 的状态存取的数据都非常快,但是不适合生产环境中使用。这是以为它有以下限制:
每个 state 的默认大小被限制为 5MB(这个值可以通过 MemoryStateBackend 构造方法设置)
每个 Task 的所有 State 数据(一个 Task 可能包含一个 Pipline 中的多个的 Operator)大小不能超过 RPC 系统的帧大小(akk.framesize 默认 10MB)
JobManager 收到的 State 数据总和不能超过 JobManager 内存
MemoryStateBackend 适合的场景:
本地开发和调试
状态很小的作业
FsStateBackend
FsStateBackend 需要配置一个 CheckPoint 路径,例如:hdfs://xxxxxxx 或者 file:///xxxxx,我们一般都会配置 HDFS 的目录。FsStateBackend 将工作状态数据保存在 TaskManager 的 Java 内存中,进行快照时,再将快照数据写入上面的配置的路径,然后将写入的文件路径告知 JobManager。JobManager 中保存所有状态的元数据信息(在 HA 的模式下,元数据会写入 CheckPoint 目录)。FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的 Pipline 处理,可以通过 FsStateBackend 构造函数取消该模式:
FsStateBackend 适合的场景:
大状态、长窗口、大键值(键或者值很大)状态的作业
适合高可用方案
RocksDBStateBackend
RocksDBStateBackend 也需要配置一个 CheckPoint 路径,例如:hdfs://xxx 或者 file:///xxx,一般是 HDFS 路径。RocksDB 是一种可嵌入的可持久型的 key-value 存储引擎,提供 ACID 支持。由 Facebook 基于 LevelDB 开发,使用 LSM 存储引擎,是内存和磁盘的混合存储。RocksDBStateBackend 将工作状态保存在 TaskManager 的 RocksDB 数据库中,CheckPoint 时,RocksDB 中的所有数据会被传输到配置的文件目录,少量元数据信息保存在 JobManager 内存中(HA 模式下,会保存在 CheckPoint 目录)。RocksDBStateBackend 使用异步方式进行快照,RocksDBStateBackend 的限制:
由于 RocksDB 的 JNI Bridge API 是基于 byte[] 的,RocksDBStateBackend 支持的每个 Key 或者每个 Value 的最大值不超过 2 的 31 次方(2GB)
要注意的是,有 merge 操作的状态(例如:ListState),可能会在运行过程中超过 2 的 31 次时,导致程序失败。RocksDBStateBackend 适用于以下的场景:
超大状态、超长窗口(天)、大键值状态的作业
适合高可用模式
使用 RocksDBStateBackend 时,能够限制状态大小是 TaskManager 磁盘空间(相对于 FsStateBackend 状态大小限制与 TaskManager 内存)。这也导致 RocksDBStateBackend 的吞吐比其他两个要低一些,因为 RocksDB 的状态数据的读写都要经过反序列化/序列化。
RocksDBStateBackend 时目前三者中唯一支持增量 CheckPoint 的。
三者吞吐量对比
KeyedState 和 Operator State
State 分类
Operator State
(或 non-keyed state):每个 Operator State 绑定一个并行的 Operator 实例,KafkaConnector 是使用 OperatorState 的典型示例:每个并行的 Kafka Consumer 实例维护了每个 Kafka Topic 分区和该分区 Offset 的映射关系,并将这个映射关系保存为 OperatorState。在算子并行度改变时,OperatorState 也会重新分配。
Keyed State
这种 State 只存在于 KeyedStream 上的函数和操作中,比如 Keyed UDF(KeyedProcessFunction)Window State。可以把 Keyed State 想象成被分区的 OperatorState。每个 KeyedState 在逻辑上可以看成与一个 <parallel-operator-instance, key> 绑定,由于一个 key 肯定只存在于一个 Operator 实例,所以我们可以简单的的认为一个 <operator, key>对应一个 KeyedState。
每个 KeyedState 在逻辑上还会被分配一个 KeyGroup,分配方法如下:
其中 maxParallelism 是 Flink 程序的最大并行度,这个值一般我们不会去手动设置,使用默认的值(128)就好,这里注意下,maxParallelism 和我们运行程序时指定的算子并行度(parallelism)不同,parallelism 不能大于 maxParallelism,最多两者相等。
为什么会有 Key Group 这个概念呢?我们通常写程序,会给算子指定一个并行度,运行一段时间后,积累了一些 State,这时候数据量大了,需要增加并行度。我们修改并行度后重新提交,那这些已经存在的 State 该如何分配到各个 Operator 呢?这就有了最大并行度和 KeyGroup 的概念。上面的计算公式也说明了 KeyGroup 的个数最多是 maxParallelism 个。当并行度改变之后,我们在计算这个 Key 被分配到的 Operator:
可以看到,一个 KeyGroupId 会对应一个 Operator,当并行度更改时,新的 Operator 会去拉取对应的 KeyGroup 的 KeyedState,这样就把 KeyedState 尽量均匀的分配给所有的 Operator 了。根据 State 数据是否被 Flink 托管,Flink 又将 State 分类为:
Managed State:被 Flink 托管,保存为内部的哈希表或者 RocksDB,CheckPoint 时,Flink 将 State 进行序列化编码。例如:ValueState ListState
Row State:Operator 自行管理的数据结构, Checkpoint 时,它们只能以 byte 数据写入 CheckPoint。
建议使用 Managed State,当使用 Managed State 时,Flink 会帮助我们更改并行度时重新分配 State,优化内存。
使用 ManageKeyedState
如何创建?上面提到,KeyedState 只能在 KeyedStream 上使用,可以通过 Stream.keyBy 创建 KeyedStream,我们可以创建以下几种:
ValueState
ListState
ReducingState
AggregatingState<IN,OUT>
MapState<UK,UV>
FoldingState<T,ACC>
每种 State 都对应各种的描述符,通过描述符 RuntimeContext 中获取对应的 State,而 RuntimeContext 只有 RichFunction 才能获取,所以想要使用 KeyedState,用户编写的类必须继承 RichFunction 或者其他子类。
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregationState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT>)
FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC>)
MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV>)
给 KeyedState 设置过期时间在 Flink 1.6.0 以后,还可以给 KeyedState 设置 TTL(Time-To-Live),当某一个 Key 的 State 数据过期时,会被 StateBackend 尽力删除。官方给出了示例:
State 的 TTL 何时被更新?可以进行以下配置,默认只是 key 的 state 被 modify(创建或者更新)的时候才更新 TTL:
StateTtlConfig.UpdateType.OnCreateAndWrite:只在一个 key 的 state 创建和写入时更新 TTL(默认)
StateTtlConfig.UpdateType.onReadAndWrite:读取 state 时仍然更新 TTL
当 State 过期但是还未删除时,这个状态是否还可见?可以进行以下配置,默认是不可见的:
StateTtlConfig.StateVisibility.NerverReturnExpired:不可见(默认)
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp:可见
注意:
状态存储开销
启用 TTL 特性会增加状态存储的大小,具体影响取决于 State Backend 类型:
Heap State Backend:会额外存储一个 Java 8 对象,包含用户状态及其时间戳。
RocksDB State Backend:在每个状态值(如 List 或 Map 的每个元素)序列化后增加 8 个字节的时间戳开销。
示例:如果使用 MapState 存储 10 万个键值对,RocksDB State Backend 会额外增加约 800KB 的存储空间(8 字节 × 100,000)。
时间语义限制
目前仅支持基于 Processing Time 的 TTL,暂不支持 Event Time 或 Ingestion Time。
应用场景:适用于需要定期清理过期数据的场景,如会话超时、缓存失效等。
Checkpoint/Savepoint 恢复要求
从 Checkpoint/Savepoint 恢复时,TTL 的开启状态(是否启用)必须与保存时完全一致,否则会抛出
StateMigrationException。建议:在作业升级或迁移时,需确保 TTL 配置的一致性。
TTL 配置的临时性
TTL 配置(如过期时间)不会持久化到 Checkpoint/Savepoint 中,仅对当前作业生效。
示例:如果作业重启且未显式设置 TTL,即使从包含 TTL 状态的 Checkpoint 恢复,也不会自动启用 TTL。
MapState 对 NULL 值的支持
开启 TTL 的 MapState 仅在用户自定义序列化器支持 NULL 值时,才允许存储 NULL。
解决方案:如果序列化器不支持 NULL 值,可通过
NullableSerializer包装一层,例如:
注意事项:使用
NullableSerializer会额外增加 1 字节的序列化开销。
使用 ManageOperatorState
(这里以及后续放到下一篇:大数据-127 Flink)
版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/2d6b73c266ca0769e2216107f】。文章转载请联系作者。







评论