写点什么

Spark 源代码::Spark 多线程::NettyRpcEnv.ask 解读

用户头像
dclar
关注
发布于: 2021 年 07 月 17 日
Spark源代码::Spark多线程::NettyRpcEnv.ask解读

背景

Spark 中有很多异步处理的例子,每一个地方都值得好好去审视一番,对辅助理解 spark 的机理以及为自己写出优雅的代码都会有很大的帮助。

NettyRpcEnv.ask 解读

RpcEnv 作用

NettyRpcEnvRpcEnv的在 spark 中的唯一一个实现。RpcEnv是什么呢,可以先看一下它的 class 头信息


/** * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the * sender, or logging them if no such sender or `NotSerializableException`. * * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. */
复制代码


就是一句话,RPC 的环境。在这里,最重要的 2 个操作莫过于


  • 可以去注册RpcEndpoint

  • 可以去异步获取RpcEndpointRef


RpcEndpointRpcEndpointRef是什么呢,在这里不做详细赘述,其他的文章中会详细说明,简单来讲一下

简单回顾RpcEndpointRpcEndpointRef

RpcEndpoint

  • RpcEndpoint

  • 众所周知,spark 内部会有executordriver等角色,他们之间的通信都采用利用 Netty,在 executor 或者 driver 上并不是只启动 1 个 Netty 的服务,针对不同的功能会有多个 Netty 的 RPC 服务开启,利用不同的端口号进行区分。服务间通信后,通的“信”被很多种逻辑单元来处理,如Inbox,如EventLoop等,这些都是工具级别的单元,而被抽象出来作为可插拔可扩展的大的逻辑功能模块在 Spark 中就叫做RpcEndpoint,它是用来处理从其他client端发送或者server端返回过来的message的模块。RpcEndpoint本身是一个 trait,它可以有多种的实现

RpcEndpointRef

  • RpcEndpointRef

  • spark 之前的网络通信都是采用 akka,改版后采用的是 Netty,在 akka 中,如果一个两个节点间的通信是利用目的方的 actorRef 来进行的通信的,即 AActor 希望发送消息到 BActor,需要 BActorRef 来发送消息。Spark 的网络通信升级到 Netty 后,Endpoint 就可以间接理解成原来的 Actor,那么发送消息到另一个 Actor 的话,也需要RpcEndpoint的 Ref,即RpcEndpointRef。这个概念乍一看有点懵,试想,从 A 发送消息到 B,能发送的前提是 A 先拥有了一个 B 的”引用“,这在普通的 Http 服务中貌似很不能被理解,我想访问某一台机器按说只需要知道对方的 IP 和 Port 不就 OK 了,现在还需要对方的一个“替身”?这是什么鬼?带着问题我们可以持续往下看即可,这里你只需要这样意识即可:

  • 用来访问 B machine 的RpcEndpointRef你理解成就是 B machine 的 IP 和 Port 的一个被包装后的实例即可

