Spark 源代码::Spark 多线程::NettyRpcEnv.ask 解读

背景
Spark 中有很多异步处理的例子,每一个地方都值得好好去审视一番,对辅助理解 spark 的机理以及为自己写出优雅的代码都会有很大的帮助。
NettyRpcEnv.ask 解读
RpcEnv 作用
NettyRpcEnv是RpcEnv的在 spark 中的唯一一个实现。RpcEnv是什么呢,可以先看一下它的 class 头信息
就是一句话,RPC 的环境。在这里,最重要的 2 个操作莫过于
- 可以去注册 - RpcEndpoint
- 可以去异步获取 - RpcEndpointRef
而RpcEndpoint和RpcEndpointRef是什么呢,在这里不做详细赘述,其他的文章中会详细说明,简单来讲一下
简单回顾RpcEndpoint和RpcEndpointRef
RpcEndpoint
- RpcEndpoint
- 众所周知,spark 内部会有 - executor,- driver等角色,他们之间的通信都采用利用 Netty,在 executor 或者 driver 上并不是只启动 1 个 Netty 的服务,针对不同的功能会有多个 Netty 的 RPC 服务开启,利用不同的端口号进行区分。服务间通信后,通的“信”被很多种逻辑单元来处理,如- Inbox,如- EventLoop等,这些都是工具级别的单元,而被抽象出来作为可插拔可扩展的大的逻辑功能模块在 Spark 中就叫做- RpcEndpoint,它是用来处理从其他- client端发送或者- server端返回过来的- message的模块。- RpcEndpoint本身是一个 trait,它可以有多种的实现
RpcEndpointRef
- RpcEndpointRef
- spark 之前的网络通信都是采用 akka,改版后采用的是 Netty,在 akka 中,如果一个两个节点间的通信是利用目的方的 actorRef 来进行的通信的,即 AActor 希望发送消息到 BActor,需要 BActorRef 来发送消息。Spark 的网络通信升级到 Netty 后,Endpoint 就可以间接理解成原来的 Actor,那么发送消息到另一个 Actor 的话,也需要 - RpcEndpoint的 Ref,即- RpcEndpointRef。这个概念乍一看有点懵,试想,从 A 发送消息到 B,能发送的前提是 A 先拥有了一个 B 的”引用“,这在普通的 Http 服务中貌似很不能被理解,我想访问某一台机器按说只需要知道对方的 IP 和 Port 不就 OK 了,现在还需要对方的一个“替身”?这是什么鬼?带着问题我们可以持续往下看即可,这里你只需要这样意识即可:
- 用来访问 B machine 的 - RpcEndpointRef你理解成就是 B machine 的 IP 和 Port 的一个被包装后的实例即可
图解RpcEndpoint和RpcEndpointRef
- 图解一下 
- A machine 可以是物理机可以是虚拟机,B machine 可以是和 A 同一台物理机、虚拟机(端口号不同),也可以是不同的(在 spark 中甚至于有自己发给自己的 msg,后续会讲)。那么从 A 发送消息到 B 的话,使用的是 B 的 - RpcEndpointRef,通过它发送消息到 B machine
- 【图 1】要如何访问 
 
 - 【图 2】内部的原理 
 
 - 【图 3】B machine 的 RpcEndpointRef 的实例是啥(简化版) 
 
 Driver 和 Executor
k,顾名思义——问。可能是打个招呼,看看在不在,询问一下,等等。这个就是 NettyRpcEnv.ask 的作用所在。为了讲 NettyRpcEnv.ask 的作用,还需要简单的串一下一下概念和流程
Driver 线程和 Executor 进程
首先,需要明确两个事情,在 yarn 环境下
- Driver是在- ApplicatioMaster进程中执行的一个线程
- 严格来说,其实这个说法也不太正确,因为 - Driver其实是在用户的 class 的时候,在形成- sparkContext上下文环境的一个产物,本身执行的其实是用户 class 线程,在这个线程中建立了- SparkEnv以及- RpcEnv等等,并且建立了- Driver的 Netty 的 Service 等等,与- Executor相互通信
- Executor则是一个个的进程,通过 java 命令在每一个节点上启动的
Yarn 系列以及 ApplicatioMaster是什么这里不做赘述,其他文章中会细讲。
其次,在这里只需要了解到,Driver本身是一个协调调度节点,它可以去分配任务给Executor,并且掌握着Executor的情况,分配就是把 Task 发送给Executor,掌握则指的是需要知道Executor的运行情况等等。
- 【图 4】图解一下 
- 举个栗子,1 个 - Driver和 2 个- Executor进行交互通信,- Driver手握 2 个- Executor(一个叫做 E1,一个叫做 E2)的- RpcEndpointRef,姑且简称为 E1Ref 和 E2Ref,通过这 2 个 Ref 发送 msg 到 E1 节点和 E2 节点,这 2 个节点本身通过自身的- RpcEndpoint来处理 msg。而 E1 和 E2 本身还要定期起的向 Driver 汇报自身的情况,这里叫做 heartbeat 心跳,那么反过来则是利用各自内部掌握的- DriverEpcEndpointRef来发送 heartbeat 到- Driver,而- Driver利用其自己的- DriverRpcEndpoint来处理 heartbeat 的 msg。所有节点的上面的组建则都在自身的- NettyRpcEnv中,也就是- RpcEnv的实现。
 
 举例:在 RpcEnv 中建立一个 DriverRpcEndpointRef
