写点什么

大画 Spark :: 网络 (6)-Spark 网络中的“四次握手”Executor 注册到 Driver 的过程 (硬核)

作者:dclar
  • 2022 年 3 月 22 日
  • 本文字数:6270 字

    阅读完需:约 21 分钟

大画 Spark :: 网络(6)-Spark网络中的“四次握手”Executor注册到Driver的过程(硬核)

回顾

在之前的文章中,介绍了 spark 的基础网络架构原理,引出了 Executor 和 Driver 的基础概念,并继续非常粗线条的勾勒了在 Executor 上运行的 Client 和 Driver 上运行的 Server。

本章主旨

本篇,我利用一个在 spark 中的经典的网络连接通信过程来把 Executor 和 Driver 作为 Client 与 Server 的角色串联一遍,在这过程中也会把之前几节出现的EndpointEndpointRef等在实际应用中梳理一遍,加深印象。即,我们要把这幅图的细节讲出来,这幅图的意思参考上一篇


前置知识

前面讲过这个图,简单来说,Spark 通过 Driver 来调度和发送任务到每一个 Executor 上来分布式的执行 spark 代码,从而实现并行运算的过程。下图中的 task 是任务执行的基本单位,这些 task 会被以序列化的 bytebuffer 发送到 Executor 上,并进行启动执行。发送 task 的是 Driver,执行 task 的是 Executor。

而在一个运行在 Yarn 集群上的 Spark 集群初期启动的时候,Driver 并不是知道 Executor 到底启动在了哪个节点上,而是需要 Executor 首先向 Driver 来注册的,一旦注册成功,Driver 就持有了 Executor 的“基本信息”,通过这个“基本信息”就可以找到 Executor,并且发送任务给它,是的,这个“基本信息”就是前文中所讲的EndpointRef,即 Driver 持有了 Executor 的EndpointEndpointRef,从而利用这个EndpointRef联通Executor,向它发送信息。

这里继续重复一下前文所讲的,Driver 是 Server,Executor 是 Client,上面的过程是 Driver 发送消息给 Executor,即 Server 发送消息给 Client,在 TCP 连接中这个没有任何问题,具体原因参考前文,不要陷入 http 的定式思维。



Executor 向 Driver 注册的全过程

Driver 意识到 Executor 的存在,是通过 Executor 向 Driver 的注册通知机制来实现的。本篇要介绍的“四次握手”这个概念就是在这个注册的过程。

这里需要说明的是,我所描述的四次握手只是我本人为了便于理解这个过程所定义的一个概念,并没有大规模的在技术圈内采用,请读者参考理解。

何为四次握手

前面的概念讲过,一个节点要发送消息给另一个节点,是需要通过目标节点的EndpointRef来发送的,如下图


所谓握手,一定是伸出手,对方也返回手,这算两次握手。四次握手的话就是,伸出手,返回手,再次伸出手,再次返回手的 4 次过程。整个过程如下

  • 第一次,需要 Executor 所代表的 client 端发送请求给 Driver 的 server

  • 第二次,Driver 的 server 返回其EndpointRef给 Executor 代表的 client 严格来讲,这个过程其实并不是 server 返回的EndpointRef,而是 client 端自己 new 出来的,只是通过这次请求确认了对方是否是 Driver 的 server,Drvier 的 Server 端其实仅仅回复了亿特true,代表是否可以正确联通

  • 第三次,Executor 所代表的 client 利用第二次的EndpointRef注册自己到 Driver 的 server

  • 第四次,Driver 的 server 返回注册成功告知 Executor 所代表的 client

综上,这个过程一共是四次的网络请求

High level 的过程描述


先解释几个概念,在 Executor 端,也就是 client,其运行的是一个叫做CoarseGrainedExecutorBackend的进程,这个进程本身也是一个Endpoint,也可以接受并处理从 Driver 端发过来的请求,内部也可以发送请求给 Driver 端。

而在 Driver 端,我没有画出来实际运行的进程,其实在 yarn 的环境下,真正运行起来的是ApplicationMaster的进程,Driver 的所有处理都是以线程单位在执行。为了简化难度,这里就不在细节上斟酌了,能看清楚过程即可,后续在讲 Driver 和 Executor 的细节时再做具体的说明。

