顾名思义,Keyed State 是指一个 Key 对应一个 State,Flink 的 Keyed State 在 API 层面没有直接把当前的 Key 暴露出来,Key 和 State 的对应关系是在 Flink 内部进行维护的,本文主要介绍 Flink 在内部是如何管理 Key 和 State 的对应关系的。
Keyed State 的应用案例
以 GroupAggFunction 为例:
GroupAggFunction#processElement
// stores the accumulatorsprivate transient ValueState<BaseRow> accState = null;
@Overridepublic void open(Configuration parameters) throws Exception { super.open(parameters); ... BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes); ValueStateDescriptor<BaseRow> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo); accState = getRuntimeContext().getState(accDesc); ...}
@Overridepublic void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception { ... BaseRow currentKey = ctx.getCurrentKey(); // 获取currentKey对应的State BaseRow accumulators = accState.value();
if (!recordCounter.recordCountIsZero(accumulators)) { ... // 对state做update操作 accState.update(accumulators); ... } else { ... // and clear all state accState.clear(); // cleanup dataview under current key function.cleanup(); }}
复制代码
GroupAggFunction 是使用 Keyed State 的典型场景,可以看到 accState 仅在 open 时初始化一次,然后在使用过程中,我们不需要为每个 Key 重新获取一遍 accState,我们不用关心当前的 Key 是什么,直接对 accState 操作就可以了。
如同开头我们的介绍,在 API 层面,Flink 对 Keyed State 的操作是和 Key 无关的,那么 Flink 是如何管理 State 和 Key 的关系的呢,接下来我们一起分析一下。
PS:从以上代码可以看出,通过 KeyedProcessFunction.Context#getCurrentKey() 可以获取到当前的 Key。
Key 是如何管理的
Flink 提供了一个 KeyContext 接口,Key 的管理主要是基于 KeyContext 接口实现的,用于存放和获取当前的 Key :
/** * Inteface for setting and querying the current key of keyed operations. * * <p>This is mainly used by the timer system to query the key when creating timers * and to set the correct key context when firing a timer. */public interface KeyContext {
void setCurrentKey(Object key);
Object getCurrentKey();}
复制代码
这两个方法的定义非常简单:
setCurrentKey :接收到一条数据时,优先将数据对应的 Key 保存到 KeyContext 中。
getCurrentKey :从 KeyContext 中获取当前数据对应的 Key ,再根据 Key 得到相应的 State。(Keyed State 的访问)
Set Current Key
StreamOperator 本身实现了 KeyContext 接口,在 operator 处理一条数据之前,StreamOneInputProcessor 会调用 setCurrentKey 先把这条数据的 Key,也就是 Current Key 保存起来。
StreamOneInputProcessor#processElement
@Overridepublic boolean processInput() throws Exception { initializeNumRecordsIn(); ... processElement(recordOrMark, channel); return true;}
private void processElement(StreamElement recordOrMark, int channel) throws Exception { if (recordOrMark.isRecord()) { // now we can do the actual processing StreamRecord<IN> record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); // 处理数据之前(调用processElement方法之前)先设置当前处理的record 到 context中 // 根据record和预设好的KeySelector可以得到最终的Key streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); } } ...}
复制代码
AbstractStreamOperator#setKeyContextElement1:
@Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement1(StreamRecord record) throws Exception { setKeyContextElement(record, stateKeySelector1);}
@Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement2(StreamRecord record) throws Exception { setKeyContextElement(record, stateKeySelector2);}
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception { if (selector != null) { Object key = selector.getKey(record.getValue()); setCurrentKey(key); }}
@SuppressWarnings({"unchecked", "rawtypes"})// KeyContext接口定义的方法public void setCurrentKey(Object key) { if (keyedStateBackend != null) { try { // need to work around type restrictions @SuppressWarnings("unchecked,rawtypes") AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;keyedStateBackend // 这里设置key 到state backend中 rawBackend.setCurrentKey(key); } catch (Exception e) { throw new RuntimeException("Exception occurred while setting the current key context.", e); } }}
复制代码
通过以上代码可以看出实际通过 record 和 keySelecor 计算出 Current Key ,并把 Current Key 设置到了当前 Operator 对应的 KeyedStateBackend 中。
AbstractKeyedStateBackend 的实现类有两个:
HeapKeyedStateBackend
RocksDBKeyedStateBackend
HeapKeyedStateBackend 直接继承了 AbstractKeyedStateBackend 的 setCurrentKey 方法:
/** * @see KeyedStateBackend */@Overridepublic void setCurrentKey(K newKey) { notifyKeySelected(newKey); this.keyContext.setCurrentKey(newKey); this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups));}
复制代码
实际上最终 Key 被存放到 keyContext 实例中,注意 keyContext 是接口 InternalKeyContext 。
(RocksDBKeyedStateBackend 的实现不同,此处省略。)
Current Key 是如何在各个组件中传递的
从 GroupAggFunction 中如何获取 Current Key 开始跟踪代码:
@Overridepublic void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception { ... BaseRow currentKey = ctx.getCurrentKey(); ...}
复制代码
我们先来找 Context 的实现类,从 StreamExecGroupAggregate#translateToPlanInternal可以看出 GroupAggFunction 是和 KeyedProcessOperator 配合工作的,KeyedProcessOperator#processElement:
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); context.element = element; // 调用GroupAggFunction的processElement userFunction.processElement(element.getValue(), context, collector); context.element = null;}
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context { ...
@Override @SuppressWarnings("unchecked") public K getCurrentKey() { return (K) KeyedProcessOperator.this.getCurrentKey(); } }
复制代码
所以 Context 的实现类是 KeyedProcessOperator#ContextImpl,具体的实现方式是直接调用 KeyedProcessOperator#getCurrentKey 方法 ,KeyedProcessOperator#getCurrentKey 中会调用AbstractKeyedStateBackend#getCurrentKey 得到 Current Key 并返回。
结合类图总结一下:
KeyedProcessOperator 负责调用 GroupAggFunction 的 processElement 方法来处理具体的数据;
StreamOperator 接口继承了 KeyContext 接口,也就是说 KeyedProcessOperator 本身就是个 KeyContext 的实现类;
前面代码中介绍了,KeyedProcessOperator 的父类 AbstractStreamOperator 中实现了具体的 setCurrentKey 方法,实际将保存 Current Key 的工作委派给了 AbstractKeyedStateBackend,也就是说 AbstractKeyedStateBackend 承担了最终的 Current Key 维护工作;
AbstractKeyedStateBackend 将 Current Key 存放在 InternalKeyContextImpl 实例中。
Keyed State 的访问机制
以在 GroupAggFunction 中获取 State 为例:
// 获取state,accState 为 HeapValueState 的实例BaseRow accumulators = accState.value();
复制代码
org.apache.flink.runtime.state.heap.HeapValueState#value
@Overridepublic V value() { // 调用stateTable.get(N) final V result = stateTable.get(currentNamespace);
if (result == null) { return getDefaultValue(); }
return result;}
@Overridepublic void update(V value) { if (value == null) { clear(); return; } stateTable.put(currentNamespace, value);}
复制代码
org.apache.flink.runtime.state.heap.StateTable#get(N)
/** * Returns the state of the mapping for the composite of active key and given namespace. * * @param namespace the namespace. Not null. * @return the states of the mapping with the specified key/namespace composite key, or {@code null} * if no mapping for the specified key is found. */public S get(N namespace) { return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);}
/** * Maps the composite of active key and given namespace to the specified state. * * @param namespace the namespace. Not null. * @param state the state. Can be null. */public void put(N namespace, S state) { put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);}
复制代码
通过上面代码可以发现,在我们调用 accState.value() 时,最终会调用 StateTable 的 get 方法,使用 keyContext 中的 Current Key 来获取到对应的 Value 。
再看一下 StateTable 管理 Keyed State 的逻辑:
private S get(K key, int keyGroupIndex, N namespace) { checkKeyNamespacePreconditions(key, namespace); // 每个 keyGroup对应一个StateMap,K对应Key,N是Namespace(窗口),S是State StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
if (stateMap == null) { return null; }
return stateMap.get(key, namespace);}
@VisibleForTestingprotected StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) { final int pos = indexToOffset(keyGroupIndex); if (pos >= 0 && pos < keyGroupedStateMaps.length) { return keyGroupedStateMaps[pos]; } else { return null; }}
/** * Translates a key-group id to the internal array offset. */private int indexToOffset(int index) { return index - keyGroupOffset;}
/** * The offset to the contiguous key groups. */protected final int keyGroupOffset;/** * Map for holding the actual state objects. The outer array represents the key-groups. * All array positions will be initialized with an empty state map. */protected final StateMap<K, N, S>[] keyGroupedStateMaps;
复制代码
可以看出每个 keyGroup 对应一个 StateMap,StateMap 维护了 Key、Namespace、State 的对应关系。
总结
Flink 针对 Keyed State 的设计,让用户在使用层面感知不到 Current Key,所以用户需要稍微理解一下 State 的管理机制,才能更好的理解各种 State 的工作原理,比如 MapState<UK, UV> 中定义的 UserKey 其实并不是 Current Key。
在数据到达 operator 之前,Flink 会先将数据对应的 Key 保存在 InternalKeyContextImpl 中;
AbstractKeyedStateBackend 持有 InternalKeyContextImpl 实例,并负责维护 Current Key;
HeapValueState 是用户访问 State 的入口,它持有 StateTable 的实例,StateTable 持有 InternalKeyContextImpl 的实例,同时 StateTable 负责维护 Key 和 State 的关系;
结合前面说的,数据流入时会不断切换 InternalKeyContextImpl 中的 Current Key,StateTable 就可以实现为当前数据获取到对应的 State。
这样设计带来的好处是减少了用户管理状态的成本。因为状态都是在进程本地的,要么在 Heap 中存储,要么在 RocksDB 中存储,所以每个 SubTask 中只有当前 KeyGroupRange 对应的部分状态,用户如果随便拿个 Key 来获取状态,那么很用可能获取不到。
更多关于 State 的文章:
https://zhuanlan.zhihu.com/p/104171679
http://chenyuzhao.me/2017/12/24/Flink-%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E7%9A%84%E8%AE%BE%E8%AE%A1-%E5%AD%98%E5%82%A8/
评论