写点什么

Flink 处理函数实战之一:深入了解 ProcessFunction 的状态 (Flink-1

作者:Java高工P7
  • 2021 年 11 月 09 日
  • 本文字数:2332 字

    阅读完需:约 8 分钟

  1. 如上图,keyed stream 的元素是具有 key 的特征,与 ProcessFunction 的操作状态时要求匹配,其他 steam 的元素由于没有 key 的特征,所以也就没有状态一说了;

  2. 另一种状态是 Operator State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费 kafka 的某个分区的最新 offset,而 ProcessFunction 是用来处理 stream 元素的,不会涉及到 Operator State:


官方 demo

为了学习 ProcessFunction 就去看官方 demo,地址是:https://ci.apache.org/projects/flink/fl


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


ink-docs-release-1.10/dev/stream/operators/process_function.html ,简单说说这个 demo 的功能:


  1. 数据源在不间断的产生单词,每个单词对应一个 Tuple2<String,String>的实例;

  2. 数据源被 keyBy 方法转成 KeyedStream,key 是 Tuple2 实例的 f0 字段;

  3. 一个 KeyedProcessFunction 的子类 CountWithTimeoutFunction,被用来处理 KeyedStream 的每个元素,处理的逻辑:为每个 key 维护一个状态,状态的内容是这个 key 的出现次数和最后一次出现时间;

  4. 如果那个 key 连续一分钟没有出现,KeyedProcessFunction 就向下游发送这个元素;


以上就是官方 demo 的功能,本来是想通过 demo 来加深认识,结果看完不但没有明白,反而更晕了,下图是我对 demo 代码的疑惑:



从上图可见我的疑惑,这里再复述一下:


5. 入参 value 是 Tuple2 类型,假设其 f0 字段等于 aaa,那么 processElement 方法的作用,就是取出 aaa 的状态,更新后保存;


6. 从代码上看,state.value()返回了 aaa 的状态,这个 value 方法并没有将 aaa 作为入参,那怎么做到返回 aaa 的状态呢?如果下一个入参 value 的 f0 字段等于 bbb 了,这个 state.value()能返回 bbb 的状态吗?


7. 对更新状态的代码 state.update(current)也是同样的疑惑;


8. 然后又产生了新的疑惑:成员变量 state 难道是一直在变?每执行一次 processElement,都会变成该 key 对应的 state 实例?

先反思为何会有上述疑惑

  1. 上述疑惑产生的原因,应该是受到平时使用 HashMap 的影响,HashMap 获取值就是在调用 get 方法时指定 key,设置值也是在 put 时指定 key,所以看到 state.value()方法没有用 key 做入参就不习惯了

  2. 要消除这种不适应,要做的第一件事就是提醒自己:processElement 是在框架内运行的,很多数据在之前已经由框架准备好了;

  3. 接下来要做的,就是把框架准备数据的逻辑看一遍,除了弄明白自己的问题,由于 ProcessFunction 属于最低阶抽象(如下图的最下方位置),看懂了这些,其实也是在了解 DataStream/DataSet API 的设计思路:


跟踪源码

  1. 如下图,让我们从一个断点的堆栈开始吧,这是在执行上面 demo 中的 processElement 方法之前的一个断点,可见根源是个线程的 run 方法,也就是 KeyedProcessFunction 对应的算子执行任务的线程:



  1. 上面的堆栈不必每一层都细看,只关注重要的部分,下图这段很重要:StreamTask.run 方法中,有个无限循环(猜测是每次执行 processInput 方法都处理 KeyedStream 的一个元素):



  1. 如下图,StreamOneInputProcessor.processInput 方法取出 KeyedStream 的一个元素,调用 processElement 方法,并将此元素作为入参,再结合上一幅图可以看出:在编写 KeyedProcessFunction 子类的时候,KeyedStream 的每个元素都会作为入参,在调用你重写的 processElement 方法时传进去;这一点,在做 ProcessFunction 和 KeyedProcessFunction 开发时都是要格外注意的:



  1. 接下来到了最关键的地方了,下图红框中的 streamOperator.setKeyContextElement1(record)会解答我前面的疑惑,一定要进去看个清楚,(后面的黄线上的代码,您应该猜到了,里面其实就是调用 demo 中的 processElement 方法)



  1. 下图中,AbstractStreamOperator.setKeyContextElement 给出了答案:对于 KeyedStream 的每个元素,都会在这里算出 key,再调用 setCurrentKey 保存这个 key:



  1. 展开 setCurrentKey,如下图,发现 key 的保存和当前状态的存储策略(StateBackend)有关,我这里是默认策略 HeapKeyedStateBackend:



  1. 最终,根据当前元素得到的 key 会在 StateBackend 的 keyContext 对象中找地方保存,StateBackend 的具体实现和 Flink 设置有关,我这里是保存到了 InternalKeyContextImpl 实例的 currentKey 变量中:



  1. 代码读到这里,对我前面的疑惑,您应该能推测出答案了:state.value()里面会通过 StateBackend 的 keyContext 取出刚才保存的 key,接下来就能像 HashMap 那样根据 key 查出该 key 的状态了,接下来是愉快的印证我们推测的过程;

  2. 在 state.value()代码位置打断点一次看个明白,如下图,果然,state 里面有 StateBackend 的 keyContext 对象的引用,访问刚才保存的 key 就不成问题了:



  1. 展开 state.value()方法如下,简单明了,直接拿 keyContext 保存的 key 作为入参去取对应的状态:



  1. 再展开上面的 get 方法,可见最终是从 stateMap 中取得的,而这个 stateMap 的具体实现是 CopyOnWriteStateMap 类型的实例:



  1. 代码读到这里,只剩最后一处需要印证了:更新状态的 state.update(current)方法,应该也是以 StateBackend 的 keyContext 中的 key 作为自己的 key,再将入参的 current 作为 value,更新到 stateMap 中,来吧,一起印证这个推测;

  2. 展开方法,看到的是 stateTable.put 方法(前面刚看过 stateTable 的 get 方法,稳了):



  1. stateTable.put 方法里面和前面的 get 方法一样,直接拿 keyContext 保存的 key 作为自己的 key:



  1. 最终是调用了 stateMap.put 方法,将数据保存在 CopyOnWriteStateMap 实例中:



  1. 得益于 Flink 代码自身规范、清晰的设计和实现,再加上 IDEA 强大的 debug 功能,整个阅读和分析过程十分顺利,这其中的收获会逐渐在今后深入学习 DataStreamAPI 的过程中见效;


最后,根据上面的分析过程绘制了一幅简陋的流程图,希望能帮助您加快理解:


欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游 Java 世界…

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Flink处理函数实战之一:深入了解ProcessFunction的状态(Flink-1