图解RpcEndpointRpcEndpointRef

  • 图解一下

  • A machine 可以是物理机可以是虚拟机,B machine 可以是和 A 同一台物理机、虚拟机(端口号不同),也可以是不同的(在 spark 中甚至于有自己发给自己的 msg,后续会讲)。那么从 A 发送消息到 B 的话,使用的是 B 的RpcEndpointRef,通过它发送消息到 B machine

  • 【图 1】要如何访问


    • 【图 2】内部的原理

    • 【图 3】B machine 的 RpcEndpointRef 的实例是啥(简化版)

    Driver 和 Executor

    k,顾名思义——问。可能是打个招呼,看看在不在,询问一下,等等。这个就是 NettyRpcEnv.ask 的作用所在。为了讲 NettyRpcEnv.ask 的作用,还需要简单的串一下一下概念和流程

    Driver 线程和 Executor 进程

    首先,需要明确两个事情,在 yarn 环境下


    • Driver是在ApplicatioMaster进程中执行的一个线程

    • 严格来说,其实这个说法也不太正确,因为Driver其实是在用户的 class 的时候,在形成sparkContext上下文环境的一个产物,本身执行的其实是用户 class 线程,在这个线程中建立了SparkEnv以及RpcEnv等等,并且建立了Driver的 Netty 的 Service 等等,与Executor相互通信

    • Executor则是一个个的进程,通过 java 命令在每一个节点上启动的


    Yarn 系列以及 ApplicatioMaster是什么这里不做赘述,其他文章中会细讲。


    其次,在这里只需要了解到,Driver本身是一个协调调度节点,它可以去分配任务给Executor,并且掌握着Executor的情况,分配就是把 Task 发送给Executor,掌握则指的是需要知道Executor的运行情况等等。


    • 【图 4】图解一下

    • 举个栗子,1 个Driver和 2 个Executor进行交互通信,Driver手握 2 个Executor(一个叫做 E1,一个叫做 E2)的RpcEndpointRef,姑且简称为 E1Ref 和 E2Ref,通过这 2 个 Ref 发送 msg 到 E1 节点和 E2 节点,这 2 个节点本身通过自身的RpcEndpoint来处理 msg。而 E1 和 E2 本身还要定期起的向 Driver 汇报自身的情况,这里叫做 heartbeat 心跳,那么反过来则是利用各自内部掌握的DriverEpcEndpointRef来发送 heartbeat Driver,而Driver利用其自己的DriverRpcEndpoint来处理 heartbeat 的 msg。所有节点的上面的组建则都在自身的NettyRpcEnv中,也就是RpcEnv的实现。


    举例:在 RpcEnv 中建立一个 DriverRpcEndpointRef

    背景

    终于要说到本篇的内容了,NettyRpcEnv.ask 的解读,需要有一个场景调用 NettyRpcEnv.ask 的方法才可以,那可以在题中所述的RpcEnv中建立一个DriverRpcEndpointRef这个场景中描述

    RpcEnv 中为啥建立 DriverRpcEndpointRef

    上面的【图 4】介绍了一个 Driver 和和 Executor 之间通信的过程。其实,在ApplicationMaster中构建Driver线程的时候,有一部分的通信是需要通过DriverRpcEndpointRef进行的,即利用DriverRpcEndpointRef发送 msg 给DriverRpcEndpointDriverRpcEndpoint做出处理并响应


    • 【图 5】图解一下

    • ApplicationMaster 中启动【Run】Driver 的线程后,从 Driver 线程中拿到了NettyRpcEnv

    • 并且利用NettyRpcEnvsetupEndpointRef方法【Get】到两个DriverEndpointRef

    • 后续通过【Use】这个DriverEndpointRef去访问 Driver 的DriverEndpoint

    • 有一点需要说明的是,ApplicationMaster的节点本身也是Driver的节点,其实访问DriverDriverEndpoint按说是可以直接访问的(Spark 源代码中没有这样实现,还是为了隔离和封装的更好,减少耦合,今后Driver如果作为进程执行,不再ApplicationMaster上运行也会修改的较为简单),但是这里还是采用了 Netty 的 Rpc 访问方式


    源代码

    这部分代码在ApplicationMaster.scala中,关注方法runDriver即可


    • 【图 6】图解一下

    • (I) 有一台 IP 是 10.1.2.5 的服务器,启动了ApplicationMaster

    • (II) **a 过程,**在这个节点上启动了 Driver 的线程,并且初始化了用户的 class,并且在 10.1.2.5 节点上启动了一个 Netty 的 serviec,IP 和 Port 为 10.1.2.5:13200

    • (III) b**过程,**在ApplicationMaster节点上继续调用 RpcEnv.setupEndpointRef,目的是 setup 一个DriverDriverEndpointRefRpcEnv中,这个 setup 的过程就是去 10.1.2.5:13200 访问一下,如果服务通了,则构建出DriverEndpointRef,这个“访问一下”即本文所述要用到的 NettyRpcEnv.ask 的方法。

    • 可以看到调用顺序为

    • (ApplicationMaster.scala) rpcEnv.setupEndpointRef ↓

    • (NettyRpcEnv.scala) NettyRpcEnv.asyncSetupEndpointRefByURI ↓

    • (NettyRpcEndpointRef.scala) NettyRpcEndpointRef.ask ↓

    • (NettyRpcEnv.scala) NettyRpcEnv.ask — — — — ↓ (经过多个步骤,中间部分省略,其他文章会讲)

    • 10.1.2.5:13200 的 netty 服务


    • 代码如下

        private def runDriver(): Unit = {        addAmIpFilter(None)
    /* 这里,调用startUserApplication方法来执行用户的class,也就是我们的jar包, invoke我们的main方法,从而启动了sparkContext,内部启动一系列的scheduler以及 backend,以及taskscheduler等等等等core的内容,其他篇章会详细讲解 */ userClassThread = startUserApplication()
    // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. logInfo("Waiting for spark context initialization...") val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) try {
    /* 这里,阻塞的等待SparkContext从Driver线程中返回回来 */ val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS)) if (sc != null) { rpcEnv = sc.env.rpcEnv
    val userConf = sc.getConf val host = userConf.get("spark.driver.host") val port = userConf.get("spark.driver.port").toInt registerAM(host, port, userConf, sc.ui.map(_.webUrl))
    /* **这里,上演了好戏,通过NettyRpcEnv的setupEndpointRef方法来获取到driverRef 这个里面其实是去ask一下Driver你在吗?是否存在这个Driver的服务,如果存在,则 返回OK,构建出Driver的Ref** */ val driverRef = rpcEnv.setupEndpointRef( RpcAddress(host, port), YarnSchedulerBackend.ENDPOINT_NAME) createAllocator(driverRef, userConf) } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. throw new IllegalStateException("User did not initialize spark context!") } resumeDriver() userClassThread.join() } catch { case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => logError( s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + "Please check earlier log output for errors. Failing the application.") finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SC_NOT_INITED, "Timed out waiting for SparkContext.") } finally { resumeDriver() } }
    复制代码

    解读 NettyRpcEnv.ask

    回顾 Future

    如何理解 Future 呢,从字面意思可以很好的理解,Future 即未来,也是期货的意思。


    说到期货,就充满了不确定性,因为毕竟没有发生,谁也不知道未来会怎样。所以,定义一个 Future 就是定义了一个不在现在这个时空(线程)发生的(未来)的另一个(另一个线程的)事件,相比 java 的鸡肋的 Future,scala 的 Future 可谓是非常优雅且完美,搜索我的博客可以看到针对 scala 的 Future 的详细介绍。


    • 官方文章https://docs.scala-lang.org/zh-cn/overviews/core/futures.html

    • 这里不从源代码的角度去构建 Future 和 Promise 的认知观念,会有其他的文章再做解释

    • 【图 7】图解一下

    • 在 java 中定义一个线程是右侧的做法,而在左侧的 scala 中,利用 Future 则优雅了很多


    • 代码

        import scala.concurrent.ExecutionContext.Implicits.global    import scala.concurrent.Future
    /** * 解读Future的基础 */ object DocFutureTest {
    def apply(): Unit = { println("I am DocFutureTest") }
    def main(args: Array[String]): Unit = {
    val sleeping = 3000; val main_thread = Thread.currentThread().getName; /* 定义另一个线程发生的事件 这个事件相当于java中的如下的代码块: 从整体的间接性上看,scala的更为优雅一些,直接一个Future可以包裹住左右需要处理的内容 后续如果需要进行异常处理的话还可以根据Success和Failture进行模式匹配 public class JavaThreading { public static void main(String[] args) throws InterruptedException { new Thread( () -> System.out.println("这是一条发生在另一个叫做叫做" + Thread.currentThread().getName() + " 线程的故事") ).start(); System.out.println(Thread.currentThread().getName()); Thread.sleep(3000); } } */ var future_run = Future { Thread.sleep(1000) println("这是一条发生在另一个叫做叫做" + Thread.currentThread().getName +" 线程的故事") }
    // 主线程休息3000ms // 如果不休息的话,main线程会先停止,导致上面的Future定义的thread还没有被执行到就结束了 Thread.sleep(sleeping) println(s"$main_thread 线程休息 $sleeping 毫秒")
    }
    }
    复制代码


    • Future + callback(截取部分)


        case class ExceptionError(error: String) extends Exception(error)
    def main(args: Array[String]): Unit = {
    val sleeping = 3000; val main_thread = Thread.currentThread().getName;
    // 定义另一个线程发生的事件 var future_run = Future { Thread.sleep(1000) prntln("这是一条发生在另一个叫做叫做" + Thread.currentThread().getName + " 线程的故事") // 如果需要onFailure的话 则释放此句 // throw ExceptionError("error")
    future_run onFailure { case t => println("exception " + t.getMessage) }
    future_run onSuccess { case _ => println("success") }
    复制代码


    • 注意点

    • 定义了 Future,则定义了需要执行的线程的执行体(body),那么执行也是立刻马上,类似于 java 定义了一个 Thread,然后直接调用了start()一样

    • 在 Future 中大量运用了 scala 的 Try[],如果出现了异常,没有做onFailure的处理,那么可能看不到异常被抛出来,这点和 java 有较大区别

    回顾 Promise

    从皮毛简单的说完了 Future,那 Promise 又是什么呢?其实在 Future 的实现中包含了 Promise 的实现,也就是说没有 Promise,Future 是无法被运行的。从字面的理解,Promise 是承诺,有了 Future 的未来的定义,那么需要给出一个确切的承诺才可以进行,否则都是空口无凭天马行空无法兑现的大话。



    说到现在,包括看完以上的 Future 的介绍,很多人肯定还是懵 b 状态,因为我刚开始接触的时候也是这样,但我喜欢的就是用最直观的图和想象来描述一个抽象的问题,二话不说,继续上图


    • 【图 8】图解一下Future Promise 的关系

    • Future 的含义

    • 主线人生是你的 Main Thread,在 spark 中可能是某一个处理的 Thread

    • 在 Now 这个时刻,你开启了变成 star 之路(become star 的 Thread)

    • 在 Now 这个时刻,你开启了变帅之路(become handsome 的 Thread)

    • 一旦你开启这两条路,只要你的 Main Thread 没有结束,那么你可以一直持续的去走完这两个“之路”,直到 Success 或者 Failure,这就是 Future,可以理解为,开启了一条新的轨迹


    • Promiose 的含义

    • 当你开启了两条新的“之路”的时候,我可以在你两条路的重点给你不同的承诺

    • 当你 success 的完成了 Future 的时候,我 promise 你一个结果

    • 当你 failure 的完成了 Future 的时候,我 promise 你另一个结果


    • Future 与 Promise 的对比

    • Future 是一条线,包含执行过程的一条线,按照 Timeline 要去走下去

    • Promise 是一个点,一个被触发的点,想达到这个点必须又一个 Future 搭出一条路径才可以

    • 上面两句话如何理解呢,你可以这么想,人生(Main Thread)是一个数轴,你如果希望按照 timeline 向着右侧一直前进就需要有一条连续的道路,这个“道路”就是一个 Thread,也可以是 Future 定义出的道路,我们只能脚踏实地的通过道路走到目标终点,而不能直接跳到终点。Promise 类似于一个 milestone 点,如果只有一个 Promise,不定义出“道路”也就是不定义出一个 Future(或 Main Thread)的话,是无法实现这个 Promise 的。只定义了 Promise,不去考虑直线路径(Future),无法实现,但只定义 Future,不定义 Promise(其实在 Future 中是内置了 Default 的 Promise 的)是可以直接执行 Future 的。如下图所示,开出了两张空头支票,没有定义具体的路线(Future 实现方式),那么这两个 Promise 是无法兑现的。

    • 需要注意一点,这张图只是画出了定义了 Promise,但是如果想对象这个 Promise 的话,是可以通过 Promise 中的方法来搭建出一个 Future 来执行的,与 Future 不同的是,Future 只要定义了就可以马上执行,Promise 定义了的话,必须要显式的触发“搭建 Future”的操作才可以。


    • 看看 Promise 不执行的代码

    • 这里,我们定义了一个 Promise,并且“承诺”在 Promise 对应的 future 结束后调用一个map操作打印出一句话 future:....

    • 但我们执行以下语句的时候会发现什么都没有执行

        import scala.concurrent.Promise    import scala.util.{Failure, Success}
    object PromiseTest { def main(args: Array[String]): Unit = { import scala.concurrent.ExecutionContext.Implicits.global val promise = Promise[String] promise.future.onComplete(v => println("onComplete " + v)) promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName)) promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName)) Thread.sleep(3000) }
    }
    复制代码


    • 看看可以执行的代码

    • 与上面代码唯一不同的就是,加入了promise.trySuccess的处理

    • 代码细节在其他篇章我们具体看,这里你可以这样理解,加入和 trySuccess,就是为达到 Promise 搭建了一条 Future 之路,并触发这条路开始执行(start)

    • 至于 trySuccess,tryComplete 等具体的细节讲 scala 多线程的地方可以细说

    • promise.future.onComplete

    • 在 Future 执行完毕后的 callback 处理,无论是 Success 还是 Failure 都可以执行这个onComplete处理

    • promise.future.map

    • promise.future.onComplete之后对 Future 进行的继续的 map 处理

    • promise.trySuccess

    • 触发整个 Future 执行的 trigger


        import scala.concurrent.Promise    import scala.util.{Failure, Success}
    object PromiseTest { def main(args: Array[String]): Unit = { import scala.concurrent.ExecutionContext.Implicits.global val promise = Promise[String] promise.future.onComplete(v => println("onComplete " + v)) promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName)) promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName)) **promise.trySuccess("try success " + " --> " + Thread.currentThread().getName)** Thread.sleep(3000) }
    }
    复制代码

    ask 的代码

    其实,讲完了上面的所有的内容后,ask 的代码感觉几句话就可以讲解完毕了。


    ask 本身返回的是 Future,本身是异步处理


    • 【图 9】图解一下

    • 一台 10.1.1.1 的 client 机器通过 rpc 访问一台 10.1.1.2 的 Netty 的 service,当 response 正确返回后,在 client 机器中的TransportResponseHandler中进行判断处理,并且调用 listener 的 onSuccess 方法,这个 onSuccess 方法则是下面的 ask 代码中定义的方法。在这个方法中本身又去执行了 promise 的 tryComplete,从而触发了 promise 的 future 之路执行


    private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {    // 定义了一个Any的promise    val promise = Promise[Any]()    val remoteAddr = message.receiver.address
    def onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { e match { case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") case _ => logWarning(s"Ignored failure: $e") } } }
    /* 这里声明的onSuccess会被填充到RpcResponseCallback的onSuccess中,这个 RpcResponseCallback就是上面【图9】中的listener,当我们从Server端获取到response后 注意,获取的不是RpcFailure类型的response,则都会进入到【图9】的 else if (message instanceof RpcResponse) { 分支中
    */ def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => /* 当返回的response是OK的没有问题后,onSuccess被callback,这里promise的trySuccess也 进行call操作,这里就是上面所说的,为了一个promise铺设了一条future,从而可以执行 这个Future的线程了 */ if (!promise.trySuccess(rpcReply)) { logWarning(s"Ignored message: $reply") } }
    try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => **onSuccess**(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) /* 如果是callback了Failure,则这里会被执行 */ promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) }
    val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " + s"in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
    /* 当promise的future执行后,会调用这里的onComplete方法 */ promise.future.onComplete { v => timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => onFailure(e) }
    /* 利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容 如果是RpcTimeoutException 则 直接throw这个ex 如果是TimeoutException 则包装成RpcTimeoutException后再throw出去 */ promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) }
    复制代码

    总结

    本篇用小篇幅讲解了一下*o.a.s.rpc.netty.NettyRpcEnv.ask()*的方法,简单描述了一个 spark 的异步处理的小 case,这个小 case 需要不少的先验知识点,可能突然间看到这里有点懵,学习需要融会贯通一点点的来积累才可以,如果不明白可以慢慢积累其他模块的知识再来这里看流水账会更有收获。

    发布于: 2021 年 07 月 17 日阅读数: 14
    用户头像

    dclar

    关注

    代码是有生命的艺术品 2020.05.29 加入

    还未添加个人简介

    评论

    发布
    暂无评论
    Spark源代码::Spark多线程::NettyRpcEnv.ask解读