过程

  1. 由 Executor 自己构建起一个 verifierRef,向 Driver 发送 request。这个 verifierRef 其实很简单,就是 ip、port 以及需要在 Driver 的哪个 Endpoint 进行处理的 Endpoint 的名字所组成的一个实体,即

    verifierRef = ip port EndpointName

    通过以上的 3 个信息中的 ip 和 port,可以找到网络中的 Driver,再通过 EndpointName 可以找到到底用哪一个 Endpoint 来对消息进行处理。

    第一步的处理非常像 TCP 连接中三次握手的第一次试探性SYN的过程。

  2. 在 Driver 侧,收到了这个 request 后,根据 EndpointName 最终找到了是RpcEndpointVerifier的 Endpoint 进行处理,这个过程在前面的章节中有讲解,可以参考这里。当 Driver 正确处理这个消息后,会给 Executor 的 client 端返回一个“true”,告诉 client 端正确接收到了消息。

    第二步的这个过程非常像 TCP 连接中三次握手的从 server 返回给 client 端的ACK的过程。

  3. 通过上面 2 的过程,Executor 端可以知道 Driver 可以联通,且通过一个 spark 内部的协议进行联通,并且获得了true的结果。此时可以进行最重要的步骤了,即注册 Executor 到 Driver,也就是 3 这个步骤。让 Driver 知道 Executor 的存在,Executor 会把自己的信息注册到 Driver 端,这样 Driver 端持有了 Executor 的信息,可以向 Executor 发出 Task 让其并行的去执行任务了。而 Driver 持有的到底是 Executor 的什么呢?在最开始的时候,我一直理解是 Executor 的 ip 和 port,这样可以联通 Executor,但是后来想象也不可能,每次都要重新建立 connection 么?看了源代码后发现,其实通过 3 这个步骤,Driver 持有了与 Executor 之间的 channel,这个 channel 其实就是 socket 的上层抽象,后续再细说这块。

  4. 通过 3,Executor 又向 Driver 注册了自己,Driver 持有了联通二者的 channel,那么最后一步就是告诉 Executor“你已经在我这里注册成功了”,再针对 3 做一个闭环的 response 返回。所以可以看到,在 3 个过程中是进行 RegisterExecutor,而 4 中进行的是 RegisteredExecutor 的告知。

通过上述的描述,可以基础的勾勒出整个 Executor 向 Driver 注册的全过程,和三次握手有着很类似的过程。下面把上述的四个过程细化一下。

第一次(Executor 通过 URI 访问 Driver)


Executor 端

全景图如上,从方块的数字开始讲述

  1. 整个 CoarseGrainedExecutorBackend 的启动会调用其 run 的方法,会进行 SparkEnv 的 create 操作

  2. 因为是 Executor,所以进行 createExecutorEnv 的方法调用

  3. 进而进行 create 操作,通过NettyRpcEnvFactory构建出来 Rpc 的环境,即 NettyRpcEnv,它是发送 request 到 Drvier 的重要组件

  4. NettyRpcEnv 构建起来

  5. 构建出在 Executor 端的 Endpoint,用来接收从 Driver 端发出的请求,做出响应

  6. 将这个 Endpoint,即 CoarseGrainedExecutorBackend 存储到 endpoints 中去

  7. CoarseGrainedExecutorBackend中又一个onStart的方法,这个方法可以在第一次初始化的时候被触发,一旦触发则会开始进行上述的四次握手的“1“的过程

    这个过程是在CoarseGrainedExecutorBackendonStart方法中开始调用

    调用的是NettyRpcEnvasyncSetupEndpointRefByURI方法,这个方法顾名思义,就是也要通过 URI 在本地去构建起一个 EndpointRef。这句话怎么理解呢?即首次访问的时候,是通过一个 ip+port+endpointName 构建成一个 request 访问 Driver,这个访问是否 OK 就是第二次的过程,即 Driver 返回给 Executor 的反馈是不是true

  8. 访问 Driver 的时候,Executor 会构建出TransportClient,通过其访问 Driver,并且生成一个唯一的requestId,不要小看了这个requestId,它是识别从 Driver 返回消息的唯一 ID,即从 Drvier 返回的消息到底是之前哪个发送消息的 response,是通过这个 requestId 去判断的,而这个消息本身会通过requestId作为 key,callback 函数作为 value 存放在一个 map 中,参看下图,是通过一个addRpcRequest方法加入到这个outstandingRpcs的 map 中的

  9. 最后,消息通过 channel 发送到 Driver


第二次(Driver 收到访问,返回 true)



分成两个图了,太大

  1. 之前的文章说过,请求到达后先需要通过 TransportChannelHandler 判断是 request 还是 response

  2. 因为是 request,则会通过TransportReuqestHandler进行后续的处理

  3. 调用起NettyRpcHandler

  4. 进入到Dispatcher中,这里开始和前面的文章讲述的如何接收数据部分就重合了,参考这里。就不细展开了

  5. 到处理完请求后,需要进行返回,因为请求的 message 类型不是OnewayMessage而是RpcRequest,所以是需要 response 回 Executor 的,这块是通过一个RemoteNettyRpcCallContext中的 callback 调用返回的

  6. 通过RpcResponseCallbackonSuccess中的respond的调用把true返回给 Executor

  7. 返回true

true返回给 Executor 的时候,还记得上面说到的那个outstandingRpcs的 map 么,通过 requestId 再找到之前存好的 callback 函数,调用 callback 函数从而实现了一次同步的(类似于模拟 http 协议)的访问过程。