背景
终于要说到本篇的内容了,NettyRpcEnv.ask 的解读,需要有一个场景调用 NettyRpcEnv.ask 的方法才可以,那可以在题中所述的RpcEnv中建立一个DriverRpcEndpointRef这个场景中描述
RpcEnv 中为啥建立 DriverRpcEndpointRef
上面的【图 4】介绍了一个 Driver 和和 Executor 之间通信的过程。其实,在ApplicationMaster中构建Driver线程的时候,有一部分的通信是需要通过DriverRpcEndpointRef进行的,即利用DriverRpcEndpointRef发送 msg 给DriverRpcEndpoint,DriverRpcEndpoint做出处理并响应
- 【图 5】图解一下 
- ApplicationMaster中启动【Run】Driver 的线程后,从 Driver 线程中拿到了- NettyRpcEnv,
- 并且利用 - NettyRpcEnv的- setupEndpointRef方法【Get】到两个- DriverEndpointRef
- 后续通过【Use】这个 - DriverEndpointRef去访问 Driver 的- DriverEndpoint
- 有一点需要说明的是, - ApplicationMaster的节点本身也是- Driver的节点,其实访问- Driver的- DriverEndpoint按说是可以直接访问的(Spark 源代码中没有这样实现,还是为了隔离和封装的更好,减少耦合,今后- Driver如果作为进程执行,不再- ApplicationMaster上运行也会修改的较为简单),但是这里还是采用了 Netty 的 Rpc 访问方式
 
 源代码
这部分代码在ApplicationMaster.scala中,关注方法runDriver即可
- 【图 6】图解一下 
- (I) 有一台 IP 是 10.1.2.5 的服务器,启动了 - ApplicationMaster
- (II) **a 过程,**在这个节点上启动了 Driver 的线程,并且初始化了用户的 class,并且在 10.1.2.5 节点上启动了一个 Netty 的 serviec,IP 和 Port 为 10.1.2.5:13200 
- (III) b**过程,**在 - ApplicationMaster节点上继续调用 RpcEnv.setupEndpointRef,目的是 setup 一个- Driver的- DriverEndpointRef到- RpcEnv中,这个 setup 的过程就是去 10.1.2.5:13200 访问一下,如果服务通了,则构建出- DriverEndpointRef,这个“访问一下”即本文所述要用到的 NettyRpcEnv.ask 的方法。
- 可以看到调用顺序为 
- (ApplicationMaster.scala) rpcEnv.setupEndpointRef ↓ 
- (NettyRpcEnv.scala) NettyRpcEnv.asyncSetupEndpointRefByURI ↓ 
- (NettyRpcEndpointRef.scala) NettyRpcEndpointRef.ask ↓ 
- (NettyRpcEnv.scala) NettyRpcEnv.ask — — — — ↓ (经过多个步骤,中间部分省略,其他文章会讲) 
- 10.1.2.5:13200 的 netty 服务 
 
 - 代码如下 
解读 NettyRpcEnv.ask
回顾 Future
如何理解 Future 呢,从字面意思可以很好的理解,Future 即未来,也是期货的意思。
说到期货,就充满了不确定性,因为毕竟没有发生,谁也不知道未来会怎样。所以,定义一个 Future 就是定义了一个不在现在这个时空(线程)发生的(未来)的另一个(另一个线程的)事件,相比 java 的鸡肋的 Future,scala 的 Future 可谓是非常优雅且完美,搜索我的博客可以看到针对 scala 的 Future 的详细介绍。
- 官方文章:https://docs.scala-lang.org/zh-cn/overviews/core/futures.html 
- 这里不从源代码的角度去构建 Future 和 Promise 的认知观念,会有其他的文章再做解释 
- 【图 7】图解一下 
- 在 java 中定义一个线程是右侧的做法,而在左侧的 scala 中,利用 Future 则优雅了很多 
 
 - 代码 
