顾名思义,Keyed State 是指一个 Key 对应一个 State,Flink 的 Keyed State 在 API 层面没有直接把当前的 Key 暴露出来,Key 和 State 的对应关系是在 Flink 内部进行维护的,本文主要介绍 Flink 在内部是如何管理 Key 和 State 的对应关系的。
Keyed State 的应用案例
以 GroupAggFunction 为例:
GroupAggFunction#processElement
// stores the accumulators
private transient ValueState<BaseRow> accState = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
...
BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
ValueStateDescriptor<BaseRow> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);
accState = getRuntimeContext().getState(accDesc);
...
}
@Override
public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception {
...
BaseRow currentKey = ctx.getCurrentKey();
// 获取currentKey对应的State
BaseRow accumulators = accState.value();
if (!recordCounter.recordCountIsZero(accumulators)) {
...
// 对state做update操作
accState.update(accumulators);
...
} else {
...
// and clear all state
accState.clear();
// cleanup dataview under current key
function.cleanup();
}
}
复制代码
GroupAggFunction
是使用 Keyed State 的典型场景,可以看到 accState
仅在 open
时初始化一次,然后在使用过程中,我们不需要为每个 Key 重新获取一遍 accState
,我们不用关心当前的 Key 是什么,直接对 accState
操作就可以了。
如同开头我们的介绍,在 API 层面,Flink 对 Keyed State 的操作是和 Key 无关的,那么 Flink 是如何管理 State 和 Key 的关系的呢,接下来我们一起分析一下。
PS:从以上代码可以看出,通过 KeyedProcessFunction.Context#getCurrentKey()
可以获取到当前的 Key。
Key 是如何管理的
Flink 提供了一个 KeyContext 接口,Key 的管理主要是基于 KeyContext 接口实现的,用于存放和获取当前的 Key :
/**
* Inteface for setting and querying the current key of keyed operations.
*
* <p>This is mainly used by the timer system to query the key when creating timers
* and to set the correct key context when firing a timer.
*/
public interface KeyContext {
void setCurrentKey(Object key);
Object getCurrentKey();
}
复制代码
这两个方法的定义非常简单:
setCurrentKey
:接收到一条数据时,优先将数据对应的 Key 保存到 KeyContext 中。
getCurrentKey
:从 KeyContext 中获取当前数据对应的 Key ,再根据 Key 得到相应的 State。(Keyed State 的访问)
Set Current Key
StreamOperator
本身实现了 KeyContext 接口,在 operator 处理一条数据之前,StreamOneInputProcessor 会调用 setCurrentKey
先把这条数据的 Key,也就是 Current Key 保存起来。
StreamOneInputProcessor#processElement
@Override
public boolean processInput() throws Exception {
initializeNumRecordsIn();
...
processElement(recordOrMark, channel);
return true;
}
private void processElement(StreamElement recordOrMark, int channel) throws Exception {
if (recordOrMark.isRecord()) {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
// 处理数据之前(调用processElement方法之前)先设置当前处理的record 到 context中
// 根据record和预设好的KeySelector可以得到最终的Key
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
}
...
}
复制代码
AbstractStreamOperator#setKeyContextElement1:
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement1(StreamRecord record) throws Exception {
setKeyContextElement(record, stateKeySelector1);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement2(StreamRecord record) throws Exception {
setKeyContextElement(record, stateKeySelector2);
}
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
if (selector != null) {
Object key = selector.getKey(record.getValue());
setCurrentKey(key);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
// KeyContext接口定义的方法
public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
@SuppressWarnings("unchecked,rawtypes")
AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
keyedStateBackend
// 这里设置key 到state backend中
rawBackend.setCurrentKey(key);
} catch (Exception e) {
throw new RuntimeException("Exception occurred while setting the current key context.", e);
}
}
}
复制代码
通过以上代码可以看出实际通过 record
和 keySelecor
计算出 Current Key ,并把 Current Key 设置到了当前 Operator 对应的 KeyedStateBackend
中。
AbstractKeyedStateBackend
的实现类有两个:
HeapKeyedStateBackend
RocksDBKeyedStateBackend
HeapKeyedStateBackend
直接继承了 AbstractKeyedStateBackend
的 setCurrentKey 方法:
/**
* @see KeyedStateBackend
*/
@Override
public void setCurrentKey(K newKey) {
notifyKeySelected(newKey);
this.keyContext.setCurrentKey(newKey);
this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups));
}
复制代码
实际上最终 Key 被存放到 keyContext 实例中,注意 keyContext
是接口 InternalKeyContext
。
(RocksDBKeyedStateBackend
的实现不同,此处省略。)
Current Key 是如何在各个组件中传递的
从 GroupAggFunction 中如何获取 Current Key 开始跟踪代码:
@Override
public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception {
...
BaseRow currentKey = ctx.getCurrentKey();
...
}
复制代码
我们先来找 Context 的实现类,从 StreamExecGroupAggregate#translateToPlanInternal
可以看出 GroupAggFunction
是和 KeyedProcessOperator
配合工作的,KeyedProcessOperator#processElement
:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
// 调用GroupAggFunction的processElement
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
...
@Override
@SuppressWarnings("unchecked")
public K getCurrentKey() {
return (K) KeyedProcessOperator.this.getCurrentKey();
}
}
复制代码
所以 Context
的实现类是 KeyedProcessOperator#ContextImpl
,具体的实现方式是直接调用 KeyedProcessOperator#getCurrentKey
方法 ,KeyedProcessOperator#getCurrentKey
中会调用AbstractKeyedStateBackend#getCurrentKey
得到 Current Key 并返回。
结合类图总结一下:
KeyedProcessOperator 负责调用 GroupAggFunction 的 processElement 方法来处理具体的数据;
StreamOperator 接口继承了 KeyContext 接口,也就是说 KeyedProcessOperator 本身就是个 KeyContext 的实现类;
前面代码中介绍了,KeyedProcessOperator 的父类 AbstractStreamOperator 中实现了具体的 setCurrentKey 方法,实际将保存 Current Key 的工作委派给了 AbstractKeyedStateBackend,也就是说 AbstractKeyedStateBackend 承担了最终的 Current Key 维护工作;
AbstractKeyedStateBackend 将 Current Key 存放在 InternalKeyContextImpl 实例中。
Keyed State 的访问机制
以在 GroupAggFunction 中获取 State 为例:
// 获取state,accState 为 HeapValueState 的实例
BaseRow accumulators = accState.value();
复制代码
org.apache.flink.runtime.state.heap.HeapValueState#value
@Override
public V value() {
// 调用stateTable.get(N)
final V result = stateTable.get(currentNamespace);
if (result == null) {
return getDefaultValue();
}
return result;
}
@Override
public void update(V value) {
if (value == null) {
clear();
return;
}
stateTable.put(currentNamespace, value);
}
复制代码
org.apache.flink.runtime.state.heap.StateTable#get(N)
/**
* Returns the state of the mapping for the composite of active key and given namespace.
*
* @param namespace the namespace. Not null.
* @return the states of the mapping with the specified key/namespace composite key, or {@code null}
* if no mapping for the specified key is found.
*/
public S get(N namespace) {
return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
}
/**
* Maps the composite of active key and given namespace to the specified state.
*
* @param namespace the namespace. Not null.
* @param state the state. Can be null.
*/
public void put(N namespace, S state) {
put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);
}
复制代码
通过上面代码可以发现,在我们调用 accState.value() 时,最终会调用 StateTable
的 get 方法,使用 keyContext 中的 Current Key 来获取到对应的 Value 。
再看一下 StateTable 管理 Keyed State 的逻辑:
private S get(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
// 每个 keyGroup对应一个StateMap,K对应Key,N是Namespace(窗口),S是State
StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
if (stateMap == null) {
return null;
}
return stateMap.get(key, namespace);
}
@VisibleForTesting
protected StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
final int pos = indexToOffset(keyGroupIndex);
if (pos >= 0 && pos < keyGroupedStateMaps.length) {
return keyGroupedStateMaps[pos];
} else {
return null;
}
}
/**
* Translates a key-group id to the internal array offset.
*/
private int indexToOffset(int index) {
return index - keyGroupOffset;
}
/**
* The offset to the contiguous key groups.
*/
protected final int keyGroupOffset;
/**
* Map for holding the actual state objects. The outer array represents the key-groups.
* All array positions will be initialized with an empty state map.
*/
protected final StateMap<K, N, S>[] keyGroupedStateMaps;
复制代码
可以看出每个 keyGroup 对应一个 StateMap,StateMap 维护了 Key、Namespace、State 的对应关系。
总结
Flink 针对 Keyed State 的设计,让用户在使用层面感知不到 Current Key,所以用户需要稍微理解一下 State 的管理机制,才能更好的理解各种 State 的工作原理,比如 MapState<UK, UV>
中定义的 UserKey 其实并不是 Current Key。
在数据到达 operator 之前,Flink 会先将数据对应的 Key 保存在 InternalKeyContextImpl 中;
AbstractKeyedStateBackend 持有 InternalKeyContextImpl 实例,并负责维护 Current Key;
HeapValueState 是用户访问 State 的入口,它持有 StateTable 的实例,StateTable 持有 InternalKeyContextImpl 的实例,同时 StateTable 负责维护 Key 和 State 的关系;
结合前面说的,数据流入时会不断切换 InternalKeyContextImpl 中的 Current Key,StateTable 就可以实现为当前数据获取到对应的 State。
这样设计带来的好处是减少了用户管理状态的成本。因为状态都是在进程本地的,要么在 Heap 中存储,要么在 RocksDB 中存储,所以每个 SubTask 中只有当前 KeyGroupRange
对应的部分状态,用户如果随便拿个 Key 来获取状态,那么很用可能获取不到。
更多关于 State 的文章:
https://zhuanlan.zhihu.com/p/104171679
http://chenyuzhao.me/2017/12/24/Flink-%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E7%9A%84%E8%AE%BE%E8%AE%A1-%E5%AD%98%E5%82%A8/
评论