写点什么

低代码平台中的分布式 RPC 框架 (约 3000 行代码)

作者:canonical
  • 2023-05-23
    北京
  • 本文字数:8021 字

    阅读完需:约 26 分钟

RPC 是分布式系统设计中不可或缺的一个部分。国内开源的 RPC 框架很多,它们的设计大都受到了 dubbo 框架的影响,核心的抽象概念与 dubbo 类似。从今天的角度上看,dubbo 的设计已经过于繁琐冗长,如果基于现在的技术环境,重新审视 RPC 框架的定位和设计,我们可以得到更简单、扩展性更好的实现方案。本文将介绍 Nop 平台中的 NopRPC 框架的设计思想和具体实现,它充分利用了成熟的技术设施,如 IoC 容器、JSON 序列化、GraphQL 引擎、Nacos 注册中心、Sentinel 熔断限流器等,通过大概 3000 行代码即可实现一个具备实用价值的分布式 RPC 框架。NopRPC 具有如下特点:


  1. 可以将 NopGraphQL 服务封装为普通的强类型的 RPC 接口,同时保留 GraphQL 的响应字段选择能力

  2. 可以将任意支持单向信息发送和接收的消息接口封装为等待响应消息的 RPC 接口

  3. Http、Socket、WebSocket、消息队列、批处理文件等都只是一种接口形式,通过配置可以将同一个服务实现适配到多种不同的接口形式

  4. 支持取消正在执行的 RPC 调用,取消时可以调用远程服务上的 cancelMethod

  5. 支持将配对的 startTask 和 checkTaskStatus 两个调用封装为一个异步 RPC 接口

  6. 支持灰度发布。可以在网关处设置路由选择 header,直接控制后续调用链路中的服务路由。

  7. 支持广播式调用、选主调用(只调用选举得到的主服务器),以及指定服务器调用(直接指定被调用服务的地址和端口)

  8. 利用 NopTcc 引擎实现分布式事务

  9. 利用 NopTask 引擎实现服务端的低代码模型驱动开发

  10. 支持端到端的 RPC 超时控制

  11. 支持国际化多语言消息

  12. 支持错误码映射(例如将多个内部错误码统一映射为同一个外部错误码或者将同一个错误码根据错误参数不同映射为不同的外部错误码和错误消息)

  13. 支持云原生的服务网格

  14. 支持 GraalVM 原生应用编译


NopRPC 的具体使用文档参见rpc.md

一. 请求和响应消息设计

RPC 的核心功能是发送请求消息和接收响应消息,所以请求消息和响应消息的结构是 RPC 中的一项关键性设计。NopRPC 框架中的消息结构定义如下:


class ApiRequest<T>{    Map<String,Object> headers;    T data;    FieldSelectionBean selection;    Map<String,Object> properties;}
class ApiResponse<T>{ Map<String,Object> headers; int status; String code; String msg; T data;}
复制代码


  1. 根据可逆计算原理,Nop 平台中所有的核心数据结构都要采取(data, metadata)这种配对的结构设计,在 metadata 中可以存放 data 之外的扩展数据,因此我们为消息对象增加了 headers 字段,在传输时可以映射到底层信道支持的 headers 字段,例如通过 http 传输时 headers 对应于 http 的 headers,而通过 Kafka 消息队列传输时,对应于 Kafka 消息的 headers。

  2. GraphQL 的一个特别之处是支持调用端选择返回哪些数据,这样可以减少返回的数据量并可以优化服务端数据处理过程。ApiRequest 增加的 selection 字段将这种能力扩展到了所有 RPC 调用过程

  3. headers 是与 data 一起被发送到远端的扩展数据,除了这些扩展数据之外,我们经常还需要使用一些仅存在于当前处理过程中的扩展对象,例如 responseNormalizer 等,因此 ApiRequest 中定义了一个 properties 扩展数据集合,它不支持 json 序列化,可以用于存放那些不需要被发送到远端的临时数据。

  4. NopRPC 对错误码和错误消息的处理进行了统一规范化,前端页面的 Ajax 请求直接使用 ApiResponse 返回消息格式,统一了 RPC 和 Web 请求的输入输出规范。具体错误码规范参见error-code.md

  5. 为了支持可以通过命令行调用 RPC 服务,ApiResponse 通过整数类型的 status 字段来表示是否调用成功。如果调用成功,则返回 0。作为命令行程序被调用时,status 可以直接被映射为命令行的返回值。


