Flink 源码分析之一文搞懂 Flink 消息全流程
我们以下面代码为例:
当 Flink 程序启动,leader、blobServer 等都创建完毕,当 ExecutionGraph 构建完成,提交成功之后。就到了,task 正式执行的阶段了。这个时候,一条消息是如何流转的呢?
首先,进入了 Task 的 run 方法
然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方
最为关键的就是 run 方法。
进入 SourceStreamTask run 方法
继续追踪就到了 StreamSource 的 run 方法
此处的 userFunction 实际上就是 FlinkKafkaConsumer
具体是如何消费消息的可以参考
彻底搞懂 Flink Kafka OffsetState 存储
继续追踪到 RecordWriter
RecordWriter 还是比较有意思的,RecordWriter 主要就是把 java 对象转化为 byte 数组( 也就是 flink 自己管理内存,不借助与 JVM )。而后面的传输也是基于 byte 数组的。
copyFromSerializerToTargetChannel 会将 byte 数据 flush 到 相应的 targetChannel ( targetChannel 对于下游来说就是 InputChannel 具体可以参考一下 Flink反压机制 )
底层通过 netty 进行数据的传送,传送至 PartitionRequestQueue
这个时候,这条数据就进入了下游的 InputChannel 。
有写得需要有读,进入到 CreditBasedPartitionRequestClientHandler
至此呢,就该下游算子 flapMap 运行处理了。(当然啦,实际上应该是先 print 对应的 task 运行,然后 flatMap 对应的 task 运行,最后才是 source 对应的 task 运行 )。
我们得回到 Task 的 run 方法
然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方
最为关键的就是 run 方法。
这次调用的是 flatMap 对应 task 的 run 方法,所以进入 OneInputStreamTask
进入 processInput 方法
获取到 buffer 之后
交给 flatMap 去处理。处理完了之后就又把数据发往 RecordWriter 的 emit 然后就这样反复执行,直到最后一个 operator ,这个消息也就消费完毕了。当然了,这仅仅是跨 taskManager 的消息流程,同一个 taskMananger 的消息流程就很简单了,就是简单的消息传递,不需要序列化成 byte 数组
总结
整体流程
1. 第一步必然是准备一个ResultPartition;
通知JobMaster;
JobMaster通知下游节点;如果下游节点尚未部署,则部署之;
下游节点向上游请求数据
开始传输数据
数据跨 task 传输
数据在本operator处理完后,交给RecordWriter。每条记录都要选择一个下游节点,所以要经过ChannelSelector。
每个channel都有一个serializer(我认为这应该是为了避免多线程写的麻烦),把这条Record序列化为ByteBuffer
接下来数据被写入ResultPartition下的各个subPartition里,此时该数据已经存入DirectBuffer(MemorySegment)
单独的线程控制数据的flush速度,一旦触发flush,则通过Netty的nio通道向对端写入
对端的netty client接收到数据,decode出来,把数据拷贝到buffer里,然后通知InputChannel
有可用的数据时,下游算子从阻塞醒来,从InputChannel取出buffer,再解序列化成record,交给算子执行用户代
版权声明: 本文为 InfoQ 作者【shengjk1】的原创文章。
原文链接:【http://xie.infoq.cn/article/53a5b0118fafaa110849d50fa】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论