写点什么

Flink Keyed State 对 Key 的管理机制解析

作者:邸星星
  • 2022 年 1 月 28 日
  • 本文字数:5365 字

    阅读完需:约 18 分钟

顾名思义,Keyed State 是指一个 Key 对应一个 State,Flink 的 Keyed State 在 API 层面没有直接把当前的 Key 暴露出来,Key 和 State 的对应关系是在 Flink 内部进行维护的,本文主要介绍 Flink 在内部是如何管理 Key 和 State 的对应关系的。

Keyed State 的应用案例

以 GroupAggFunction 为例:

GroupAggFunction#processElement

 

// stores the accumulatorsprivate transient ValueState<BaseRow> accState = null;
@Overridepublic void open(Configuration parameters) throws Exception { super.open(parameters); ... BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes); ValueStateDescriptor<BaseRow> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo); accState = getRuntimeContext().getState(accDesc); ...}
@Overridepublic void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception { ... BaseRow currentKey = ctx.getCurrentKey(); // 获取currentKey对应的State BaseRow accumulators = accState.value();
if (!recordCounter.recordCountIsZero(accumulators)) { ... // 对state做update操作 accState.update(accumulators); ... } else { ... // and clear all state accState.clear(); // cleanup dataview under current key function.cleanup(); }}
复制代码

GroupAggFunction 是使用 Keyed State 的典型场景,可以看到 accState 仅在 open 时初始化一次,然后在使用过程中,我们不需要为每个 Key 重新获取一遍 accState,我们不用关心当前的 Key 是什么,直接对 accState 操作就可以了。

如同开头我们的介绍,在 API 层面,Flink 对 Keyed State 的操作是和 Key 无关的,那么 Flink 是如何管理 State 和 Key 的关系的呢,接下来我们一起分析一下。


PS:从以上代码可以看出,通过 KeyedProcessFunction.Context#getCurrentKey() 可以获取到当前的 Key。

Key 是如何管理的


Flink 提供了一个 KeyContext 接口,Key 的管理主要是基于 KeyContext 接口实现的,用于存放和获取当前的 Key :

/** * Inteface for setting and querying the current key of keyed operations. * * <p>This is mainly used by the timer system to query the key when creating timers * and to set the correct key context when firing a timer. */public interface KeyContext {
void setCurrentKey(Object key);
Object getCurrentKey();}
复制代码


这两个方法的定义非常简单:

setCurrentKey :接收到一条数据时,优先将数据对应的 Key 保存到 KeyContext 中。

getCurrentKey :从 KeyContext 中获取当前数据对应的 Key ,再根据 Key 得到相应的 State。(Keyed State 的访问)

Set Current Key

StreamOperator 本身实现了 KeyContext 接口,在 operator 处理一条数据之前,StreamOneInputProcessor 会调用 setCurrentKey 先把这条数据的 Key,也就是 Current Key 保存起来。

StreamOneInputProcessor#processElement


@Overridepublic boolean processInput() throws Exception {   initializeNumRecordsIn();   ...   processElement(recordOrMark, channel);   return true;}
private void processElement(StreamElement recordOrMark, int channel) throws Exception { if (recordOrMark.isRecord()) { // now we can do the actual processing StreamRecord<IN> record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); // 处理数据之前(调用processElement方法之前)先设置当前处理的record 到 context中 // 根据record和预设好的KeySelector可以得到最终的Key streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); } } ...}
复制代码


AbstractStreamOperator#setKeyContextElement1:


@Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement1(StreamRecord record) throws Exception {   setKeyContextElement(record, stateKeySelector1);}
@Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement2(StreamRecord record) throws Exception { setKeyContextElement(record, stateKeySelector2);}
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception { if (selector != null) { Object key = selector.getKey(record.getValue()); setCurrentKey(key); }}
@SuppressWarnings({"unchecked", "rawtypes"})// KeyContext接口定义的方法public void setCurrentKey(Object key) { if (keyedStateBackend != null) { try { // need to work around type restrictions @SuppressWarnings("unchecked,rawtypes") AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;keyedStateBackend // 这里设置key 到state backend中 rawBackend.setCurrentKey(key); } catch (Exception e) { throw new RuntimeException("Exception occurred while setting the current key context.", e); } }}
复制代码


通过以上代码可以看出实际通过 recordkeySelecor 计算出 Current Key ,并把 Current Key 设置到了当前 Operator 对应的 KeyedStateBackend 中。

 

AbstractKeyedStateBackend 的实现类有两个:

  • HeapKeyedStateBackend

  • RocksDBKeyedStateBackend



HeapKeyedStateBackend 直接继承了 AbstractKeyedStateBackend 的 setCurrentKey 方法:


/** * @see KeyedStateBackend */@Overridepublic void setCurrentKey(K newKey) {   notifyKeySelected(newKey);   this.keyContext.setCurrentKey(newKey);   this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups));}
复制代码


实际上最终 Key 被存放到 keyContext 实例中,注意 keyContext 是接口 InternalKeyContext

RocksDBKeyedStateBackend 的实现不同,此处省略。)


Current Key 是如何在各个组件中传递的

从 GroupAggFunction 中如何获取 Current Key 开始跟踪代码:

@Overridepublic void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception {  ...  BaseRow currentKey = ctx.getCurrentKey();  ...}
复制代码


我们先来找 Context 的实现类,从 StreamExecGroupAggregate#translateToPlanInternal可以看出 GroupAggFunction 是和 KeyedProcessOperator 配合工作的,KeyedProcessOperator#processElement

@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {  collector.setTimestamp(element);  context.element = element;  // 调用GroupAggFunction的processElement  userFunction.processElement(element.getValue(), context, collector);  context.element = null;}
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context { ...
@Override @SuppressWarnings("unchecked") public K getCurrentKey() { return (K) KeyedProcessOperator.this.getCurrentKey(); } }
复制代码


所以 Context 的实现类是 KeyedProcessOperator#ContextImpl,具体的实现方式是直接调用 KeyedProcessOperator#getCurrentKey 方法 ,KeyedProcessOperator#getCurrentKey 中会调用AbstractKeyedStateBackend#getCurrentKey 得到 Current Key 并返回。


结合类图总结一下:



  • KeyedProcessOperator 负责调用 GroupAggFunction 的 processElement 方法来处理具体的数据;

  • StreamOperator 接口继承了 KeyContext 接口,也就是说 KeyedProcessOperator 本身就是个 KeyContext 的实现类;

  • 前面代码中介绍了,KeyedProcessOperator 的父类 AbstractStreamOperator 中实现了具体的 setCurrentKey 方法,实际将保存 Current Key 的工作委派给了 AbstractKeyedStateBackend,也就是说 AbstractKeyedStateBackend 承担了最终的 Current Key 维护工作;

  • AbstractKeyedStateBackend 将 Current Key 存放在 InternalKeyContextImpl 实例中。


Keyed State 的访问机制

以在 GroupAggFunction 中获取 State 为例:

// 获取state,accState 为 HeapValueState 的实例BaseRow accumulators = accState.value();
复制代码

org.apache.flink.runtime.state.heap.HeapValueState#value

@Overridepublic V value() {   // 调用stateTable.get(N)   final V result = stateTable.get(currentNamespace);
if (result == null) { return getDefaultValue(); }
return result;}

@Overridepublic void update(V value) { if (value == null) { clear(); return; } stateTable.put(currentNamespace, value);}
复制代码


org.apache.flink.runtime.state.heap.StateTable#get(N)


/** * Returns the state of the mapping for the composite of active key and given namespace. * * @param namespace the namespace. Not null. * @return the states of the mapping with the specified key/namespace composite key, or {@code null} * if no mapping for the specified key is found. */public S get(N namespace) {   return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);}
/** * Maps the composite of active key and given namespace to the specified state. * * @param namespace the namespace. Not null. * @param state the state. Can be null. */public void put(N namespace, S state) { put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);}
复制代码


