写点什么

Tars Java 客户端源码分析

发布于: 2021 年 05 月 08 日

一、基本 RPC 框架简介


在分布式计算中,远程过程调用(Remote Procedure Call,缩写 RPC)允许运行于一台计算机的程序调用另一个地址空间计算机的程序,就像调用本地程序一样,无需额外地为这个交互作用涉及到的代理对象构建、网络协议等进行编程。


一般 RPC 架构,有至少三种结构,分别为注册中心,服务提供者和服务消费者。如图 1.1 所示,注册中心提供注册服务和注册信息变更的通知服务,服务提供者运行在服务器来提供服务,服务消费者使用服务提供者的服务。


服务提供者(RPC Server),运行在服务端,提供服务接口定义与服务实现类,并对外暴露服务接口。注册中心(Registry),运行在服务端,负责记录服务提供者的服务对象,并提供远程服务信息的查询服务和变更通知服务。服务消费者(RPC Client),运行在客户端,通过远程代理对象调用远程服务。


(RPC 框架基本结构)


1.1 RPC 调用流程


如下图所示,描述了 RPC 的调用流程,其中 IDL(Interface Description Language)为接口描述语言,使得在不同平台上运行的程序和用不同语言编写的程序可以相互通信交流。


(RPC 调用流程)


1)客户端调用客户端桩模块。该调用是本地过程调用,其中参数以正常方式推入堆栈。

2)客户端桩模块将参数打包到消息中,并进行系统调用以发送消息。打包参数称为编组。

3)客户端的本地操作系统将消息从客户端计算机发送到服务器计算机。

4)服务器计算机上的本地操作系统将传入的数据包传递到服务器桩模块。

5)服务器桩模块从消息中解包出参数。解包参数称为解组。

6)最后,服务器桩模块执行服务器程序流程。回复是沿相反的方向执行相同的步骤。


二、Tars Java 客户端设计介绍


Tars Java 客户端整体设计与主流的 RPC 框架基本一致。我们先介绍 Tars Java 客户端初始化过程。


2.1 Tars Java 客户端初始化过程


如图 2.1 所示,描述了 Tars Java 的初始化过程。


(Tars Java 初始化过程)


1)先出创建一个 CommunicatorConfig 配置项,命名为 communicatorConfig,其中按需设置 locator, moduleName, connections 等参数。


2)通过上述的 CommunicatorConfig 配置项,命名为 config,那么调用 CommunicatorFactory.getInstance().getCommunicator(config),创建一个 Communicator 对象,命名为 communicator。


3)假设 objectName="MESSAGE.ControlCenter.Dispatcher",需要生成的代理接口为 Dispatcher.class,调用 communicator.stringToProxy(objectName, Dispatcher.class)方法来生成代理对象的实现类。


4)在 stringToProxy()方法里,首先通过初始化 QueryHelper 代理对象,调用 getServerNodes()方法获取远程服务对象列表,并设置该返回值到 communicatorConfig 的 objectName 字段里。具体的代理对象的代码分析,见下文中的“2.3 代理生成”章节。


5)判断在之前调用 stringToProxy 是否有设置 LoadBalance 参数,如果没有的话,就生成默认的采用 RR 轮训算法的 DefaultLoadBalance 对象。


6)创建 TarsProtocolInvoker 协议调用对象,其中过程有通过解析 communicatorConfig 中的 objectName 和 simpleObjectName 来获取 URL 列表,其中一个 URL 对应一个远程服务对象,TarsProtocolInvoker 初始化各个 URL 对应的 ServantClient 对象,其中一个 URL 根据 communicatorConfig 的 connections 配置项确认生成多少个 ServantClient 对象。然后使用 ServantClients 等参数初始化 TarsInvoker 对象,并将这些 TarsInvoker 对象集合设置到 TarsProtocolInvoker 的 allInvokers 成员变量中,其中每个 URL 对应一个 TarsInvoker 对象。上述分析表明,一个远程服务节点对应一个 TarsInvoker 对象,一个 TarsInvoker 对象包含 connections 个 ServantClient 对象,对于 TCP 协议,那么就是一个 ServantClient 对象对应一个 TCP 连接。


