写点什么

大画 Spark :: 网络 (2)- 下篇 - 通过网络收取消息的过程

作者:dclar
  • 2022 年 1 月 15 日
  • 本文字数:2798 字

    阅读完需:约 9 分钟

大画 Spark :: 网络(2)-下篇-通过网络收取消息的过程

回顾

上一篇,我们从接收到消息到 RpcEndpoint 的过程做了简单的梳理,理清了以下几个概念


  • Dispatcher

  • RpcEndpoint

  • RpcEndpointRef


通过与 SpringMVC 类比的方式介绍了整个的流程,具体参看这里https://xie.infoq.cn/article/9b7e1ebf236f7a4abcc81311c

图示规则

因为 scala 和 java 中的 method 是最重要的程序执行过程,所以这部分的描述我永远会用一种颜色,也就是绿色方框来表示,其余的类型会根据场景不同而调节展示方式

本篇主旨

  • 对于消息收取的过程,我们搞清楚下图的第 5 步是怎么做的


高度抽象消息发送过程

很多技术在使用和研究的时候,大家很容易一下子陷入到细节中,无法自拔,我以前也是这样的状态。在几个细节中可以熬好久,但是总是解决不了问题。这个时候,需要跳出来,从高处重新审视一下,会有不一样的收获。


所以我总结或者分享的时候也希望在细节中走一小段后,就跳出来俯视一下,这样会理解的更透彻,也会让被分享者理解的更充分。

消息机制的分层

其实,我总结的这个消息分层机制,根据RpcRequestOneWayMessage其实有些许的不同,但大概可以这么理解,方便记忆


  • Client 端到 Server 端的过程,可以看到在中间处理层,传递的消息以RequestMessage的形式

  • 到了网络组件层,封装成了RpcRequest

  • 到达 Server 端后重新解成了RequestMessage

  • 为了业务处理方便,又封装成了RpcMessageRpcMessage经过postMessage方法的调用,通过一系列的处理最终触达到RpcEndpoint的实例


为什么 Client 端没有独立出一个业务处理层呢?


其实 Client 端也模糊存在这个层次,但是与所谓的中间处理层交织在了一起;并且,发送消息的过程本身比较简单,不需要很复杂的封装,也就没有划分很清晰。但是接收方对于不同消息的类型以及内容要做匹配,进入不同的RpcEndpoint的实例中,所以抽象出了一个业务处理层,我们本次探讨的也是这个层面的处理过程


Dispatcher 的消息处理模型

用一个简单过程来描述


  • 就是利用某个RpcEndpoint实例唯一String类型的 name,在 Dispatcher 中,通过endpoints(是一个ConcurrentMap)中的key(name),找到 value 的EndpointData结构体

  • EndpointData结构体中找到对应的Inbox

  • 执行Inboxprocess方法

  • Inbox中包含最终需要调用的RpcEndpoint的实例,根据消息类型不同,调用receiveAndReply方法或receive方法


以上的过程不是一个线性调用的过程,Inboxprocess方法是通过一个线程池的生产者-消费者模式来实现的

RpcEndpointVerifierRpcEndpoint的实例

本篇,以RpcEndpoint的一个最简单实例RpcEndpointVerifier来说明


  • 其唯一识别名称 name:enpoint-verifier

  • 包含一个receiveAndReply方法


Dispatcher 的 endpoints

结构

endpoints 是一个ConcurrentMapK:StringV:EndpointData


private val endpoints: ConcurrentMap[String, EndpointData] =    new ConcurrentHashMap[String, EndpointData]
复制代码


我们来看一下图示的数据结构,这个结构很简单清晰吧,以RpcEndpoint的实例RpcEndpointVerifier来举例


作用

看到 KV 结构,很容易联想到就是通过 key 的 name,获取 value 的EndpointData结构;它本质上是一个EndpointData数据存储的仓库

EndpointData

结构

如上图所示,name 是每一个RpcEndpoint实例的唯一名字,EndpointData如图所示,是以下数据的集合


  • RpcEndpoint的唯一名称

  • RpcEndpoint的实例

  • RpcEndpointRef的实例

  • Inbox的实例