一般的 RPC 框架中,Request 和 Response 消息往往会包含大量实现细节,导致它们仅限于在框架实现层面作为内部类来使用,而 NopRPC 的设计则是将 ApiRequest 和 ApiResponse 通用化,在所有需要传输消息、返回信息的地方都采用统一的消息结构,实现了 RPC、Web 框架、消息队列、批处理服务、命令行应用等一系列接口的无缝对接。

二. RPC 的解构

NopRPC 的核心接口是 IRpcService


interface IRpcService{   CompletionStage<ApiResponse<?>> callAsync(String serviceMethod,            ApiRequest<?> request, ICancelToken cancelToken);}
interface ICancelToken{ boolean isCancelled(); String getCancelReason(); void appendOnCancel(Consumer<String> task);}
复制代码


通过以上接口定义,我们可以获知如下信息:


  1. NopRPC 是一个面向异步处理的框架,而且它支持取消机制

  2. ApiRequest 和 ApiResponse 都是 POJO 对象,框架本身没有任何运行时假定,因此可以脱离 Web 环境以及 Socket 环境来使用。


现在有些 RPC 框架采用 ReactiveStream 的设计,每个 RPC 请求可能产生多个响应消息,并且支持通过 RPC 来下载大附件。但是在 Nop 平台中,RPC 的定位就是实现应用之间一对一的信息交换:每发送一个请求消息,接收且必然接收到唯一的一个响应消息。因为按照 ReactiveStream 方式来实现 RPC 会导致服务端和客户端的管理控制变得更加复杂,而在实际使用过程中大部分情况下却用不到多个返回消息的情况。另外,在消息系统的抽象中,本身就提供了发送和接收消息流的功能,如果通过 RPC 再次暴露类似的功能就显得有些多余。至于大文件的上传下载现在一般都是封装为单独的文件服务,可以专门定义针对云存储优化的接口,也没有必要在 RPC 框架中再提供类似的功能。

2.1 RPC over GraphQL

一般的 RPC 服务端都是根据消息类型直接映射到某个服务方法,然后所有业务处理都在这个消息服务函数中执行。但是在 Nop 平台中,RPC 调用在服务端会将消息投递给 NopGraphQL 引擎,然后由 GraphQLExecutor 负责协调组织多个 DataLoader 协同工作。例如在服务端我们实现了如下 BizModel


@BizModel("MyEntity")public class MyEntityBizModel{    @BizQuery    public List<MyEntity> findList(@RequestBean MyRequestBean request,                          FieldSelectionBean selection){        ....    }
@BizLoader("children") @GraphQLReturn(bizObjName = "MyEntity") public List<MyEntity> loadChildren(@ContextSource MyEntity entity) { ... return children; }}
class MyEntity{ private String name; private List<MyEntity> children;
public String getName(){ return name; }
@LazyLoad public List<MyEntity> getChildren(){ return children; }}
复制代码


以上代码中,我们在服务端定义了一个业务对象 MyEntity,并对外提供了一个查询服务函数MyEntity__findList,这个查询函数会返回 MyEntity 对象列表, MyEntity 对象上的 children 属性为延迟加载属性,除非明确指定,否则它并不会自动返回给前端。而加载 children 属性时会触发 MyEntityBizModel 上的 loadChildren 函数。借助于这种设计,我们可以将一个庞大复杂的领域对象模型暴露为统一的 RPC 接口服务,而不用担心无关信息过多影响性能。在客户端我们可以使用如下接口来调用:


@BizModel("MyEntity")interface MyEntityService{   @BizQuery   CompletionStage<ApiResponse<List<MyEntity>>> findListAsync(ApiRequest<MyRequestBean> request, ICancelToken cancelToken);
@BizQuery List<MyEntity> findList(@RequestBean MyRequestBean request);}
复制代码


多个 Java 方法可以映射到同一个后台服务调用,可以同时支持同步调用和异步调用。一般情况下约定异步调用方法名的后缀为 Async,返回类型为 CompletionStage。如果不需要设置 selection 和 headers,我们也可以使用普通 java 对象作为输入参数, 返回普通的 Java 对象,出错时会自动将 ApiResponse 中的错误码和错误消息包装为 NopRebuildException 对外抛出。


Java 接口通过 Aop Proxy 会被转换为对 IRpcService 接口的调用,以上方法调用将会被转换为


  rpcService.callAsync("MyEntity__findList", apiRequest, cancelToken)
