写点什么

大数据 -127 - Flink StateBackend 详解:Memory、Fs、RocksDB 与 OperatorState 管理机制与重分配原理

作者:武子康
  • 2025-10-17
    山东
  • 本文字数:5355 字

    阅读完需:约 18 分钟

大数据-127 - Flink StateBackend详解:Memory、Fs、RocksDB 与 OperatorState 管理机制与重分配原理

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 10 月 13 日更新到:Java-147 深入浅出 MongoDB 分页查询详解:skip() + limit() + sort() 实现高效分页、性能优化与 WriteConcern 写入机制全解析 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Flink 状态存储

  • MemoryStateBackend

  • FsStateBackend

  • RocksDBStateBackend

  • KeyedState

  • Operator State


上节进度

上节我们到了:使用 ManageOperatorState(这里以及后续放到下一篇:大数据-127 Flink)


接下来我们继续上节的内容

使用 ManageOperatorState

我们可以通过实现 CheckpointedFunction 或 ListCheckpointed<T extends Serializable>接口来使用 ManagedOperatorState。CheckpointFunctionCheckpointFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:


void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
复制代码


进行 Checkpoint 时会调用 snapshotState(),用户自定义函数化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 Checkpoint 恢复。因此 initializeState(),不仅是定义不同的状态类型初始化的地方,也需要包括状态恢复的逻辑。当前,ManagedOperatorState 以 list 的形式存在,这些状态是一个可序列化对象的集合 List,彼此独立,方便在改变后进行状态的重新分派,换句话说,这些对象是重新分配 non-keyed state 的最新粒度,根据状态不同访问方式,有如下几种重新分配的模式:


  • Event-split redistribution:每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成,当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。比如说,算子 A 的并发读为 1,包含两个元素 element1 和 element2,当并发增加为 2 时,element1 会被分发到并发 0 上,element2 会被分发到并发 1 上。

  • Union redistribution:每个算子保存一个列表形式的状态集合,整个状态由所有列表拼接而成,当作业恢复或重新分配时,每个算子都将获得所有的状态数据。


ListCheckpointedListCheckpointed 接口是 CheckpointedFunction 的精简版,仅支持 even-split redistribution 的 list state,同样需要实现下面两个方法:


List<T> snapshotState(long checkpointId, long timestamp) throws Exception;void restoreState(List<T> state) throws Exception;
复制代码


snapshotState()需要返回一个将写入到 checkpoint 的对象列表,restoreState 则需要处理恢复回来的对象列表,如果状态不可切分,则可以在 snapshotState()中返回,Collections.singletonList(MY_STATE)。

StateBackend 如何保存

上面我们介绍了三种 StateBackend:


  • MemoryStateBackend

  • FsStateBackend

  • RocksDBStateBackend



在 Flink 的实际实现中,对于同一种 StateBackend,不同的 State 在运行时会有细分的 StateBackend 托管,例如:MemoryStateBackend,就有 DefaultOperatorStateBackend 管理 OperatorState,HeapKeyedStateBackend 管理 KeyedState。


我们看到 MemoryStateBackend 和 FsStateBackend 对于 KeyedState 和 OperatorState 的存储都符合我们之前的理解,运行时 State 数据保存于内存,checkpoint 的保存位置需要注意下,并不是在 RocksDB 中,而是通过 DefaultOperatorStateBackend 保存于 TaskManager 内存。创建的源码如下:


// RocksDBStateBackend.java// 创建 keyed statebackendpublic <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...){    ...    return new RocksDBKeyedStateBackend<>(    ...);}// 创建 Operator statebackendpublic OperatorStateBackend createOperatorStateBackend(    Environment env, String operatorIdentifier) throws Exception {        //the default for RocksDB; eventually there can be a operator state        backend based on RocksDB, too.        final boolean asyncSnapshots = true;        return new DefaultOperatorStateBackend(    ...);}
复制代码


源码中也标注了,未来会提供基于 RocksDB 存储的 OperatorState,所以当前即使使用 RocksDBStateBackend,OperatorState 也会超过内存限制。


Operator State 在内存中对应两种数据结构:数据结构 1:ListState 对应的实际实现类为 PartitionableListState,创建并注册的代码如下:


// DefaultOperatorStateBackend.javaprivate <S> ListState<S> getListState(...){    partitionableListState = new PartitionableListState<>(        new RegisteredOperatorStateBackendMetaInfo<>(            name,            partitionStateSerializer,            mode));    registeredOperatorStates.put(name, partitionableListState);}
复制代码


PartitionableListState 中通过 ArrayList 来保存 State 数据:


// PartitionableListState.java/*** The internal list the holds the elements of the state*/private final ArrayList<S> internalList;
复制代码


数据结构 2:BroadcastState 对应的实际实现类为 HeapBroadcastState 创建并注册的代码如下:


public <K, V> BroadcastState<K, V> getBroadcastState(...) {    broadcastState = new HeapBroadcastState<>(        new RegisteredBroadcastStateBackendMetaInfo<>(            name,            OperatorStateHandle.Mode.BROADCAST,            broadcastStateKeySerializer,            broadcastStateValueSerializer));    registeredBroadcastStates.put(name, broadcastState);}
复制代码


HeapBroadcastState 中通过 HashMap 来保存 State 数据:


/*** The internal map the holds the elements of the state.*/private final Map<K, V> backingMap;HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {    this(stateMetaInfo, new HashMap<>());}
复制代码

配置 StateBackend

