大画 Spark :: 网络 (6)-Spark 网络中的“四次握手”Executor 注册到 Driver 的过程 (硬核)
回顾
在之前的文章中,介绍了 spark 的基础网络架构原理,引出了 Executor 和 Driver 的基础概念,并继续非常粗线条的勾勒了在 Executor 上运行的 Client 和 Driver 上运行的 Server。
本章主旨
本篇,我利用一个在 spark 中的经典的网络连接通信过程来把 Executor 和 Driver 作为 Client 与 Server 的角色串联一遍,在这过程中也会把之前几节出现的Endpoint
、EndpointRef
等在实际应用中梳理一遍,加深印象。即,我们要把这幅图的细节讲出来,这幅图的意思参考上一篇。
前置知识
前面讲过这个图,简单来说,Spark 通过 Driver 来调度和发送任务到每一个 Executor 上来分布式的执行 spark 代码,从而实现并行运算的过程。下图中的 task 是任务执行的基本单位,这些 task 会被以序列化的 bytebuffer 发送到 Executor 上,并进行启动执行。发送 task 的是 Driver,执行 task 的是 Executor。
而在一个运行在 Yarn 集群上的 Spark 集群初期启动的时候,Driver 并不是知道 Executor 到底启动在了哪个节点上,而是需要 Executor 首先向 Driver 来注册的,一旦注册成功,Driver 就持有了 Executor 的“基本信息”,通过这个“基本信息”就可以找到 Executor,并且发送任务给它,是的,这个“基本信息”就是前文中所讲的EndpointRef
,即 Driver 持有了 Executor 的Endpoint
的EndpointRef
,从而利用这个EndpointRef
联通Executor
,向它发送信息。
这里继续重复一下前文所讲的,Driver 是 Server,Executor 是 Client,上面的过程是 Driver 发送消息给 Executor,即 Server 发送消息给 Client,在 TCP 连接中这个没有任何问题,具体原因参考前文,不要陷入 http 的定式思维。
Executor 向 Driver 注册的全过程
Driver 意识到 Executor 的存在,是通过 Executor 向 Driver 的注册通知机制来实现的。本篇要介绍的“四次握手”这个概念就是在这个注册的过程。
这里需要说明的是,我所描述的四次握手只是我本人为了便于理解这个过程所定义的一个概念,并没有大规模的在技术圈内采用,请读者参考理解。
何为四次握手
前面的概念讲过,一个节点要发送消息给另一个节点,是需要通过目标节点的EndpointRef
来发送的,如下图
所谓握手,一定是伸出手,对方也返回手,这算两次握手。四次握手的话就是,伸出手,返回手,再次伸出手,再次返回手的 4 次过程。整个过程如下
第一次,需要 Executor 所代表的 client 端发送请求给 Driver 的 server
第二次,Driver 的 server 返回其
EndpointRef
给 Executor 代表的 client 严格来讲,这个过程其实并不是 server 返回的EndpointRef
,而是 client 端自己 new 出来的,只是通过这次请求确认了对方是否是 Driver 的 server,Drvier 的 Server 端其实仅仅回复了亿特true
,代表是否可以正确联通第三次,Executor 所代表的 client 利用第二次的
EndpointRef
注册自己到 Driver 的 server第四次,Driver 的 server 返回注册成功告知 Executor 所代表的 client
综上,这个过程一共是四次的网络请求
High level 的过程描述
先解释几个概念,在 Executor 端,也就是 client,其运行的是一个叫做CoarseGrainedExecutorBackend
的进程,这个进程本身也是一个Endpoint
,也可以接受并处理从 Driver 端发过来的请求,内部也可以发送请求给 Driver 端。
而在 Driver 端,我没有画出来实际运行的进程,其实在 yarn 的环境下,真正运行起来的是ApplicationMaster
的进程,Driver 的所有处理都是以线程单位在执行。为了简化难度,这里就不在细节上斟酌了,能看清楚过程即可,后续在讲 Driver 和 Executor 的细节时再做具体的说明。
过程
由 Executor 自己构建起一个 verifierRef,向 Driver 发送 request。这个 verifierRef 其实很简单,就是 ip、port 以及需要在 Driver 的哪个 Endpoint 进行处理的 Endpoint 的名字所组成的一个实体,即
verifierRef = ip port EndpointName
通过以上的 3 个信息中的 ip 和 port,可以找到网络中的 Driver,再通过 EndpointName 可以找到到底用哪一个 Endpoint 来对消息进行处理。
第一步的处理非常像 TCP 连接中三次握手的第一次试探性
SYN
的过程。在 Driver 侧,收到了这个 request 后,根据 EndpointName 最终找到了是
RpcEndpointVerifier
的 Endpoint 进行处理,这个过程在前面的章节中有讲解,可以参考这里。当 Driver 正确处理这个消息后,会给 Executor 的 client 端返回一个“true
”,告诉 client 端正确接收到了消息。第二步的这个过程非常像 TCP 连接中三次握手的从 server 返回给 client 端的
ACK
的过程。通过上面 2 的过程,Executor 端可以知道 Driver 可以联通,且通过一个 spark 内部的协议进行联通,并且获得了
true
的结果。此时可以进行最重要的步骤了,即注册 Executor 到 Driver,也就是 3 这个步骤。让 Driver 知道 Executor 的存在,Executor 会把自己的信息注册到 Driver 端,这样 Driver 端持有了 Executor 的信息,可以向 Executor 发出 Task 让其并行的去执行任务了。而 Driver 持有的到底是 Executor 的什么呢?在最开始的时候,我一直理解是 Executor 的 ip 和 port,这样可以联通 Executor,但是后来想象也不可能,每次都要重新建立 connection 么?看了源代码后发现,其实通过 3 这个步骤,Driver 持有了与 Executor 之间的 channel,这个 channel 其实就是 socket 的上层抽象,后续再细说这块。通过 3,Executor 又向 Driver 注册了自己,Driver 持有了联通二者的 channel,那么最后一步就是告诉 Executor“你已经在我这里注册成功了”,再针对 3 做一个闭环的 response 返回。所以可以看到,在 3 个过程中是进行 RegisterExecutor,而 4 中进行的是 RegisteredExecutor 的告知。
通过上述的描述,可以基础的勾勒出整个 Executor 向 Driver 注册的全过程,和三次握手有着很类似的过程。下面把上述的四个过程细化一下。
第一次(Executor 通过 URI 访问 Driver)
Executor 端
全景图如上,从方块的数字开始讲述
整个 CoarseGrainedExecutorBackend 的启动会调用其 run 的方法,会进行 SparkEnv 的 create 操作
因为是 Executor,所以进行 createExecutorEnv 的方法调用
进而进行 create 操作,通过
NettyRpcEnvFactory
构建出来 Rpc 的环境,即 NettyRpcEnv,它是发送 request 到 Drvier 的重要组件NettyRpcEnv 构建起来
构建出在 Executor 端的 Endpoint,用来接收从 Driver 端发出的请求,做出响应
将这个 Endpoint,即 CoarseGrainedExecutorBackend 存储到 endpoints 中去
在
CoarseGrainedExecutorBackend
中又一个onStart
的方法,这个方法可以在第一次初始化的时候被触发,一旦触发则会开始进行上述的四次握手的“1“的过程这个过程是在
CoarseGrainedExecutorBackend
的onStart
方法中开始调用调用的是
NettyRpcEnv
的asyncSetupEndpointRefByURI
方法,这个方法顾名思义,就是也要通过 URI 在本地去构建起一个 EndpointRef。这句话怎么理解呢?即首次访问的时候,是通过一个 ip+port+endpointName 构建成一个 request 访问 Driver,这个访问是否 OK 就是第二次的过程,即 Driver 返回给 Executor 的反馈是不是true
访问 Driver 的时候,Executor 会构建出
TransportClient
,通过其访问 Driver,并且生成一个唯一的requestId
,不要小看了这个requestId
,它是识别从 Driver 返回消息的唯一 ID,即从 Drvier 返回的消息到底是之前哪个发送消息的 response,是通过这个 requestId 去判断的,而这个消息本身会通过requestId
作为 key,callback 函数作为 value 存放在一个 map 中,参看下图,是通过一个addRpcRequest
方法加入到这个outstandingRpcs
的 map 中的最后,消息通过 channel 发送到 Driver
第二次(Driver 收到访问,返回 true)
分成两个图了,太大
之前的文章说过,请求到达后先需要通过 TransportChannelHandler 判断是 request 还是 response
因为是 request,则会通过
TransportReuqestHandler
进行后续的处理调用起
NettyRpcHandler
进入到
Dispatcher
中,这里开始和前面的文章讲述的如何接收数据部分就重合了,参考这里。就不细展开了到处理完请求后,需要进行返回,因为请求的 message 类型不是
OnewayMessage
而是RpcRequest
,所以是需要 response 回 Executor 的,这块是通过一个RemoteNettyRpcCallContext
中的 callback 调用返回的通过
RpcResponseCallback
的onSuccess
中的respond
的调用把true
返回给 Executor返回
true
当true
返回给 Executor 的时候,还记得上面说到的那个outstandingRpcs
的 map 么,通过 requestId 再找到之前存好的 callback 函数,调用 callback 函数从而实现了一次同步的(类似于模拟 http 协议)的访问过程。
第三次(构建 driver 的 EndpointRef 进行 Executor 注册)
第四次(Driver 返回注册 Executor 成功的信息给 Executor)
上述的第三次和第四次,其实本质上和第一次、第二次类似,只是,上面的第一次,是利用 URI 访问的 Driver,而从第三次开始,因为第二次返回的 true,让 Executor 可以构建好访问 Driver 的 EndpointRef 了,后续的访问直接使用这个EndpointRef
即可。
这块还是稍稍看看代码吧,就不画图了。
进入到方法asyncSetupEndpointRefByURI
的内部
再回到 onstart 方法中
而第四次,从 Driver 发送 RegisteredExecutor 的操作,是从 Driver 端主动发出的,代码如下
总结
这部分整个串通下来还是有点烧脑的,比起后面的 shuffle 和 stage 等操作来说,还是属于及其小儿科的部分。主要是对网络的熟悉程度决定着这部分的理解。
本篇,简单介绍了 Executor 注册到 Driver 的四次握手的过程,让 Executor 和 Driver 互相联通上。但是我们侧重说的是在 Executor 端如何建立连接的过程,但是在 Driver 端,是如何收取到 Executor 的 request 后,也自己去构建起一个 EndpointRef,从而实现可以双工的给 Executor 提交 Task 的呢?这块算是 Spark 中比较 hacky 的部分了,下篇细聊。
版权声明: 本文为 InfoQ 作者【dclar】的原创文章。
原文链接:【http://xie.infoq.cn/article/e986de0424dd281d5245d0afd】。文章转载请联系作者。
评论