复制代码


对应于前端的 REST 请求格式如下:


POST /r/MyEntity__findList?@selection=a,b,children{a,b}{    json body }
复制代码


通过内置的 @selection 参数可以为 REST 请求补充响应字段选择能力


NopGraphQL 引擎本质上采用的是框架中立的设计,相当于是针对 POJO 的 Request 对象的一种纯逻辑处理函数,没有任何特定的运行时依赖,因此包装为 RPC 接口之后可以适配到各类输入输出信道上。例如作为批处理文件的处理函数,通过配置读取批处理文件的每一行,构造为 ApiRequest 对象后调用对应的 GraphQL 服务。


关于 NopGraphQL 引擎更进一步的介绍可以参见graphql-java.md

2.2 RPC over Message Queue

很多 RPC 框架在实现层面都会引入大量内部接口,这些接口仅对该 RPC 框架有意义,无法在 RPC 框架之外作为一般应用接口来使用。NopRPC 非常强调概念层面的抽象性和通用性,提供了MessageRpcClientMessageRpcServer等缺省实现,可以在任意消息队列的基础上建立 RPC 调用机制。消息队列在 Nop 平台中的定位就是单向信息发送,它的核心抽象接口如下:


interface IMessageService{     CompletionStage<Void> sendAsync(String topic, Object message,                     MessageSendOptions options);
/** * 响应消息发送到一个相关的topic上 * * @param topic 请求消息所属的topic * @return reply消息所对应的队列 */ default String getReplyTopic(String topic) { return "reply-" + topic; }
IMessageSubscription subscribe(String topic, IMessageConsumer listener, MessageSubscribeOptions options);}
复制代码


基于消息队列实现 RPC 客户端的做法如下:


  1. 在 ApiRequest 的 header 中增加 nop-id 唯一标识,设置 nop-svc-action 为服务方法标识

  2. 在实际发送前将消息注册到 waiting 队列中

  3. 发送消息到 topic 并监听 reply topic

  4. 从 reply topic 接收到响应或者超时发生时从 waiting 队列中取到 CompletableFuture 对象,并设置返回结果。


服务端实现比较简单:


  1. 监听 topic 并对接收到的 ApiRequest 消息调用本地的 IRpcService 服务实现

  2. 对 rpcService 返回的 ApiReponse 对象,设置 nop-rel-id 这个 header 的值为 ApiRequest 中的 nop-id。

  3. 将 ApiResponse 发送到 reply topic。


这个消息队列的实现非常通用。比如说nop-rpc-simple模块将 Socket 信道抽象为 IMessageService 服务,实现了基于 TCP 进行简单 RPC 调用的机制。此外我们还可以基于 Kafka 或者 Pulsar 等消息队列实现 RPC,或者利用 Redis 的 PUB/SUB 机制来实现。


再次强调一下IMessageService是 Nop 平台在应用层提供的对消息服务的统一抽象接口,并不是专为 RPC 内部实现所设计的一种专用接口


NopRPC 这一双向信息交互抽象可以建筑在单向的消息流抽象之上,有趣的是,我们也可以反过来,基于 IRpcService 抽象来提供 IMessageService 接口的实现。具体参见 RpcMessageSender,javaRpcMessageSubscriber.java。这种我中有你,你中有我的现象在数学推理中非常常见,它体现出 IRpcService 和 IMessageService 都是某种类似数学概念的通用抽象。


基于这种通用抽象,NopRPC 的实现非常简洁、通用,而很多 RPC 框架的实现都和底层的 Netty 交换信道深度绑定,无法轻易应用到新的交换信道上。

二. 负载均衡设计

分布式 RPC 最核心的价值就在于它提供了自定义的客户端负载均衡机制,从而可以利用集群冗余来扩展系统吞吐能力。分布式 RPC 的其他部分主要是为了运行负载均衡算法所做的准备工作。


NopRPC 客户端执行逻辑的伪代码如下:


