大数据培训 | Flink SQL 窗口表值函数聚合实现原理
引子
表值函数(table-valued function, TVF),顾名思义就是指返回值是一张表的函数,在 Oracle、SQL Server 等数据库中屡见不鲜。
而在 Flink 的上一个稳定版本 1.13 中,社区通过 FLIP-145 提出了窗口表值函数(window TVF)的实现,用于替代旧版的窗口分组(grouped window)语法。
举个栗子,在 1.13 之前,我们需要写如下的 Flink SQL 语句来做 10 秒的滚动窗口聚合:
SELECT TUMBLE_START(procTime, INTERVAL '10' SECONDS) AS window_start,merchandiseId,COUNT(1) AS sellCount
FROM rtdw_dwd.kafka_order_done_log
GROUP BY TUMBLE(procTime, INTERVAL '10' SECONDS),merchandiseId;
在 1.13 版本中,则可以改写成如下的形式:
SELECT window_start,window_end,merchandiseId,COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )
GROUP BY window_start,window_end,merchandiseId;
根据设计文档的描述,窗口表值函数的思想来自 2019 年的 SIGMOD 论文<>,而表值函数属于 SQL 2016 标准的一部分。
Calcite 从 1.25 版本起也开始提供对滚动窗口和滑动窗口 TVF 的支持。
除了标准化、易于实现之外,窗口 TVF 还支持旧版语法所不具备的一些特性,如 Local-Global 聚合优化、Distinct 解热点优化、Top-N 支持、GROUPING SETS 语法等。
接下来本文简单探究一下基于窗口 TVF 的聚合逻辑,以及对累积窗口 TVF 做一点简单的改进。
更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)
SQL 定义
窗口 TVF 函数的类图如下所示。
Flink SQL 在 Calcite 原生的 SqlWindowTableFunction 的基础上加了指示窗口时间的三列,即 window_start、window_end 和 window_time。
SqlWindowTableFunction 及其各个实现类的主要工作是校验 TVF 的操作数是否合法(通过内部抽象类 AbstractOperandMetadata 和对应的子类 OperandMetadataImpl)。这一部分不再赘述,在下文改进累积窗口 TVF 的代码中会涉及到_大数据培训。
物理计划
目前窗口 TVF 不能单独使用,需要配合窗口聚合或 Top-N 一起使用。以上文中的聚合为例,观察其执行计划如下。
EXPLAIN
SELECT window_start,window_end,merchandiseId,COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )
GROUP BY window_start,window_end,merchandiseId;
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0, 1, 2}], sellCount=[COUNT()])
+- LogicalProject(window_start=[$48], window_end=[$49], merchandiseId=[$10])
+- LogicalTableFunctionScan(invocation=[TUMBLE($47, DESCRIPTOR($47), 10000:INTERVAL SECOND)], rowType=[RecordType(BIGINT ts, /* ...... */, TIMESTAMP_LTZ(3) *PROCTIME* procTime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(ts=[$0], /* ...... */, procTime=[PROCTIME()])
+- LogicalTableScan(table=[[hive, rtdw_dwd, kafka_order_done_log]])
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[merchandiseId]])
+- Calc(select=[merchandiseId, PROCTIME() AS procTime])
+- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])
== Optimized Execution Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[merchandiseId]])
+- Calc(select=[merchandiseId, PROCTIME() AS procTime])
+- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])
在 Flink SQL 规则集中,与如上查询相关的规则按顺序依次是:
ConverterRule:StreamPhysicalWindowTableFunctionRule 该规则将调用窗口 TVF 的逻辑节点(即调用 SqlWindowTableFunction 的 LogicalTableFunctionScan 节点)转化为物理节点(StreamPhysicalWindowTableFunction)。
ConverterRule:StreamPhysicalWindowAggregateRule 该规则将含有 window_start、window_end 字段的逻辑聚合节点 FlinkLogicalAggregate 转化为物理的窗口聚合节点 StreamPhysicalWindowAggregate 以及其上的投影 StreamPhysicalCalc。在有其他分组字段的情况下,还会根据 FlinkRelDistribution#hash 生成 StreamPhysicalExchange 节点。
RelOptRule:PullUpWindowTableFunctionIntoWindowAggregateRule 顾名思义,该规则将上面两个规则产生的 RelNode 进行整理,消除代表窗口 TVF 的物理节点,并将它的语义上拉至聚合节点中,形成最终的物理计划。
然后,StreamPhysicalWindowAggregate 节点翻译成 StreamExecWindowAggregate 节点,进入执行阶段。
切片化窗口与执行
以前我们提过粒度太碎的滑动窗口会使得状态和 Timer 膨胀,比较危险,应该用滚动窗口+在线存储+读时聚合的方法代替。
社区在设计窗口 TVF 聚合时显然考虑到了这点,提出了切片化窗口(sliced window)的概念,并以此为基础设计了一套与 DataStream API Windowing 不同的窗口机制。
如下图的累积窗口所示,每两条纵向虚线之间的部分就是一个切片(slice)。
编辑
切片的本质就是将滑动/累积窗口化为滚动窗口,并尽可能地复用中间计算结果,降低状态压力。
自然地,前文所述的 Local-Global 聚合优化、Distinct 解热点优化就都可以无缝应用了。
那么,切片是如何分配的呢?答案是通过 SliceAssigner 体系,其类图如下。
编辑
注意 CumulativeSliceAssigner 多了一个 isIncremental()方法,这是下文所做优化的一步可见,对于滚动窗口而言,一个窗口就是一个切片;而对滑动/累积窗口而言,一个窗口可能包含多个切片,一个切片也可能位于多个窗口中。
更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)
所以共享切片的窗口要特别注意切片的过期与合并。
以负责累积窗口的 CumulativeSliceAssigner 为例,对应的逻辑如下。
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
long windowStart = getWindowStart(windowEnd);
long firstSliceEnd = windowStart + step;
long lastSliceEnd = windowStart + maxSize;
if (windowEnd == firstSliceEnd) {
// we share state in the first slice, skip cleanup for the first slice
reuseExpiredList.clear();
} else if (windowEnd == lastSliceEnd) {
// when this is the last slice,
// we need to cleanup the shared state (i.e. first slice) and the current slice
reuseExpiredList.reset(windowEnd, firstSliceEnd);
} else {
// clean up current slice
reuseExpiredList.reset(windowEnd);
}
return reuseExpiredList;
}
@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
long windowStart = getWindowStart(sliceEnd);
long firstSliceEnd = windowStart + step;
if (sliceEnd == firstSliceEnd) {
// if this is the first slice, there is nothing to merge
reuseToBeMergedList.clear();
} else {
// otherwise, merge the current slice state into the first slice state
reuseToBeMergedList.reset(sliceEnd);
}
callback.merge(firstSliceEnd, reuseToBeMergedList);
}
可见,累积窗口的中间结果会被合并到第一个切片中。窗口未结束时,除了第一个切片之外的其他切片触发后都会过期。
实际处理切片化窗口的算子名为 SlicingWindowOperator,它实际上是 SlicingWindowProcessor 的简单封装。SlicingWindowProcessor 的体系如下。
编辑
SlicingWindowProcessor 的三个重要组成部分分别是:
WindowBuffer:在托管内存区域分配的窗口数据缓存,避免在窗口未实际触发时高频访问状态;
WindowValueState:窗口的状态,其 schema 为[key, window_end, accumulator]。窗口结束时间作为窗口状态的命名空间(namespace);
NamespaceAggsHandleFunction:通过代码生成器 AggsHandlerCodeGenerator 生成的聚合函数体。注意它并不是一个 AggregateFunction,但是大致遵循其规范。
每当一条数据到来时,调用 AbstractWindowAggProcessor#processElement()方法,比较容易理解了。
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
if (!isEventTime) {
// always register processing time for every element when processing time mode
windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
}
if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
// the assigned slice has been triggered, which means current element is late,
// but maybe not need to drop
long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
// the last window has been triggered, so the element can be dropped now
return true;
} else {
windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
// we need to register a timer for the next unfired window,
// because this may the first time we see elements under the key
long unfiredFirstWindow = sliceEnd;
while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
unfiredFirstWindow += windowInterval;
}
windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
return false;
}
} else {
// the assigned slice hasn't been triggered, accumulate into the assigned slice
windowBuffer.addElement(key, sliceEnd, element);
return false;
}
}
而当切片需要被合并时,先从 WindowValueState 中取出已有的状态,再遍历切片,并调用 NamespaceAggsHandleFunction#merge()方法进行合并,最后更新状态。
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
// get base accumulator
final RowData acc;
if (mergeResult == null) {
// null means the merged is not on state, create a new acc
acc = aggregator.createAccumulators();
} else {
RowData stateAcc = windowState.value(mergeResult);
if (stateAcc == null) {
acc = aggregator.createAccumulators();
} else {
acc = stateAcc;
}
}
// set base accumulator
aggregator.setAccumulators(mergeResult, acc);
// merge slice accumulators
for (Long slice : toBeMerged) {
RowData sliceAcc = windowState.value(slice);
if (sliceAcc != null) {
aggregator.merge(slice, sliceAcc);
}
}
// set merged acc into state if the merged acc is on state
if (mergeResult != null) {
windowState.update(mergeResult, aggregator.getAccumulators());
}
}
看官若要观察 codegen 出来的聚合函数的代码,可在 log4j.properties 文件中加上:
logger.codegen.name = org.apache.flink.table.runtime.generated
logger.codegen.level = DEBUG
一点改进
有很多天级聚合+秒级触发的 Flink 作业,在 DataStream API 时代多由 ContinuousProcessingTimeTrigger 实现,1.13 版本之前的 SQL 则需要添加 table.exec.emit.early-fire 系列参数。
正式采用 1.13 版本后,累积窗口(cumulate window)完美契合此类需求。
但是,有些作业的 key 规模比较大,在一天的晚些时候会频繁向下游 Redis 刷入大量数据,造成不必要的压力。
更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)
因此,笔者对累积窗口 TVF 做了略有侵入的小改动,通过一个布尔参数 INCREMENTAL 可控制只输出切片之间发生变化的聚合结果。
操作很简单:
修改 SqlCumulateTableFunction 函数的签名,以及配套的窗口参数类 CumulativeWindowSpec 等;
修改 SliceSharedWindowAggProcess#fireWindow()方法,如下。
@Override
public void fireWindow(Long windowEnd) throws Exception {
sliceSharedAssigner.mergeSlices(windowEnd, this);
// we have set accumulator in the merge() method
RowData aggResult = aggregator.getValue(windowEnd);
if (!isWindowEmpty()) {
if (sliceSharedAssigner instanceof CumulativeSliceAssigner
&& ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {
RowData stateValue = windowState.value(windowEnd);
if (stateValue == null || !stateValue.equals(aggResult)) {
collect(aggResult);
}
} else {
collect(aggResult);
}
}
// we should register next window timer here,
// because slices are shared, maybe no elements arrived for the next slices
// ......
}
当然,此方案会带来访问状态的 overhead,后续会做极限压测以观察性能,并做适当修改。
文章来源于大数据技术与架构
评论