通过上面代码可以发现,在我们调用 accState.value() 时,最终会调用 StateTable 的 get 方法,使用 keyContext 中的 Current Key 来获取到对应的 Value 。


再看一下 StateTable 管理 Keyed State 的逻辑:



private S get(K key, int keyGroupIndex, N namespace) { checkKeyNamespacePreconditions(key, namespace); // 每个 keyGroup对应一个StateMap,K对应Key,N是Namespace(窗口),S是State StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
if (stateMap == null) { return null; }
return stateMap.get(key, namespace);}
@VisibleForTestingprotected StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) { final int pos = indexToOffset(keyGroupIndex); if (pos >= 0 && pos < keyGroupedStateMaps.length) { return keyGroupedStateMaps[pos]; } else { return null; }}
/** * Translates a key-group id to the internal array offset. */private int indexToOffset(int index) { return index - keyGroupOffset;}
/** * The offset to the contiguous key groups. */protected final int keyGroupOffset;/** * Map for holding the actual state objects. The outer array represents the key-groups. * All array positions will be initialized with an empty state map. */protected final StateMap<K, N, S>[] keyGroupedStateMaps;

复制代码

可以看出每个 keyGroup 对应一个 StateMap,StateMap 维护了 Key、Namespace、State 的对应关系。

总结

Flink 针对 Keyed State 的设计,让用户在使用层面感知不到 Current Key,所以用户需要稍微理解一下 State 的管理机制,才能更好的理解各种 State 的工作原理,比如 MapState<UK, UV> 中定义的 UserKey 其实并不是 Current Key。


  • 在数据到达 operator 之前,Flink 会先将数据对应的 Key 保存在 InternalKeyContextImpl 中;

  • AbstractKeyedStateBackend 持有 InternalKeyContextImpl 实例,并负责维护 Current Key;

  • HeapValueState 是用户访问 State 的入口,它持有 StateTable 的实例,StateTable 持有 InternalKeyContextImpl 的实例,同时 StateTable 负责维护 Key 和 State 的关系;

  • 结合前面说的,数据流入时会不断切换 InternalKeyContextImpl 中的 Current Key,StateTable 就可以实现为当前数据获取到对应的 State。

 

这样设计带来的好处是减少了用户管理状态的成本。因为状态都是在进程本地的,要么在 Heap 中存储,要么在 RocksDB 中存储,所以每个 SubTask 中只有当前 KeyGroupRange 对应的部分状态,用户如果随便拿个 Key 来获取状态,那么很用可能获取不到。


更多关于 State 的文章:

https://zhuanlan.zhihu.com/p/104171679

http://chenyuzhao.me/2017/12/24/Flink-%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E7%9A%84%E8%AE%BE%E8%AE%A1-%E5%AD%98%E5%82%A8/


用户头像

邸星星

关注

还未添加个人签名 2020.04.01 加入

还未添加个人简介

评论

发布
暂无评论
Flink Keyed State 对 Key 的管理机制解析