// 利用服务发现机制获取到所有可用的服务实例List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
List<ServiceInstance> filtered = new ArrayList<>(instances);// 过滤掉所有不符合匹配条件的服务实例for(IRequestServiceInstanceFilter filter: filters){ // 先按照比较严格的规则进行过滤,比如只选择在同一个zone中的服务器 filter.filter(filtered, request, false);}
// 如果没有匹配的服务实例,则尝试按照比较宽松的规则进行过滤if(filtered.isEmpty()){ filtered = new ArrayList<>(instances); for(IRequestServiceInstanceFilter filter: filters){ filter.filter(filtered, request, true); }}
// 利用负载均衡算法从所有可选的服务实例中随机选择一个ServiceInstance selected = loadBalance.choose(filtered,request);IRpcService rpcService = rpcClientInstanceProvider.getRpcClientInstance(selected);CompletionStage<ApiResponse> response = rpcService.callAsync( serviceMethod, request, cancelToken);
复制代码


本质上是先执行路由过滤逻辑,只保留符合匹配条件的路由条目,然后再执行负载均衡算法实现最终选择。

失败重试

如果配置了 nop.rpc.cluster-client-retry-count(缺省值是 2),则当连接服务端失败时会自动从备选列表中删除连接失败的服务器,然后重新运行负载均衡算法选择一个实例,重新建立连接。


目前只有连接失败才会重试(抛出 NopConnectException),其他失败情况并不会导致重试。