- Future + callback(截取部分) 
- 注意点 
- 定义了 Future,则定义了需要执行的线程的执行体(body),那么执行也是立刻马上,类似于 java 定义了一个 Thread,然后直接调用了 - start()一样
- 在 Future 中大量运用了 scala 的 Try[],如果出现了异常,没有做 - onFailure的处理,那么可能看不到异常被抛出来,这点和 java 有较大区别
回顾 Promise
从皮毛简单的说完了 Future,那 Promise 又是什么呢?其实在 Future 的实现中包含了 Promise 的实现,也就是说没有 Promise,Future 是无法被运行的。从字面的理解,Promise 是承诺,有了 Future 的未来的定义,那么需要给出一个确切的承诺才可以进行,否则都是空口无凭天马行空无法兑现的大话。
说到现在,包括看完以上的 Future 的介绍,很多人肯定还是懵 b 状态,因为我刚开始接触的时候也是这样,但我喜欢的就是用最直观的图和想象来描述一个抽象的问题,二话不说,继续上图
- 【图 8】图解一下:Future 与 Promise 的关系 
- Future 的含义 
- 主线人生是你的 Main Thread,在 spark 中可能是某一个处理的 Thread 
- 在 Now 这个时刻,你开启了变成 star 之路(become star 的 Thread) 
- 在 Now 这个时刻,你开启了变帅之路(become handsome 的 Thread) 
- 一旦你开启这两条路,只要你的 Main Thread 没有结束,那么你可以一直持续的去走完这两个“之路”,直到 Success 或者 Failure,这就是 Future,可以理解为,开启了一条新的轨迹 
 
 - Promiose 的含义 
- 当你开启了两条新的“之路”的时候,我可以在你两条路的重点给你不同的承诺 
- 当你 success 的完成了 Future 的时候,我 promise 你一个结果 
- 当你 failure 的完成了 Future 的时候,我 promise 你另一个结果 
 
 - Future 与 Promise 的对比 
- Future 是一条线,包含执行过程的一条线,按照 Timeline 要去走下去 
- Promise 是一个点,一个被触发的点,想达到这个点必须又一个 Future 搭出一条路径才可以 
- 上面两句话如何理解呢,你可以这么想,人生(Main Thread)是一个数轴,你如果希望按照 timeline 向着右侧一直前进就需要有一条连续的道路,这个“道路”就是一个 Thread,也可以是 Future 定义出的道路,我们只能脚踏实地的通过道路走到目标终点,而不能直接跳到终点。Promise 类似于一个 milestone 点,如果只有一个 Promise,不定义出“道路”也就是不定义出一个 Future(或 Main Thread)的话,是无法实现这个 Promise 的。只定义了 Promise,不去考虑直线路径(Future),无法实现,但只定义 Future,不定义 Promise(其实在 Future 中是内置了 Default 的 Promise 的)是可以直接执行 Future 的。如下图所示,开出了两张空头支票,没有定义具体的路线(Future 实现方式),那么这两个 Promise 是无法兑现的。 
- 需要注意一点,这张图只是画出了定义了 Promise,但是如果想对象这个 Promise 的话,是可以通过 Promise 中的方法来搭建出一个 Future 来执行的,与 Future 不同的是,Future 只要定义了就可以马上执行,Promise 定义了的话,必须要显式的触发“搭建 Future”的操作才可以。 
 
 - 看看 Promise 不执行的代码 
- 这里,我们定义了一个 Promise,并且“承诺”在 Promise 对应的 future 结束后调用一个 - map操作打印出一句话 future:....
- 但我们执行以下语句的时候会发现什么都没有执行 
- 看看可以执行的代码 
- 与上面代码唯一不同的就是,加入了 - promise.trySuccess的处理
- 代码细节在其他篇章我们具体看,这里你可以这样理解,加入和 trySuccess,就是为达到 Promise 搭建了一条 Future 之路,并触发这条路开始执行(start) 
- 至于 trySuccess,tryComplete 等具体的细节讲 scala 多线程的地方可以细说 
- promise.future.onComplete
- 在 Future 执行完毕后的 callback 处理,无论是 Success 还是 Failure 都可以执行这个 - onComplete处理
- promise.future.map
- 在 - promise.future.onComplete之后对 Future 进行的继续的 map 处理
- promise.trySuccess
- 触发整个 Future 执行的 trigger 
ask 的代码
其实,讲完了上面的所有的内容后,ask 的代码感觉几句话就可以讲解完毕了。
ask 本身返回的是 Future,本身是异步处理
- 【图 9】图解一下 
- 一台 10.1.1.1 的 client 机器通过 rpc 访问一台 10.1.1.2 的 Netty 的 service,当 response 正确返回后,在 client 机器中的 - TransportResponseHandler中进行判断处理,并且调用 listener 的 onSuccess 方法,这个 onSuccess 方法则是下面的 ask 代码中定义的方法。在这个方法中本身又去执行了 promise 的 tryComplete,从而触发了 promise 的 future 之路执行
 
 总结
本篇用小篇幅讲解了一下*o.a.s.rpc.netty.NettyRpcEnv.ask()*的方法,简单描述了一个 spark 的异步处理的小 case,这个小 case 需要不少的先验知识点,可能突然间看到这里有点懵,学习需要融会贯通一点点的来积累才可以,如果不明白可以慢慢积累其他模块的知识再来这里看流水账会更有收获。
版权声明: 本文为 InfoQ 作者【dclar】的原创文章。
原文链接:【http://xie.infoq.cn/article/06b5b01fc69d6a54a131b9408】。文章转载请联系作者。












 
    
评论