我们知道 Flink 提供了三个 StateBackend,那么如何配置使用某个 StateBackend 呢?默认的配置在 conf/flink-conf.yaml 文件中 state.backend 指定,如果没有配置该值,就会使用 MemoryStateBackend,默认的是 StateBackend 可以被代码中的配置覆盖。

Per-job 设置

我们可以通过 StreamExecutionEnvironment 设置:


StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(newFsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
复制代码


如果想使用 RocksDBStateBackend,你需要将相关依赖加入你的 Flink 中:


<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>  <version>${flink.version}</version>  <scope>provided</scope></dependency>
复制代码

默认状态后端设置详解

在 Apache Flink 中,如果没有在程序中显式指定状态后端(StateBackend),系统将自动采用配置文件中的默认设置。具体配置路径为conf/flink-conf.yaml文件中的state.backend参数。这个参数支持以下三种主要配置选项,每种配置对应不同的状态后端实现:


  1. JobManager

  2. 对应MemoryStateBackend实现,这是最基础的存储方式。

  3. 状态数据存储在 JobManager 的内存中

  4. 检查点数据会存储在 JobManager 的内存中(默认)或指定的文件系统路径

  5. 典型应用场景:本地测试、小规模状态作业

  6. 示例配置:state.backend: jobmanager

  7. FileSystem

  8. 对应FsStateBackend实现,适合生产环境使用。

  9. 工作状态存储在 TaskManager 的内存中

  10. 检查点数据持久化到分布式文件系统(如 HDFS、S3 等)

  11. 支持大状态存储,但总大小受限于可用内存

  12. 典型配置示例:


     state.backend: filesystem     state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
复制代码


  1. RocksDB

  2. 对应RocksDBStateBackend实现,适合超大规模状态作业。

  3. 将工作状态存储在本地 RocksDB 实例中

  4. 检查点数据持久化到分布式文件系统

  5. 支持状态大小超过可用内存的情况

  6. 需要额外配置 RocksDB 原生库依赖

  7. 典型配置示例:


     state.backend: rocksdb     state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints     state.backend.rocksdb.localdir: /mnt/flink/rocksdb
复制代码


实际生产环境中,建议根据作业特点选择合适的状态后端。对于需要高可用性的场景,还应该配置state.checkpoints.dir参数指定可靠的分布式存储路径。此外,在 Flink 1.13 及以上版本,推荐使用新的HashMapStateBackendEmbeddedRocksDBStateBackend作为替代方案。

开启 Checkpoint

开启 CheckPoint 后,StateBackend 管理的 TaskManager 上的状态数据才会被定期备份到 JobManager 或外部存储,这些状态数据在作业失败恢复时会用到。我们可以通过以下代码开启和配置 CheckPoint:


StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//env.getConfig().disableSysoutLogging();//每 30 秒触发一次 checkpoint,checkpoint 时间应该远小于(该值 + MinPauseBetweenCheckpoints),否则程序会一直做checkpoint,影响数据处理速度env.enableCheckpointing(30000); // create a checkpoint every 30 seconds// set mode to exactly-once (this is the default)// flink 框架内保证 EXACTLY_ONCEenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// make sure 30 s of progress happen between checkpoints// 两个 checkpoints之间最少有 30s 间隔(上一个checkpoint完成到下一个checkpoint开始,默认 为0,这里建议设置为非0值)env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);// checkpoints have to complete within one minute, or are discarded// checkpoint 超时时间(默认 600 s)env.getCheckpointConfig().setCheckpointTimeout(600000);// allow only one checkpoint to be in progress at the same time// 同时只有一个checkpoint运行(默认)env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// enable externalized checkpoints which are retained after job cancellation// 取消作业时是否保留 checkpoint (默认不保留)env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint失败时 task 是否失败( 默认 true, checkpoint失败时,task会失败)env.getCheckpointConfig().setFailOnCheckpointingErrors(true);// 对 FsStateBackend 刷出去的文件进行文件压缩,减小 checkpoint 体积env.getConfig().setUseSnapshotCompression(true);
复制代码


FsStateBackend 和 RocksDBStateBackend CheckPoint 完成后最终保存到下面的目录:


hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/
复制代码


JOB_ID 是应用的唯一 ID,CHECK_POINT_ID 是每次 CheckPoint 时自增的数字 ID,我们可以从备份的 CheckPoint 数据恢复当时的作业状态。


flink-1x.x/bin/flink run -s hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/ path/to//your/jar
复制代码


我们可以实现 CheckpointedFunction 方法,在程序初始化的时候修改状态:


public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue> implements CheckpointedFunction {    ValueState<Integer> processedInt;    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);    }    @Override    public void processElement(KeyValue keyValue, Context context,        Collector<KeyValue> collector) throws Exception {        try{            Integer a = Integer.parseInt(keyValue.getValue());            processedInt.update(a);            collector.collect(keyValue);        }catch(Exception e){            e.printStackTrace();        }    }    @Override    public void initializeState(FunctionInitializationContext        functionInitializationContext) throws Exception {        processedInt = functionInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor<>("processedInt", Integer.class));        if(functionInitializationContext.isRestored()){            //Apply logic to restore the data        }    }    @Override    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {        processedInt.clear();    }}```
复制代码


发布于: 刚刚阅读数: 7
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-127 - Flink StateBackend详解:Memory、Fs、RocksDB 与 OperatorState 管理机制与重分配原理_Java_武子康_InfoQ写作社区