7)使用 api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator 参数生成一个实现 JDK 代理接口 InvocationHandler 的 ObjectProxy 对象。


8)生成 ObjectProxy 对象的同时进行初始化操作,首先会执行 loadBalancer.refresh()方法刷新远程服务节点到负载均衡器中便于后续 tars 远程调用进行路由。


9)然后注册统计信息上报器,其中是上报方法采用 JDK 的 ScheduledThreadPoolExecutor 进行定时轮训上报。


10)注册服务列表刷新器,采用的技术方法和上述统计信息上报器基本一致。


2.2 使用范例


以下代码为最简化示例,其中 CommunicatorConfig 里的配置采用默认值,communicator 通过 CommunicatorConfig 配置生成后,直接指定远程服务对象的具体服务对象名、IP 和端口生成一个远程服务代理对象。


Tars Java 代码使用范例// 先初始化基本 Tars 配置 CommunicatorConfig cfg = new CommunicatorConfig();// 通过上述的 CommunicatorConfig 配置生成一个 Communicator 对象。Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);// 指定 Tars 远程服务的服务对象名、IP 和端口生成一个远程服务代理对象。

// 先初始化基本Tars配置    CommunicatorConfig cfg = new CommunicatorConfig();    // 通过上述的CommunicatorConfig配置生成一个Communicator对象。    Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);    // 指定Tars远程服务的服务对象名、IP和端口生成一个远程服务代理对象。    HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");    //同步调用,阻塞直到远程服务对象的方法返回结果    String ret = proxy.hello(3000, "Hello World");    System.out.println(ret);    //异步调用,不关注异步调用最终的情况    proxy.async_hello(null, 3000, "Hello World");      //异步调用,注册一个实现TarsAbstractCallback接口的回执处理对象,该实现类分别处理调用成功,调用超时和调用异常的情况。    proxy.async_hello(new HelloPrxCallback() {        @Override        public void callback_expired() { //超时事件处理        }        @Override        public void callback_exception(Throwable ex) { //异常事件处理        }        @Override        public void callback_hello(String ret) { //调用成功事件处理            Main.logger.info("invoke async method successfully {}", ret);       }    }, 1000, "Hello World");
复制代码

在上述例子中,演示了常见的两种调用方式,分别为同步调用和异步调用。其中异步调用,如果调用方想捕捉异步调用的最终结果,可以注册一个实现 TarsAbstractCallback 接口的实现类,对 tars 调用的异常,超时和成功事件进行处理。


2.3 代理生成


Tars Java 的客户端桩模块的远程代理对象是采用 JDK 原生 Proxy 方法。如下文的源码所示,ObjectProxy 实现了 java.lang.reflect.InvocationHandler 的接口方法,该接口是 JDK 自带的代理接口。


代理实现