作用

存储着一个RpcEndpoint实例的所有基础信息组合,其中最重要的结构体Inbox是处理到达Server端消息的一个存储器+执行引擎

Inbox

Inbox 是存储器+执行引擎

存储器:messages

这是一个串联InboxMessage类型的LinkedListInboxMessage类型就是我们前面经常见到的OneWayMessageRpcMessagetrait


执行引擎:process 方法

执行引擎的作用就是利用Inbox中的RpcEndpoint的实例RpcEndpointVerifier去一条条的执行messages中的每一个InboxMessage。过程是一个生产者消费者的过程,类似一个 MQ。


Dispatcher 中的 MessageLoop

说完上面的Inbox的执行引擎,再来说说Dispatcher中的MessageLoop,其实和Inbox中的执行过程如出一辙,只是,这个过程是一个线程池异步不断执行的过程


/** Thread pool used for dispatching messages. */  private val threadpool: ThreadPoolExecutor = {    val availableCores =      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",      math.max(2, availableCores))    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")    for (i <- 0 until numThreads) {      pool.execute(new MessageLoop)    }    pool  }
复制代码

处理过程

如图所示,一个循环,不断的offerEndpointData数据到receivers中,不断的take出去EndpointData中的Inbox,并执行其process方法

重要的数据结构

  • receivers:是一个LinkedBlockingQueue,是一个 FIFO 的结构,每次 put 都需要开辟内存,有上限(Interger.*MAX_VALUE*),我把它画成了一个 MQ 的 channel 的形象


Dispatcher 中的调用入口

从这张图上看出,在postRemoteMessage方法中合成了RpcMessage,并传递给了postMessage方法,postMessage方法间接调用到最终RpcEndpoint实例(RpcEndpointVerifier


传递参数

在两个方法的调用中,传递了RpcEndpoint实例nameRpcMessage的实例


串联起整个流程

根据上面的基础知识,我们可以串联起整个的细节流程


  1. 通过postRemoteMessagepostMessage 传递了

  2. endpoint-verifier唯一识别名

  3. RpcMessage

  4. 至于 endpoint-verifier 是怎么来的,RpcMessage如何封装的,这些细节后续我们逐一展开,揭开真个流程的全部真相

  5. postMessage的方法中,通过endpoint-verifier的名字去endpoints中寻找对应的V:EndpointData

  6. 获取对应的EndpointData

  7. RpcMessage放入到EndpointDataInbox中,类似于邮件投递到了 box 中

  8. EndpointData放入到receivers的“MQ”中,即offer出 data 到receivers

  9. 通过MessageLoop的消费过程,经过take,获取到EndpointData

  10. 调用EndpointDataInboxprocess方法,这个Inbox是属于RpcEndpointVerifier的消息 box,也持有RpcEndpointVerifier的对象

  11. process方法中会循环pollmessages中的每一条InboxMessage

  12. 根据 message 类型不同,有 case 的判断,如果是 case RpcMessage的话,按照RpcMessage的分支执行

  13. 因为是RpcMessage,所以执行receiveAndReply方法



至此,一个从 Dispatcher 到“端”RpcEndpoint的传入+执行过程结束,但是,因为是RpcMessage,还存在这个reply的过程,这个过程,我们后续再继续聊。

总结

本篇,我们要搞明白最终如何调用到的RpcEndpoint实例,废话不多说,一副高度概括的图示来结束


Next

下一篇,我们来看一下RpcMessage(也就是 spark 网络组件层的RpcRequest)的 reply 机制。在本篇中,我们讲述了几个数据结构,比如EndpointData,肯定有小伙伴要问,它是怎么构建出来的?我们后面聊,面纱一层层的揭开,不急。

发布于: 刚刚阅读数: 2
用户头像

dclar

关注

有技术 有智慧 有胸怀 有眼界 2020.05.29 加入

代码是有生命的艺术品

评论

发布
暂无评论
大画 Spark :: 网络(2)-下篇-通过网络收取消息的过程