对应的伪代码为:


 Exception error = null; for (int i = 0; i <= retryCount; i++) {     ServiceInstance instance = loadBalance.choose(instances, request);     try {         return getRpcClient(instance, request).call(serviceMethod, request, cancelToken);     } catch (Exception e) {         error = e;
if (!isAllowRetry(e)) { break; }
if (instances.size() > 1) { // 删除刚才出错的连接,然后重试 instances.remove(instance); } } } throw NopException.adapt(error);
复制代码

灰度发布

灰度发布可以看作是一种路由逻辑,即满足某些条件的请求只会路由到指定的服务实例。在 NopRPC 中我们可以利用TagServiceInstanceFilterRouteServiceInstanceFilter来实现灰度发布。


  • 如果 ApiRequest 中包含 nop-tags 这个 header,则只会选择具有指定标签的 ServiceInstance。例如nop-tags=a,b,则要求 ServiceInstance 必须同时具有这两个标签。

  • 通过 nop-svc-route 这个 header 可以直接指定服务版本,例如 nop-svc-route=ServiceA:1.0.0,ServiceB:^2.0.3 表示对服务 A 使用版本 1.0.0,而对于 ServiceB,则使用 2.0.3 以上的版本。nop-svc-route 的格式为服务名:NPM版本定义,服务名:NPM版本定义,版本定义采用 NPM 包的语义版本号规则。

三. 取消执行和状态轮询

NopRPC 的设计并没有选择利用 CompletableFuture 对象上的 cancel 方法,因为在实践中,通过参数传递 cancelToken 要比返回具有 cancel 函数的 Future 对象要容易处理得多,也更容易实现性能优化。


在执行取消操作时,一般的 RPC 框架只是会中断当前的请求连接,并不会向服务端主动发送取消消息。而在 NopRPC 中,可以通过如下配置,表示当执行取消操作时主动执行服务端的一个取消方法


@BizModel("MyEntity")interface MyEntityService{    @RpcMethod(cancelMethod="Sys__cancel”)    CompletionStage<ApiResponse<MyResponseBean> myAction(ApiRequest<MyRequestBean> request, ICancelToken cancelToken);}
复制代码


@RpcMethod(cancelMethod="Sys__cancel")注解表示当执行取消操作时会主动调用服务端 Sys 对象上的 cancel 方法,它是一个系统缺省的取消方法,会在服务端调用 cancelToken 的 cancel 操作。如果我们需要在 cancel 时执行一些业务相关的代码,则可以在服务端的 MyEntityBizModel 上实现 cancel 方法,然后使用@RpcMethod(cancelMethod="cancel")


如果 cancelMethod 中没有包含对象名,则表示调用当前业务对象上的方法。


具体 cancelMethod 的调用逻辑可以参见CancellableRpcClient.java


除了 cancelMethod 之外,RpcMethod 注解还支持配置 pollingMethod。


interface MyEntityService{   @RpcMethod(pollingMethod="checkTaskStatus")   CompletionStage<ApiResponse<TaskResultBean>> startTask(          ApiRequest<StartTaskRequestBean> request);}
复制代码


如果配置了 pollingMethod,则当执行 RPC 方法之后不会立刻返回,而是不断调用 pollingMethod 对应的远程服务,直到返回结果信息。


具体 pollingMethod 的处理逻辑参见 PollingRpcClient.java

四. 上下文传播

在微服务架构下,一次业务操作可能会产生多个相关联的 RPC 调用,必须要建立一种自动的上下文传播机制,将一些共享的信息从上游的服务传播到下游的服务。在 NopRPC 的具体实现中ContextBinder负责将 ApiRequest 的部分 header 信息复制到异步上下文对象 IContext 上,而ClientContextRpcServiceInterceptor负责将 IContext 上的信息传播到下游的 ApiRequest 的 headers 中。


缺省情况下,以下 header 会自动跨系统传播


端到端的超时控制

NopRPC 的 nop-timeout 消息头表示的是整个 RPC 调用的超时时间,因此当它传播到下一个 RPC 调用时会减去当前已经消耗的时间。例如 服务 A 接收到 nop-timeout=1000,然后经过 200ms 的处理之后调用下一个 RPC 服务,传给下游 RPC 的 nop-timeout=800。在服务内部,所有耗时的操作(例如数据库查询)都会检查 IContext.getCallExpireTime()是否超过当前时间。如果已超时,则直接中断,不再继续执行。通过这种方式可以减轻在系统繁忙的时候因为客户不断重试所导致的系统压力。


例如下游的服务 B 还在执行的过程中服务 A 认为它已经超时了,可能会发起重试,如果服务 B 没有发现自己已超时,还在继续执行未完成的操作,则可能会同时执行两个业务操作,导致系统的压力倍增。

五. 模型驱动开发

在 Nop 平台中我们提供了一个 API 模型,可以在 Excel 中定义系统对外暴露哪些服务,这些服务的请求和响应消息是什么。具体实例参见nop-wf.api.xlsx



在 RPC 的实现层面,我们也可以直接生成对TaskFlow或者Workflow模型的调用,通过可视化编配来实现业务功能。

六. 关于 Dubbo 的设计

Dubbo 框架中大量代码实现的都是辅助性的工作,从今天的角度看已经过时。


  1. SPI 插件加载机制。本质上相当于一个不太完善的 Bean 加载和装配引擎,可以直接用 IoC 容器来替换

  2. 序列化机制。在 REST 的场景下可以使用通用的 JSON 序列化来实现,而在二进制情况下也可以直接使用现成的 protostuff 等编码包

  3. 消息传输信道。可以直接使用 JDK 内置的 Httpclient, 也可以直接使用 IMessageService 消息队列抽象。

  4. Proxy 代理接口。本质上是为了实现从强类型的 Java 对象到通用的 IRpcService 之间的双向转换,直接提供一个 IRpcMessageTransformer 接口即可隔离各种转换策略。

  5. 服务注册发现。直接使用 Nacos 等专用的服务注册发现机制即可,不需要再对 Zookeeper 进行封装。


Dubbo 框架中内部接口的设计也不尽合理,例如负载均衡算法接口


interface interface LoadBalance {    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url,         Invocation invocation) throws RpcException;}
复制代码


这个设计存在如下问题:


  • Invoker 导致 Loadblance 算法和 RPC 的执行器产生不必要的依赖

  • Invocation 导致和 AOP 包装过程产生不必要的依赖

  • URL 是 Dubbo 引入的一个自定义的数据结构(与 JDK 内置的 URL 并不相同),相比于普通的 JSON 对象没有任何优势。


而在 NopRPC 中,负载均衡接口定义如下:


interface ILoadBalance<T,R>{     T choose(List<T> candidates, R request);}
复制代码


如果需要从 candidate 对象上读取权重配置等信息,可以实用 Adapter 模式


public interface ILoadBalanceAdapter<T> {    int getWeight(T candidate);
int getActiveCount(T candidate);}
复制代码


通过这种抽象,负载均衡算法成为纯粹的逻辑函数,与 RPC 的执行逻辑完全解耦,可以应用到所有需要负载均衡算法的地方,而不仅仅限于 RPC 调用场景。

总结

NopRPC 是从第一性原理出发,重新审视 RPC 的概念,完全重新设计的 Yet Another PRC 框架,它的设计非常简洁直观,易于扩展,是 Nop 平台的一个有机组成部分。


基于可逆计算理论设计的低代码平台 NopPlatform 已开源:



发布于: 2023-05-23阅读数: 19
用户头像

canonical

关注

还未添加个人签名 2022-08-30 加入

还未添加个人简介

评论

发布
暂无评论
低代码平台中的分布式RPC框架(约3000行代码)_开源_canonical_InfoQ写作社区