写点什么

一文搞懂 FlinkSQL 函数 LAST_VALUE 的原理

用户头像
shengjk1
关注
发布于: 2021 年 03 月 23 日

背景

刚开始接触 FlinkSQL 时,对 LAST_VALUE 特别好奇,虽然工作当中有在用到,但还是特别的想知道它是怎么实现的,今天终于可以总结一下


原理

当我们写入如下类似的 sql 时,就会用到 LAST_VALUE 函数

select LAST_VALUE(status) from temp;
复制代码

LAST_VALUE 函数对应的具体类为 LastValueWithRetractAggFunction。

LAST_VALUE 函数之所以能够起作用最关键的是


 /** Accumulator for LAST_VALUE with retraction. */    public static class LastValueWithRetractAccumulator<T> {        public T lastValue = null;        public Long lastOrder = null;        // value timestamp        public MapView<T, List<Long>> valueToOrderMap = new MapView<>();        // timestamp value        public MapView<Long, List<T>> orderToValueMap = new MapView<>();
...... }
@SuppressWarnings("unchecked") public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception { if (value != null) {//传进来的是 null 不做任何操作 T v = (T) value; Long order = System.currentTimeMillis(); List<Long> orderList = acc.valueToOrderMap.get(v); if (orderList == null) { orderList = new ArrayList<>(); } orderList.add(order); acc.valueToOrderMap.put(v, orderList); accumulate(acc, value, order); } }
@SuppressWarnings("unchecked") public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value, Long order) throws Exception { if (value != null) { T v = (T) value; Long prevOrder = acc.lastOrder;// 默认是 null if (prevOrder == null || prevOrder <= order) {//类似链表头插法 acc.lastValue = v; acc.lastOrder = order; }
List<T> valueList = acc.orderToValueMap.get(order); if (valueList == null) { valueList = new ArrayList<>(); } valueList.add(v); acc.orderToValueMap.put(order, valueList); } }
@SuppressWarnings("unchecked") public void retract(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception { if (value != null) { T v = (T) value; List<Long> orderList = acc.valueToOrderMap.get(v);// 查出所有的 timestamp if (orderList != null && orderList.size() > 0) {// 说明之前已经发出过了.此刻该 retract Long order = orderList.get(0); orderList.remove(0);//最早进入的那个 value 对应的 timestamp remove if (orderList.isEmpty()) {//说明该 value 有且仅进入了一次 acc.valueToOrderMap.remove(v); } else { acc.valueToOrderMap.put(v, orderList); } retract(acc, value, order); } } }
@SuppressWarnings("unchecked") public void retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order) throws Exception { if (value != null) { T v = (T) value; List<T> valueList = acc.orderToValueMap.get(order);//取出相同 timestamp 对应的所有 value if (valueList == null) { return; } int index = valueList.indexOf(v);// 找到对应的 value 并将其删除 if (index >= 0) { valueList.remove(index); if (valueList.isEmpty()) { acc.orderToValueMap.remove(order); } else { acc.orderToValueMap.put(order, valueList); } } if (v.equals(acc.lastValue)) { Long startKey = acc.lastOrder; Iterator<Long> iter = acc.orderToValueMap.keys().iterator(); // find the maximal order which is less than or equal to `startKey` //找到小于要删除值对应时间戳的最大值 Long nextKey = Long.MIN_VALUE; while (iter.hasNext()) { Long key = iter.next(); if (key <= startKey && key > nextKey) { nextKey = key; } }
if (nextKey != Long.MIN_VALUE) { List<T> values = acc.orderToValueMap.get(nextKey); acc.lastValue = values.get(values.size() - 1); acc.lastOrder = nextKey; } else { acc.lastValue = null; acc.lastOrder = null; } } } }
复制代码

首先呢是两个 MapView valueToOrderMap、orderToValueMap


valueToOrderMap 值( 此刻最终的结果 )---->消息进入 accumulate 方法的系统时间戳

orderToValueMap 消息进入 accumulate 方法的系统时间戳 ----->值( 此刻最终的结果 )


当 RowData( 内部使用 )对应的 rowKind 为 insert 或者 update_after 时,会进入 accumulate(LastValueWithRetractAccumulator<T> acc, Object value) 方法。accumulate 方法相对比较简单其实就是分别对 valueToOrderMap、orderToValueMap 进行赋值。


当 RowData( 内部使用 )对应的 rowKind 为 delete 或者 update_before 时,会进入 retract(LastValueWithRetractAccumulator<T> acc, Object value) 方法,主要是操作 valueToOrderMap 删除之前已经发出去的消息记录,然后进入 retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order),主要就是操作 orderToValueMap 删除对应时间戳的值,然后找出 不大于要删除数据对应时间戳的最大时间戳,下一步要 retract 就该它了


总结

其实就是通过 时间戳 来进行判断的


发布于: 2021 年 03 月 23 日阅读数: 11
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
一文搞懂 FlinkSQL函数 LAST_VALUE 的原理