写点什么

大画 Spark :: 网络 (7)-Spark 网络中的“四次握手”Executor 注册到 Driver 过程中的 TransportClient 与细节过程

作者:dclar
  • 2022 年 5 月 19 日
  • 本文字数:3612 字

    阅读完需:约 12 分钟

大画 Spark :: 网络(7)-Spark网络中的“四次握手”Executor注册到Driver过程中的TransportClient与细节过程

回顾

上一篇把 Executor 注册到 Driver 的过程进行了详尽的描述。并且把四次往复的过程用图和代码都做了说明,虽然后面的注册 Executor 的部分没有详细再画图,但是起过程和第一次确认 Driver 端服务的过程大体相同,如有问题可以给我留言我们来互动沟通。

本章主旨

上述过程中,在 Executor 的 client 端是如何构建了 socket,如何发送的请求,这部分细节是本章要探讨的主要内容。


这部分内容,其实是我第一篇 Spark 的源代码文章中就讲过的,但是当时讲的方法有点啰嗦,很多同学看完告诉我还是太硬核了,参考这里。当时还手工的画了一个下图。



所以我考虑再三后,重新把 spark 的内容进行了梳理后,写的现在的大画 spark 系列。本次再讲解的话,主要还是注重画图,减少大幅贴代码。

从 Executor 发送请求的流程

这个过程中,有两个地方相对优雅一些


  • client 端的懒加载(初始化)

  • 所谓懒加载,即 client 不是在 Executor 端启动后就直接初始化的,而是在真正的使用过程中进行的初始化,而整个从 Executor→Driver 的发送消息的过程中,在 Executor 端也是一个生产者消费者的过程,即消息会放入一个 queue,由相应的send处理来poll出然后进行发送处理。在这个过程中,第一次连接服务端,则会初始化 client,即懒加载,并且,初始化 client 的过程也是一个异步的过程。会有一个专门的线程服务来进行 client 的初始化,当初始化结束后,会持续之前的发送流程,直到整个发送结束。

  • client 的 pool 化策略

  • pool 化,即池化的策略。当前的 Executor 对于 Driver 来讲是一个 client,在 spark 的文件存储管理等角度,它还会作为逻辑上的“client”存在,所以要对于一个 Executor 可能会连接不同的多个 Server,对于每一个 Server 都会用一个连接池来保存这些 connections,即,对于每一个目标 server 都有存在一组连接池,每次使用的时候从其中随机拿出一个连接即可。如果这个连接已经被初始化,则直接使用,如果没有初始化,会通过 Netty 进行一系列的 client 端的初始化,并把初始化后的连接放入连接池。

懒加载与发送的细节过程


如上图所示,整个过程可以看作是 1-16 个过程。我举的例子是第一次连接 Driver 的场合,所以这 16 个步骤都会走到,而连接一次以后,后续因为 client 已经存在,初始化 client 的操作就会被省略。


  1. 从 Executor→Driver 的连接是始于CoarseGrainedExecutorBackendonStart方法,这个在上一篇中有介绍

  2. 它调用的是NettyRpcEnvasyncSetupEndpointRefByURI方法

  3. 进而利用一个EndpointRef进行调用,所有的EndpointRef都是NettyRpcEndpointRef的实例,只是传入的Endpoint的 name 不同,通过这个 name 会在目标的 server 去匹配到底哪一个Endpoint的实例解析相应的请求

  4. NettyRpcEndpointRef中会调用回NettyRpcEnvask方法

  5. 进而调用postToOutbox的方法,这个方法通过名字也可以看出,其引出了一个新的结构体,就是OutBox,所有从 client 端发出去的消息都是通过这个OutBox发出去的

  6. 在 OutBox 中,发送消息本身是按照send的流程开始的

  7. 从 send 会进入到drainOutbox的方法内,这个方法顾名思义,就是要开始对Outbox中的消息进行发送操作

  8. 在这个方法中会有一个判断,即判断OutBox中持有的 client 变量是否为null,这个 client 变量即TransportClient的实例,当第一次进入到这里的时候,client 一定是 null,所以判断 is null 一定是 yes,所以会走到第 9 步

  9. 因为没有 client,所以就要初始化出一个 client,即 launchConnectTask 这个方法的作用。而初始化 client 的过程是一个异步的过程,即会通过nettyEnv.*clientConnectionExecutor*.submit去启动一个线程 task 来去做 client 的初始化,而原来的线程的操作会被 return

  10. 通过 call 方法的填充,使得 client 可以被初始化

  11. 这里通过 call 可以调用到NettyRpcEnvcreateClient方法,从而可以初始化出一个 client,也就是TransportClient的实例

  12. 初始化好的 client 就会放入到 OutBox 中的 client 中

  13. 当初始化好 client 后,这个新建的线程 task 还会调用回刚才的drainOutbox方法,即和步骤 7 调用的一致

  14. 这个步骤和步骤 8 也是一致的,但是此时,client 已经被初始化过了,所以判断是否为 null 走的是 No 的分支

  15. 从而调用起RpcOutboxMessagesendWith的方法

  16. 进而调用起TransportClientsendRpc方法,通过这个方法, 可以利用 Netty 的 Channel 把消息发送出去,并且保存发总的requestId,方便收到response的时候判断是之前哪一个 reqeust 的消息,从而回掉发送消息时缓存的callback方法,从而完成一次一来一回的发送接收过程


