写点什么

大数据 -126 - Flink 一文搞懂有状态计算:State Backend 工作原理与性能差异详解 核心原理与作用

作者:武子康
  • 2025-10-16
    山东
  • 本文字数:4332 字

    阅读完需:约 14 分钟

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 10 月 13 日更新到:Java-147 深入浅出 MongoDB 分页查询详解:skip() + limit() + sort() 实现高效分页、性能优化与 WriteConcern 写入机制全解析 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Flink 广播状态

  • 基本概念、代码案例、测试结果


状态存储

Flink 的一个重要特性就是有状态计算(stateful processing),Flink 提供了简单易用的 API 来存储和获取状态,但是我们还是要理解 API 背后的原理,才能更好的使用。

State 存储方式

Flink 为 State 提供了三种开箱即用的后端存储方式(state backend):


  • Memory State Backend

  • File System (FS)State Backend

  • RocksDB State Backend

MemoryStateBackend

MemoryStateBackend 将工作状态数据保存在 TaskManager 的 Java 内存中。Key/Value 状态和 Window 算子使用哈希表存储数值和触发器。进行快照时(CheckPointing),生成的快照数据将和 CheckPoint ACK 消息一起发送给 JobManager,JobManager 将收到的所有快照数据保存在 Java 内存中。MemoryStateBackend 现在被默认配置成异步的,这样避免阻塞主线程的 pipline 处理,MemoryStateBackend 的状态存取的数据都非常快,但是不适合生产环境中使用。这是以为它有以下限制:


  • 每个 state 的默认大小被限制为 5MB(这个值可以通过 MemoryStateBackend 构造方法设置)

  • 每个 Task 的所有 State 数据(一个 Task 可能包含一个 Pipline 中的多个的 Operator)大小不能超过 RPC 系统的帧大小(akk.framesize 默认 10MB)

  • JobManager 收到的 State 数据总和不能超过 JobManager 内存


MemoryStateBackend 适合的场景:


  • 本地开发和调试

  • 状态很小的作业


FsStateBackend

FsStateBackend 需要配置一个 CheckPoint 路径,例如:hdfs://xxxxxxx 或者 file:///xxxxx,我们一般都会配置 HDFS 的目录。FsStateBackend 将工作状态数据保存在 TaskManager 的 Java 内存中,进行快照时,再将快照数据写入上面的配置的路径,然后将写入的文件路径告知 JobManager。JobManager 中保存所有状态的元数据信息(在 HA 的模式下,元数据会写入 CheckPoint 目录)。FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的 Pipline 处理,可以通过 FsStateBackend 构造函数取消该模式:


new FsStateBackend(path, false)
复制代码


FsStateBackend 适合的场景:


  • 大状态、长窗口、大键值(键或者值很大)状态的作业

  • 适合高可用方案


RocksDBStateBackend

RocksDBStateBackend 也需要配置一个 CheckPoint 路径,例如:hdfs://xxx 或者 file:///xxx,一般是 HDFS 路径。RocksDB 是一种可嵌入的可持久型的 key-value 存储引擎,提供 ACID 支持。由 Facebook 基于 LevelDB 开发,使用 LSM 存储引擎,是内存和磁盘的混合存储。RocksDBStateBackend 将工作状态保存在 TaskManager 的 RocksDB 数据库中,CheckPoint 时,RocksDB 中的所有数据会被传输到配置的文件目录,少量元数据信息保存在 JobManager 内存中(HA 模式下,会保存在 CheckPoint 目录)。RocksDBStateBackend 使用异步方式进行快照,RocksDBStateBackend 的限制:


  • 由于 RocksDB 的 JNI Bridge API 是基于 byte[] 的,RocksDBStateBackend 支持的每个 Key 或者每个 Value 的最大值不超过 2 的 31 次方(2GB)

  • 要注意的是,有 merge 操作的状态(例如:ListState),可能会在运行过程中超过 2 的 31 次时,导致程序失败。RocksDBStateBackend 适用于以下的场景:

  • 超大状态、超长窗口(天)、大键值状态的作业

  • 适合高可用模式


使用 RocksDBStateBackend 时,能够限制状态大小是 TaskManager 磁盘空间(相对于 FsStateBackend 状态大小限制与 TaskManager 内存)。这也导致 RocksDBStateBackend 的吞吐比其他两个要低一些,因为 RocksDB 的状态数据的读写都要经过反序列化/序列化。


RocksDBStateBackend 时目前三者中唯一支持增量 CheckPoint 的。


三者吞吐量对比

KeyedState 和 Operator State

State 分类

Operator State

(或 non-keyed state):每个 Operator State 绑定一个并行的 Operator 实例,KafkaConnector 是使用 OperatorState 的典型示例:每个并行的 Kafka Consumer 实例维护了每个 Kafka Topic 分区和该分区 Offset 的映射关系,并将这个映射关系保存为 OperatorState。在算子并行度改变时,OperatorState 也会重新分配。

Keyed State

这种 State 只存在于 KeyedStream 上的函数和操作中,比如 Keyed UDF(KeyedProcessFunction)Window State。可以把 Keyed State 想象成被分区的 OperatorState。每个 KeyedState 在逻辑上可以看成与一个 <parallel-operator-instance, key> 绑定,由于一个 key 肯定只存在于一个 Operator 实例,所以我们可以简单的的认为一个 <operator, key>对应一个 KeyedState。


每个 KeyedState 在逻辑上还会被分配一个 KeyGroup,分配方法如下:


MathUtils.murmurHash(key.hashCode()) % maxParallelism;
复制代码


