写点什么

大画 Spark :: 网络 (8)-Spark 网络中的“四次握手”Driver 如何获取 Executor 的 EndpointRef 烧脑

作者:dclar
  • 2022 年 10 月 01 日
    北京
  • 本文字数:2849 字

    阅读完需:约 9 分钟

大画 Spark :: 网络 (8)-Spark 网络中的“四次握手”Driver 如何获取 Executor 的 EndpointRef 烧脑

回顾

上一篇,详细介绍了在 client 端与 server 端都存在的TransportClient,并且从 client 端和 server 端都分析了发送 request 的简单过程。

本章主旨

spark 在计算过程中,需要 2 个角色,一个是 driver,一个是 executor。基于 yarn 的 spark 在运行过程中,到底 executor 运行在哪个节点上,是由 yarn 去分配并启动的。


分配业务的计算任务给相应的 executor 是 driver 的指责,所以运行到节点的 executor 需要告知 driver 其在哪里运行,方便 driver 不断给 executor 下发任务。


上面这个过程就是 spark 中大名鼎鼎的反向注册,前几篇也说过这个话题,此篇老生常谈拿出这个概念来开个篇,后续说到整个过程的时候还会拿出来细化。这个过程在很多课程中都被赋予了神秘且神奇的面纱,但你可以仔细想想,如果没有这个过程,driver 怎么知道到底去调度运行哪一个 executor 呢?这也是一个不得已而为之的操作罢了。


有心的同学可能会问到,executor 是怎么知道 driver 的地址的呢?这个过程其实也非常 hacky,具体到 driver 和 executor 的章节中我们在细细讨论,这里你可以直接理解为,在 yarn 调度启动 executor 的时候,其启动的命令行中都带有了 driver 的地址信息,从而让 executor 可以反向注册到 driver 中。


之前的文章,我们说到了几次握手的过程,本篇讲述的是第一次握手时,excutor 注册到 driver 后,driver 是如何获取到了 executor 的信息从而建立起与其长期的连接的过程。

回忆 2 个发送过程

Executor 发送需要 reply 的消息给 Driver

这个过程即 Executor→Driver→Executor 的过程。其中,Executor 发送消息 reply 给 Driver 的过程,实际上是获取了 Executor 的EndpointRef,并通过TransportClient持有的 Channel(fd)从而把消息发送回 Executor。



Driver 主动发送消息给 Executor

前文无数次的强调过,与 Http 协议不同,在 TCP 的网络层面,Server 和 Client 在连接有效的时段内,是可以随意给对方发送消息的。


归结到下图,Driver(代表 Server)也是可以给 Executor(Client)随时随地发送消息的,而发出消息的始作俑者就也是上文提到的 Executor 的EndpointRef



EndpointRef 是如何发出消息的

通过上文的描述,EndpointRef发出消息是通过TransportClient持有的 Channel 发出消息,以下主要探讨 EndpointRef 是如何产生出来的,并且,它又是如何持有了在 Driver 中存在的 TransportClient 的过程

Executor 发出的 ByteBuffer

Executor 发出消息的过程,就是将消息包装成为一个ByteBuffer的过程,这个ByteBuffer包括以下内容


  • RpcAddress:代表 Executor 的地址信息(ip 和 port),通过此属性可以直接 link 到 Executor

  • NettyRpcEndpointRef:代表着 Executor 的EndpointRef,也就是 Driver 端去连接 Executor 的起始处

  • content:具体发送的内容



上述的信息通过 serialize 方法打包成ByteBuffer,通过网络发送给 Driver

Driver 接收 ByteBuffer

从 Driver 接收ByteBuffer,并且,把在 Driver 中已经生成好的TransportClient作为参数向后传递



ByteBuffer 的解析

这是本章最关键的部分。


ByteBuffer是通过internalReceive方法进行的解析,这个解析的过程也就是deserialize的过程。


ByteBuffer第一次解析,会还原成为RequestMessage,并还原出原来在 Executor 中的


  • RpcAddress


这里有一个非常容易干扰读者的地方,即,在 RequestMessage 中,会 new 出一个NettyRpcEndpointRef,并且会把TransportClient放入其中,然而,这个NettyRpcEndpointRef并非后续从 Driver 发送消息给 Executor 的 Ref 主体。


真正发送消息的主体是通过ByteBuffer调用nettyEnv.deserialize来进行反序列化的过程中,从 content 中反序列化回来的NettyRpcEndpointRef,但是问题是,在反序列化的过程,是如何把外部的TransportClient放入其中的呢?



这里就要考察基础知识了,需要用一段代码来解释


private[netty] class NettyRpcEndpointRef(    @transient private val conf: SparkConf,    private val endpointAddress: RpcEndpointAddress,    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
@transient @volatile var client: TransportClient = _
override def address: RpcAddress = if (endpointAddress.rpcAddress != null) endpointAddress.rpcAddress else null
/** * 这部分是重要的代码,反序列化的过程中,会调用readObject方法,通过这个方法的AOP(姑且这么形容) * 可以通过NettyRpcEnv.currentClient.value 把外部的TransportClient 植入到 * NettyRpcEndpoint当中 */ private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() nettyEnv = NettyRpcEnv.currentEnv.value client = NettyRpcEnv.currentClient.value }
private def writeObject(out: ObjectOutputStream): Unit = { out.defaultWriteObject() }
override def name: String = endpointAddress.name
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout) }
override def send(message: Any): Unit = { require(message != null, "Message is null") nettyEnv.send(new RequestMessage(nettyEnv.address, this, message)) }
override def toString: String = s"NettyRpcEndpointRef(${endpointAddress})"
final override def equals(that: Any): Boolean = that match { case other: NettyRpcEndpointRef => endpointAddress == other.endpointAddress case _ => false }
final override def hashCode(): Int = if (endpointAddress == null) 0 else endpointAddress.hashCode()}
复制代码


通过上述代码可以看出,TransportClient是通过反序列化植入到了NettyRpcEndpointRef中,为后续通过其发送消息给Executor奠定基础

通过 Dispatcher 到达 DriverEndpoint

通过前面的网络部分的描述,,最终会在 DriverEndpoint 这样一个 Endpoint 中通过 case class 的形式解析出消息体本身,消息体解析出来后,本例是RegisterExecutor的一个 case class,其中包含了被反序列化后的NettyRpcEndpointRef,而这个NettyRpcEndpointRef当中又包含着被植入的 Driver 端的TransportClient,而消息最终可以通过NettyRpcEndpointRefTransportClient来发送成功



整个过程的全局图

通过这个全局图,可以看清消息是如何从 Executor 发送到的 Driver 并且如何一步步的解析,最终把相应的结构都准备好,形成可以向 Executor 发送消息的NettyRpcEndpointRef的。



总结

至此,Spark 的网络篇告一段落。通过 8 篇文章的介绍,基本上讲清了 spark 中 Executor 与 Driver 之间网络通信的基础原理,并且还有一些 Netty 的小知识贯穿其中。


掌握了 spark 的网络知识,为后续了解其全貌奠定了一个非常坚实的基础,对理解分布式的执行过程有着重要的意义。

发布于: 2022 年 10 月 01 日阅读数: 7
用户头像

dclar

关注

有技术 有智慧 有胸怀 有眼界 2020.05.29 加入

I am an Artist

评论

发布
暂无评论
大画 Spark :: 网络 (8)-Spark 网络中的“四次握手”Driver 如何获取 Executor 的 EndpointRef 烧脑_大数据_dclar_InfoQ写作社区