第三次(构建 driver 的 EndpointRef 进行 Executor 注册)

第四次(Driver 返回注册 Executor 成功的信息给 Executor)

上述的第三次和第四次,其实本质上和第一次、第二次类似,只是,上面的第一次,是利用 URI 访问的 Driver,而从第三次开始,因为第二次返回的 true,让 Executor 可以构建好访问 Driver 的 EndpointRef 了,后续的访问直接使用这个EndpointRef即可。

这块还是稍稍看看代码吧,就不画图了。

override def onStart() {    logInfo("Connecting to driver: " + driverUrl)
// 这里是开始访问的“第一次” rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => // Always receive `true`. Just ignore it case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) }(ThreadUtils.sameThread) }C
复制代码

进入到方法asyncSetupEndpointRefByURI的内部

def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {    val addr = RpcEndpointAddress(uri)		// 构建了drvier的endpointRef    val endpointRef = new NettyRpcEndpointRef(conf, addr, this)		// 构建了drvier的verifier的endpointRef    val verifier = new NettyRpcEndpointRef(      conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)    // 其实 “第一次”的访问,是通过这个verifier的endpointRef进行的    // 在spark中,endpointRef的ask是需要response的,也就是ask的是RpcRequest,而send方法对应的是OneWayMessage    verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>      if (find) { // 这里就是Driver返回的true        Future.successful(endpointRef) // 这里的endpointRef其实并不是Driver返回的,而是在Executor构建的        // 后续利用这个endpointRef去直接访问Driver即可      } else {        Future.failed(new RpcEndpointNotFoundException(uri))      }    }(ThreadUtils.sameThread)  }
复制代码

再回到 onstart 方法中

override def onStart() {    logInfo("Connecting to driver: " + driverUrl)
// 这里是开始访问的“第一次” rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" // ref就是上面的endpointRef,也就是访问Drvier的ref driver = Some(ref) // 这里再通过这个ref 再次ask发送RegisterExecutor的请求 ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => // 这块没有返回值 // Always receive `true`. Just ignore it case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) }(ThreadUtils.sameThread) }
复制代码

而第四次,从 Driver 发送 RegisteredExecutor 的操作,是从 Driver 端主动发出的,代码如下

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>        if (executorDataMap.contains(executorId)) {          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))          context.reply(true)        } else if (scheduler.nodeBlacklist.contains(hostname)) {          // If the cluster manager gives us an executor on a blacklisted node (because it          // already started allocating those resources before we informed it of our blacklist,          // or if it ignored our blacklist), then we reject that executor immediately.          logInfo(s"Rejecting $executorId as it has been blacklisted.")          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))          context.reply(true)        } else {          // If the executor's rpc env is not listening for incoming connections, `hostPort`          // will be null, and the client connection should be used to contact the executor.          val executorAddress = if (executorRef.address != null) {              executorRef.address            } else {              context.senderAddress            }          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")          addressToExecutorId(executorAddress) = executorId          totalCoreCount.addAndGet(cores)          totalRegisteredExecutors.addAndGet(1)          val data = new ExecutorData(executorRef, executorAddress, hostname,            cores, cores, logUrls)          // This must be synchronized because variables mutated          // in this block are read when requesting executors          CoarseGrainedSchedulerBackend.this.synchronized {            executorDataMap.put(executorId, data)            if (currentExecutorIdCounter < executorId.toInt) {              currentExecutorIdCounter = executorId.toInt            }            if (numPendingExecutors > 0) {              numPendingExecutors -= 1              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")            }          }					// 这里是主动发送RegisteredExecutor给Executor的操作          executorRef.send(RegisteredExecutor)          // Note: some tests expect the reply to come after we put the executor in the map          context.reply(true)          listenerBus.post(            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))          makeOffers()        }
复制代码

总结

这部分整个串通下来还是有点烧脑的,比起后面的 shuffle 和 stage 等操作来说,还是属于及其小儿科的部分。主要是对网络的熟悉程度决定着这部分的理解。

本篇,简单介绍了 Executor 注册到 Driver 的四次握手的过程,让 Executor 和 Driver 互相联通上。但是我们侧重说的是在 Executor 端如何建立连接的过程,但是在 Driver 端,是如何收取到 Executor 的 request 后,也自己去构建起一个 EndpointRef,从而实现可以双工的给 Executor 提交 Task 的呢?这块算是 Spark 中比较 hacky 的部分了,下篇细聊。

发布于: 2022 年 03 月 22 日阅读数: 63
用户头像

dclar

关注

有技术 有智慧 有胸怀 有眼界 2020.05.29 加入

I am an Artist

评论

发布
暂无评论
大画 Spark :: 网络(6)-Spark网络中的“四次握手”Executor注册到Driver的过程(硬核)_大数据_dclar_InfoQ写作社区