通过以上的过程,Executor 就建立起了和 Driver 端的连接,通过这个连接,后续可以继续通过上述的 4 的ask方法来发送RpcRequestMessage,如果是发送的OneWayMessage,则使用send方法。从英文中的 ask 和 send 也可以分辨出来,ask 是询问,需要 answer,而 send 只是发送,不需要回答,所以从方法名也可以看出是否需要 response。具体RpcRequestMessageOneWayMessage的区别参考这里

关于 client

在 spark 中的 client,具有双重含义。


并不是只在类似于 Executor 的 Client 端存在,而是


  • Executor 的 Client 端存在TransportClient

  • Driver 的 Server 端也存在TransportClient


上面要如何理解呢?首先要明确以下几个逻辑概念


  1. TransportClient,并不是一个只存在于物理意义上的 client 的 java 的 clas,而是,在物理上 client 与 server 中都存在,目的是握取操作系统底层的对外联通的channel,通过TransportClient找到channel,进而向外发送出消息

  2. 在物理的 client 端启动的时候,TransportClient并没有被初始化,而是在我上一节讲解当中描述的,会有一个 lazy 实例化的过程

  3. 在物理的 server 端启动的时候,TransportClient的实例化是根据每连上一个物理 client 而动态创建出来的。这里需要一些 Netty 与底层网络的基础知识了,我们不去深究,暂时记住这一点即可,可以参考下面的图来理解


首先,通过这个图,我们先 High level 的理解一下数据传输的过程,以及 channel 的构建过程


  1. Client 端通过TransportClient构建出自己的 channel 联通 server 端

  2. 而 Server 端收到了 client 端的联通后,会构建出与 client 端通信的 server 端的 channel,这里,如果对底层操作系统熟悉的同学应该知道,双方的 channel 其实就是 linux 底层的 fd

  3. 构建起 channel 后,后续 client 端与 server 端就会通过这个“无形”的 channel 进行数据的交互



知道了以上的知识基础后,我们会发现,在 client 与 server 都存在 channel,在 client 端 channel 是属于TransportClient的,在 server 端是如何操作的呢?


答案是,在 server 端也是TransportClient 持有 channel,这里,spark 做的还是比较优雅,它的TransportClient是公共的,无论是 client 还是 server 都有TransportClient,其中都有 channel,它的逻辑含义都是通过TransportClient所持有的 channel 可以相互通信,而这个 channel 和TransportClient又是如何配合整体的架构体系所存在的呢,继续画图


**TransportClient与 channel 是如何在大框架中存在的**


  • 无论是在 client 端还是在 server 端,都会有**TransportClient**来持有 channel 来给对方发送消息,主要注意的是,消息并不是只有 client 端发送给 server 端,一旦 connection 联通后,双方是可以对等的给对方发送消息的,这是网路底层的基础原理

  • 当消息到达任何一方后,都是通过TransportChannelHandler来进行第一步的分发处理



再借助一个以前的图来加深印象,在 driver 代表的 Physical server 与 Executor 代表的 Physical client 的构建与结构,可以看到,双方其实除了在底层网络的 server 层面有差异,其实其余部分大体相同


Executor 的 Client 端的发送 &接收数据动作举例

  • 红色线条:executor发送消息出来到driver通过executorTransportClient把消息放入channel,发送出来

  • **蓝色线条:**消息在driver内的流转和转换,最终来到处理消息的Endpoint

  • 绿色线条:消息从driver发送回client,通过载driver中掌握的executorEndpointRefTransportClient把消息放入channel,从而发给executor

  • 这个过程中,我在最初看源代码的时候有一个疑问,driver到底是如何获取到的executorEndpointRef的,或者说,最终是通过TransportClient发送的response回去,这个TransportClientchannel是如何与EndpointRef集成到一起的呢?下一篇会详尽描述


Driver 的 Server 端的发送动作举例

  • 蓝色线条:消息从DriverEndpoint主动发出调用了持有的executorEndpointRef

  • 绿色线条EndpointRefTransportClient把消息放入channel,从而发给executorexecutor接到后调用相应的callback处理


总结


本篇其实和上一篇差不多,是上一篇的概括版,把TransportClient的逻辑细节又做了详尽的阐述。很多开发同学都会把自己陷入到网络就是 http 协议的怪圈中去,其实底层的网络 client 和 server 一旦联通之后,双方的逻辑是相同的,你可以把它比做是 websocket 协议,client→server 与 server→client 是对等的。基于此,在 spark 中,executor(client 端)主动向 driver(server 端)发送注册申请,注册之后,driver 获取了 executor 的连接(TransportClient 与 channel),才可能通过 driver(server 端)来下发任务给 executor(client 端),这块的细节,下一篇继续硬核攻击。

发布于: 2022 年 05 月 19 日阅读数: 37
用户头像

dclar

关注

有技术 有智慧 有胸怀 有眼界 2020.05.29 加入

I am an Artist

评论

发布
暂无评论
大画 Spark :: 网络(7)-Spark网络中的“四次握手”Executor注册到Driver过程中的TransportClient与细节过程_大数据_dclar_InfoQ写作社区