public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        String methodName = method.getName();        Class<?>[] parameterTypes = method.getParameterTypes();        InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);        try {            if ("toString".equals(methodName) && parameterTypes.length == 0) {                return this.toString();            } else if                //***** 省略代码 *****            } else {                // 在负载均衡器选取一个远程调用类,进行应用层协议的封装,最后调用TCP传输层进行发送。                Invoker invoker = this.loadBalancer.select(context);                return invoker.invoke(context);            }        } catch (Throwable var8) {            // ***** 省略代码 *****        }    }}
复制代码

当然生成上述远程服务代理类,涉及到辅助类,Tars Java 采用 ServantProxyFactory 来生成上述的 ObjectProxy,并存储 ObjectProxy 对象到 Map 结构,便于调用方二次使用时直接复用已存在的远程服务代理对象。


具体相关逻辑如源码所示,ObjectProxyFactory 是生成 ObjectProxy 的辅助工厂类,和 ServantProxyFactory 不同,其本身不缓存生成的代理对象。

class ServantProxyFactory {    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();    // ***** 省略代码 *****    public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {        Object proxy = this.cache.get(objName);        if (proxy == null) {            this.lock.lock(); // 加锁,保证只生成一个远程服务代理对象。            try {                proxy = this.cache.get(objName);                if (proxy == null) {                    // 创建实现JDK的java.lang.reflect.InvocationHandler接口的对象                    ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);                    // 使用JDK的java.lang.reflect.Proxy来生成实际的代理对象                    this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));                    proxy = this.cache.get(objName);                }            } finally {                this.lock.unlock();            }        }        return proxy;    }    /** 使用JDK自带的Proxy.newProxyInstance生成代理对象 */    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);    }    // ***** 省略代码 *****}
复制代码

从以上的源码中,可以看到 createProxy 使用了 JDK 的 Proxy.newProxyInstance 方法来生成远程服务代理对象。


2.4 远程服务寻址方法


作为一个 RPC 远程框架,在分布式系统中,调用远程服务,涉及到如何路由的问题,也就是如何从多个远程服务节点中选择一个服务节点进行调用,当然 Tars Java 支持直连特定节点的方式调用远程服务,如上文的 2.2 使用范例所介绍。


如图下图所示,ClientA 某个时刻的一次调用使用了 Service3 节点进行远程服务调用,而 ClientB 某个时刻的一次调用采用 Service2 节点。Tars Java 提供多种负载均衡算法实现类,其中有采用 RR 轮训算法的 RoundRobinLoadBalance,一致性哈希算法的 ConsistentHashLoadBalance 和普通哈希算法的 HashLoadBalance。


(客户端按特定路由规则调用远程服务)


如下述源码所示,如果要自定义负载均衡器来定义远程调用的路由规则,那么需要实现 com.qq.tars.rpc.common.LoadBalance 接口,其中 LoadBalance.select()方法负责按照路由规则,选取对应的 Invoker 对象,然后进行远程调用,具体逻辑见源码代理实现。由于远程服务节点可能发生变更,比如上下线远程服务节点,需要刷新本地负载均衡器的路由信息,那么此信息更新的逻辑在 LoadBalance.refresh()方法里实现。


负载均衡接口

public interface LoadBalance<T> {    /** 根据负载均衡策略,挑选invoker */    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;    /** 通知invoker列表的更新 */    void refresh(Collection<Invoker<T>> invokers);}
复制代码


2.5 网络模型


Tars Java 的 IO 模式采用的 JDK 的 NIO 的 Selector 模式。这里以 TCP 协议来描述网络处理,如下述源码所示,Reactor 是一个线程,其中的 run()方法中,调用了 selector.select()方法,意思是如果除非此时网络产生一个事件,否则将一直线程阻塞下去。


假如此时出现一个网络事件,那么此时线程将会被唤醒,执行后续代码,其中一个代码是 dispatcheEvent(key),也就是将进行事件的分发。


其中将根据对应条件,调用 acceptor.handleConnectEvent(key)方法来处理客户端连接成功事件,


或 acceptor.handleAcceptEvent(key)方法来处理服务器接受连接成功事件,


或调用 acceptor.handleReadEvent(key)方法从 Socket 里读取数据,


或 acceptor.handleWriteEvent(key)方法来写数据到 Socket 。


Reactor 事件处理

public final class Reactor extends Thread {    protected volatile Selector selector = null;    private Acceptor acceptor = null;    //***** 省略代码 *****    public void run() {        try {            while (!Thread.interrupted()) {                // 阻塞直到有网络事件发生。                selector.select();                //***** 省略代码 *****                while (iter.hasNext()) {                    SelectionKey key = iter.next();                    iter.remove();                    if (!key.isValid()) continue;                    try {                        //***** 省略代码 *****                        // 分发传输层协议TCP或UDP网络事件                        dispatchEvent(key);                //***** 省略代码 *****            }        }        //***** 省略代码 *****    }        //***** 省略代码 *****    private void dispatchEvent(final SelectionKey key) throws IOException {        if (key.isConnectable()) {            acceptor.handleConnectEvent(key);        } else if (key.isAcceptable()) {            acceptor.handleAcceptEvent(key);        } else if (key.isReadable()) {            acceptor.handleReadEvent(key);        } else if (key.isValid() && key.isWritable()) {            acceptor.handleWriteEvent(key);        }    }}
复制代码

网络处理采用 Reactor 事件驱动模式,Tars 定义一个 Reactor 对象对应一个 Selector 对象,针对每个远程服务(整体服务集群,非单个节点程序)默认创建 2 个 Reactor 对象进行处理,通过修改com.qq.tars.net.client.selectorPoolSize 这个 JVM 启动参数值来决定一个远程服务具体创建几个 Reactor 对象。


(Tars-Java 的网络事件处理模型)


上图中的处理读 IO 事件(Read Event)实现和写 IO 事件(Write Event)的线程池是在 Communicator 初始化的时候配置的。具体逻辑如源码所示,其中线程池参数配置由 CommunicatorConfig 的 corePoolSize, maxPoolSize, keepAliveTime 等参数决定。


读写事件线程池初始化

private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {    //***** 省略代码 *****    this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);    //***** 省略代码 *****}public class ClientPoolManager {    public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {        //***** 省略代码 *****        clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));        //***** 省略代码 *****        return clientPoolExecutor;    }             private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {        int corePoolSize = communicatorConfig.getCorePoolSize();        int maxPoolSize = communicatorConfig.getMaxPoolSize();        int keepAliveTime = communicatorConfig.getKeepAliveTime();        int queueSize = communicatorConfig.getQueueSize();        TaskQueue taskqueue = new TaskQueue(queueSize);        String namePrefix = "tars-client-executor-";        TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));        taskqueue.setParent(executor);        return executor;    }}
复制代码


2.6 远程调用交互模型


调用代理类的方法,那么会进入实现 InvocationHandler 接口的 ObjectProxy 中的 invoke 方法。


下图描述了远程服务调用的流程情况。这里着重讲几个点,一个是如何写数据到网络 IO。第二个是 Tars Java 通过什么方式进行同步或者异步调用,底层采用了什么技术。


(远程调用流程)


2.6.1 写 IO 流程


如图(底层代码写 IO 过程)所示,ServantClient 将调用底层网络写操作,在 invokeWithSync 方法中,取得 ServantClient 自身成员变量 TCPSession,调用 TCPSession.write()方法,如图(底层代码写 IO 过程)和以下源码( 读写事件线程池初始化)所示,先获取 Encode 进行请求内容编码成 IoBuffer 对象,最后将 IoBuffer 的 java.nio.ByteBuffer 内容放入 TCPSession 的 queue 成员变量中,然后调用 key.selector().wakeup(),唤醒 Reactor 中 run()方法中的 Selector.select(),执行后续的写操作。


(底层代码写 IO 过程)


具体 Reactor 逻辑见上文 2.5 网络模型内容,如果 Reactor 检查条件发现可以写 IO 的话也就是 key.isWritable()为 true,那么最终会循环从 TCPSession.queue 中取出 ByteBuffer 对象,调用 SocketChannel.write(byteBuffer)执行实际的写网络 Socket 操作,代码逻辑见源码中的 doWrite()方法。


读写事件线程池初始化

public class TCPSession extends Session {    public void write(Request request) throws IOException {        try {            IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);            write(buffer);        //***** 省略代码 *****    }    protected void write(IoBuffer buffer) throws IOException {        //***** 省略代码 *****        if (!this.queue.offer(buffer.buf())) {            throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");        }        if (key != null) {            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);            key.selector().wakeup();        }    }    protected synchronized int doWrite() throws IOException {        int writeBytes = 0;        while (true) {            ByteBuffer wBuf = queue.peek();            //***** 省略代码 *****            int bytesWritten = ((SocketChannel) channel).write(wBuf);            //***** 省略代码 *****        return writeBytes;    }}
复制代码


2.6.2 同步和异步调用的底层技术实现


对于同步方法调用,如图(远程调用流程)和源码(ServantClient 的同步调用)所示,ServantClient 调用底层网络写操作,在 invokeWithSync 方法中创建一个 Ticket 对象,Ticket 顾名思义就是票的意思,这张票唯一标识本次网络调用情况。


ServantClient 的同步调用

public class ServantClient {    public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {            //***** 省略代码 *****            ticket = TicketManager.createTicket(request, session, this.syncTimeout);            Session current = session;            current.write(request);            if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {            //***** 省略代码 *****            response = ticket.response();            //***** 省略代码 *****            return response;            //***** 省略代码 *****        return response;    }}
复制代码

如代码所示,在执行完 session.write()操作后,紧接着执行 ticket.await()方法,该方法线程等待直到远程服务回复返回结果到客户端,ticket.await()被唤醒后,将执行后续操作,最终 invokeWithSync 方法返回 response 对象。其中 Ticket 的等待唤醒功能内部采用 java.util.concurrent.CountDownLatch 来实现。


对于异步方法调用,将会执行 ServantClient.invokeWithAsync 方法,也会创建一个 Ticket,并且执行 Session.write()操作,虽然不会调用 ticket.await(),但是在 Reactor 接收到远程回复时,首先会先解析 Tars 协议头得到 Response 对象,然后将 Response 对象放入如图(Tars-Java 的网络事件处理模型)所示的 IO 读写线程池中进行进一步处理,如下述源码(异步回调事件处理)所示,最终会调用 WorkThread.run()方法,在 run()方法里执行 ticket.notifyResponse(resp),该方法里面会执行类似上述代码 2.1 中的实现 TarsAbstractCallback 接口的调用成功回调的方法。


异步回调事件处理

public final class WorkThread implements Runnable {    public void run() {        try {            //***** 省略代码 *****                Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());            //***** 省略代码 *****                ticket.notifyResponse(resp);                ticket.countDown();                TicketManager.removeTicket(ticket.getTicketNumber());            }            //***** 省略代码 *****    }}
复制代码

如下述源码所示,TicketManager 会有一个定时任务轮训检查所有的调用是否超时,如果(currentTime - t.startTime) > t.timeout 条件成立,那么会调用 t.expired()告知回调对象,本次调用超时。


调用超时事件处理

public class TicketManager {            //***** 省略代码 *****    static {        executor.scheduleAtFixedRate(new Runnable() {            long currentTime = -1;            public void run() {                Collection<Ticket<?>> values = tickets.values();                currentTime = System.currentTimeMillis();                for (Ticket<?> t : values) {                    if ((currentTime - t.startTime) > t.timeout) {                        removeTicket(t.getTicketNumber());                        t.expired();                    }                }            }        }, 500, 500, TimeUnit.MILLISECONDS);    }}
复制代码


三、总结


代码的调用一般都是层层递归调用,代码的调用深度和广度都很大,通过调试代码的方式一步步学习源码的方式,更加容易理解源码的含义和设计理念。


Tars 与其他 RPC 框架,并没有什么本质区别,通过类比其他框架的设计理念,可以更加深入理解 Tars Java 设计理念。


四、参考文献


1.Remote procedure call

2.Tars Java源码Github仓库

3.RPC框架简介与原理


作者:vivo 互联网服务器团队-Ke Shengkai

发布于: 2021 年 05 月 08 日阅读数: 63
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020.07.10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
Tars Java 客户端源码分析