大画 Spark :: 网络 (2)- 下篇 - 通过网络收取消息的过程

回顾
上一篇,我们从接收到消息到 RpcEndpoint 的过程做了简单的梳理,理清了以下几个概念
Dispatcher
RpcEndpoint
RpcEndpointRef
通过与 SpringMVC 类比的方式介绍了整个的流程,具体参看这里https://xie.infoq.cn/article/9b7e1ebf236f7a4abcc81311c
图示规则
因为 scala 和 java 中的 method 是最重要的程序执行过程,所以这部分的描述我永远会用一种颜色,也就是绿色方框来表示,其余的类型会根据场景不同而调节展示方式
本篇主旨
对于消息收取的过程,我们搞清楚下图的第 5 步是怎么做的

高度抽象消息发送过程
很多技术在使用和研究的时候,大家很容易一下子陷入到细节中,无法自拔,我以前也是这样的状态。在几个细节中可以熬好久,但是总是解决不了问题。这个时候,需要跳出来,从高处重新审视一下,会有不一样的收获。
所以我总结或者分享的时候也希望在细节中走一小段后,就跳出来俯视一下,这样会理解的更透彻,也会让被分享者理解的更充分。
消息机制的分层
其实,我总结的这个消息分层机制,根据RpcRequest
或OneWayMessage
其实有些许的不同,但大概可以这么理解,方便记忆
Client 端到 Server 端的过程,可以看到在中间处理层,传递的消息以
RequestMessage
的形式到了网络组件层,封装成了
RpcRequest
到达 Server 端后重新解成了
RequestMessage
为了业务处理方便,又封装成了
RpcMessage
,RpcMessage
经过postMessage
方法的调用,通过一系列的处理最终触达到RpcEndpoint
的实例
为什么 Client 端没有独立出一个业务处理层呢?
其实 Client 端也模糊存在这个层次,但是与所谓的中间处理层交织在了一起;并且,发送消息的过程本身比较简单,不需要很复杂的封装,也就没有划分很清晰。但是接收方对于不同消息的类型以及内容要做匹配,进入不同的RpcEndpoint
的实例中,所以抽象出了一个业务处理层,我们本次探讨的也是这个层面的处理过程

Dispatcher 的消息处理模型
用一个简单过程来描述
就是利用某个
RpcEndpoint
实例唯一String
类型的 name,在 Dispatcher 中,通过endpoints
(是一个ConcurrentMap
)中的key
(name),找到 value 的EndpointData
结构体从
EndpointData
结构体中找到对应的Inbox
执行
Inbox
的process
方法Inbox
中包含最终需要调用的RpcEndpoint
的实例,根据消息类型不同,调用receiveAndReply
方法或receive
方法
以上的过程不是一个线性调用的过程,Inbox
的process
方法是通过一个线程池的生产者-消费者模式来实现的
用RpcEndpointVerifier
做RpcEndpoint
的实例
本篇,以RpcEndpoint
的一个最简单实例RpcEndpointVerifier
来说明
其唯一识别名称 name:enpoint-verifier
包含一个
receiveAndReply
方法

Dispatcher 的 endpoints
结构
endpoints 是一个ConcurrentMap
,K:String
,V:EndpointData
我们来看一下图示的数据结构,这个结构很简单清晰吧,以RpcEndpoint
的实例RpcEndpointVerifier
来举例

作用
看到 KV 结构,很容易联想到就是通过 key 的 name,获取 value 的EndpointData
结构;它本质上是一个EndpointData
数据存储的仓库
EndpointData
结构
如上图所示,name 是每一个RpcEndpoint
实例的唯一名字,EndpointData
如图所示,是以下数据的集合
RpcEndpoint
的唯一名称RpcEndpoint
的实例RpcEndpointRef
的实例Inbox
的实例
作用
存储着一个RpcEndpoint
实例的所有基础信息组合,其中最重要的结构体Inbox
是处理到达Server
端消息的一个存储器+执行引擎
Inbox
Inbox 是存储器+执行引擎
存储器:messages
这是一个串联InboxMessage
类型的LinkedList
,InboxMessage
类型就是我们前面经常见到的OneWayMessage
和RpcMessage
的trait

执行引擎:process 方法
执行引擎的作用就是利用Inbox
中的RpcEndpoint
的实例RpcEndpointVerifier
去一条条的执行messages
中的每一个InboxMessage
。过程是一个生产者消费者的过程,类似一个 MQ。

Dispatcher 中的 MessageLoop
说完上面的Inbox
的执行引擎,再来说说Dispatcher
中的MessageLoop
,其实和Inbox
中的执行过程如出一辙,只是,这个过程是一个线程池异步不断执行的过程
处理过程
如图所示,一个循环,不断的offer
入EndpointData
数据到receivers
中,不断的take
出去EndpointData
中的Inbox
,并执行其process
方法
重要的数据结构
receivers
:是一个LinkedBlockingQueue
,是一个 FIFO 的结构,每次 put 都需要开辟内存,有上限(Interger.*MAX_VALUE*
),我把它画成了一个 MQ 的 channel 的形象

Dispatcher 中的调用入口
从这张图上看出,在postRemoteMessage
方法中合成了RpcMessage
,并传递给了postMessage
方法,postMessage
方法间接调用到最终RpcEndpoint
实例(RpcEndpointVerifier
)

传递参数
在两个方法的调用中,传递了RpcEndpoint
实例name
和RpcMessage
的实例

串联起整个流程
根据上面的基础知识,我们可以串联起整个的细节流程
通过
postRemoteMessage
→postMessage
传递了endpoint-verifier
唯一识别名RpcMessage
至于 endpoint-verifier 是怎么来的,
RpcMessage
如何封装的,这些细节后续我们逐一展开,揭开真个流程的全部真相postMessage
的方法中,通过endpoint-verifier
的名字去endpoints
中寻找对应的V:EndpointData
获取对应的
EndpointData
把
RpcMessage
放入到EndpointData
的Inbox
中,类似于邮件投递到了 box 中把
EndpointData
放入到receivers
的“MQ”中,即offer
出 data 到receivers
通过
MessageLoop
的消费过程,经过take
,获取到EndpointData
调用
EndpointData
的Inbox
的process
方法,这个Inbox
是属于RpcEndpointVerifier
的消息 box,也持有RpcEndpointVerifier
的对象process
方法中会循环poll
出messages
中的每一条InboxMessage
根据 message 类型不同,有 case 的判断,如果是 case
RpcMessage
的话,按照RpcMessage
的分支执行因为是
RpcMessage
,所以执行receiveAndReply
方法

至此,一个从 Dispatcher 到“端”RpcEndpoint
的传入+执行过程结束,但是,因为是RpcMessage
,还存在这个reply
的过程,这个过程,我们后续再继续聊。
总结
本篇,我们要搞明白最终如何调用到的RpcEndpoint
实例,废话不多说,一副高度概括的图示来结束

Next
下一篇,我们来看一下RpcMessage
(也就是 spark 网络组件层的RpcRequest
)的 reply 机制。在本篇中,我们讲述了几个数据结构,比如EndpointData
,肯定有小伙伴要问,它是怎么构建出来的?我们后面聊,面纱一层层的揭开,不急。
版权声明: 本文为 InfoQ 作者【dclar】的原创文章。
原文链接:【http://xie.infoq.cn/article/dada8955812896dc8b4b9ff1a】。文章转载请联系作者。
评论