其中 maxParallelism 是 Flink 程序的最大并行度,这个值一般我们不会去手动设置,使用默认的值(128)就好,这里注意下,maxParallelism 和我们运行程序时指定的算子并行度(parallelism)不同,parallelism 不能大于 maxParallelism,最多两者相等。


为什么会有 Key Group 这个概念呢?我们通常写程序,会给算子指定一个并行度,运行一段时间后,积累了一些 State,这时候数据量大了,需要增加并行度。我们修改并行度后重新提交,那这些已经存在的 State 该如何分配到各个 Operator 呢?这就有了最大并行度和 KeyGroup 的概念。上面的计算公式也说明了 KeyGroup 的个数最多是 maxParallelism 个。当并行度改变之后,我们在计算这个 Key 被分配到的 Operator:


keyGroupId * paralleism / maxParallelism;
复制代码


可以看到,一个 KeyGroupId 会对应一个 Operator,当并行度更改时,新的 Operator 会去拉取对应的 KeyGroup 的 KeyedState,这样就把 KeyedState 尽量均匀的分配给所有的 Operator 了。根据 State 数据是否被 Flink 托管,Flink 又将 State 分类为:


  • Managed State:被 Flink 托管,保存为内部的哈希表或者 RocksDB,CheckPoint 时,Flink 将 State 进行序列化编码。例如:ValueState ListState

  • Row State:Operator 自行管理的数据结构, Checkpoint 时,它们只能以 byte 数据写入 CheckPoint。


建议使用 Managed State,当使用 Managed State 时,Flink 会帮助我们更改并行度时重新分配 State,优化内存。

使用 ManageKeyedState

如何创建?上面提到,KeyedState 只能在 KeyedStream 上使用,可以通过 Stream.keyBy 创建 KeyedStream,我们可以创建以下几种:


  • ValueState

  • ListState

  • ReducingState

  • AggregatingState<IN,OUT>

  • MapState<UK,UV>

  • FoldingState<T,ACC>


每种 State 都对应各种的描述符,通过描述符 RuntimeContext 中获取对应的 State,而 RuntimeContext 只有 RichFunction 才能获取,所以想要使用 KeyedState,用户编写的类必须继承 RichFunction 或者其他子类。


  • ValueState getState(ValueStateDescriptor)

  • ReducingState getReducingState(ReducingStateDescriptor)

  • ListState getListState(ListStateDescriptor)

  • AggregationState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT>)

  • FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC>)

  • MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV>)


给 KeyedState 设置过期时间在 Flink 1.6.0 以后,还可以给 KeyedState 设置 TTL(Time-To-Live),当某一个 Key 的 State 数据过期时,会被 StateBackend 尽力删除。官方给出了示例:


StateTtlConfig ttlConfig = StateTtlConfig    .newBuilder(Time.seconds(1)) // 状态存活时间    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL 何时被更新,这里配置的 state 创建和写入时    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)    .build();// 设置过期的 state 不被读取ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("textstate", String.class);stateDescriptor.enableTimeToLive(ttlConfig);
复制代码


State 的 TTL 何时被更新?可以进行以下配置,默认只是 key 的 state 被 modify(创建或者更新)的时候才更新 TTL:


  • StateTtlConfig.UpdateType.OnCreateAndWrite:只在一个 key 的 state 创建和写入时更新 TTL(默认)

  • StateTtlConfig.UpdateType.onReadAndWrite:读取 state 时仍然更新 TTL


当 State 过期但是还未删除时,这个状态是否还可见?可以进行以下配置,默认是不可见的:


  • StateTtlConfig.StateVisibility.NerverReturnExpired:不可见(默认)

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp:可见


注意:


  1. 状态存储开销

  2. 启用 TTL 特性会增加状态存储的大小,具体影响取决于 State Backend 类型:

  3. Heap State Backend:会额外存储一个 Java 8 对象,包含用户状态及其时间戳。

  4. RocksDB State Backend:在每个状态值(如 List 或 Map 的每个元素)序列化后增加 8 个字节的时间戳开销。

  5. 示例:如果使用 MapState 存储 10 万个键值对,RocksDB State Backend 会额外增加约 800KB 的存储空间(8 字节 × 100,000)。

  6. 时间语义限制

  7. 目前仅支持基于 Processing Time 的 TTL,暂不支持 Event Time 或 Ingestion Time。

  8. 应用场景:适用于需要定期清理过期数据的场景,如会话超时、缓存失效等。

  9. Checkpoint/Savepoint 恢复要求

  10. 从 Checkpoint/Savepoint 恢复时,TTL 的开启状态(是否启用)必须与保存时完全一致,否则会抛出StateMigrationException

  11. 建议:在作业升级或迁移时,需确保 TTL 配置的一致性。

  12. TTL 配置的临时性

  13. TTL 配置(如过期时间)不会持久化到 Checkpoint/Savepoint 中,仅对当前作业生效。

  14. 示例:如果作业重启且未显式设置 TTL,即使从包含 TTL 状态的 Checkpoint 恢复,也不会自动启用 TTL。

  15. MapState 对 NULL 值的支持

  16. 开启 TTL 的 MapState 仅在用户自定义序列化器支持 NULL 值时,才允许存储 NULL。

  17. 解决方案:如果序列化器不支持 NULL 值,可通过NullableSerializer包装一层,例如:


     MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(         "myMapState",          String.class,          NullableSerializer.wrap(StringSerializer.INSTANCE)     );
复制代码


  • 注意事项:使用NullableSerializer会额外增加 1 字节的序列化开销。

使用 ManageOperatorState

(这里以及后续放到下一篇:大数据-127 Flink)

发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-126 - Flink一文搞懂有状态计算:State Backend 工作原理与性能差异详解 核心原理与作用_Java_武子康_InfoQ写作社区