写点什么

一次 RPC 请求过程

作者:1412
  • 2023-10-25
    北京
  • 本文字数:6255 字

    阅读完需:约 21 分钟

一次RPC请求过程

最近给 SRPC 项目写几篇学习文章,希望协助小伙伴通过这个轻量级的框架快速了解 RPC 相关内容。

本篇为第二篇,注重于解读一次 RPC 请求的过程,是最简单、最主干的部分,而里边每一个层级怎么做资源调度和复用都不会包括在内,因此有基础的小伙伴可以直接跳读源码解析。

1. RPC 概念简述

SRPC 项目地址:https://github.com/sogou/srpc


花一点点时间补充 RPC 的基本概念,其字面意思是 Remote Procedure Call,远程过程调用。也就是说:


  • 如果我们是客户端,通过 RPC 调用,把某些事情交给远程机器去做;

  • 如果我们是服务端,就是被调用,别人交一些事情让我做;


那么必然涉及到三个小问题:


  1. 请求是什么?

  2. 怎么指定调用哪个过程?

  3. 怎么给填好回复?


整个调用由客户端发起。服务端启动服务器之后,等待他人调用。


我们举个小例子,上述三个小问题可以这样:


  1. 请求是int aint b

  2. 指定调用对方的sum

  3. 回复是求和后的值int ret


一会儿用这个例子看看 RPC 框架的代码是怎么做的。

2. 协议与框架

我们常提到的 RPC 可能是一种框架:


用来帮我们做网络收发。


比如 SRPC 是个轻量级的 RPC 框架,还有大家熟知的 GRPC、Thrift 等。


也可能是一种协议


RPC 协议让不同语言、不同框架都可以互通。


个人理解协议的本质是为了生态服务的,RPC 承担的是衔接整个生态系统的桥梁。


两者关系:


以 SRPC 为例,支持多种协议,包括 SRPCThriftBRPCtRPC,另外还可以收发 HTTP 协议


我们给出一张 RPC 请求过程的图及其中涉及到的关键函数接口,然后正式开始下面的学习。



这里我们看到几个有意思的事情:


  1. 请求/回复,是对称的。


  • 对客户端 Client 来说,请求时发出 SRPCRequest,收到 SRPCResponse;

  • 对服务端 Server 来说,收到 SRPCRequest,回复时发出 SRPCResponse;


  1. 收/发,接口是对称的。


  • 发消息的接口都是 encode(),无论我要发的是 SRPCRequest 还是 SRPCResponse;

  • 收消息的接口都是 append(),无论我要收的是 SRPCRequest 还是 SRPCResponse;


  1. Client/Server,也是对称的。


  • Client 主动发出请求,然后回复时是被动调起 callback()的(哪怕我们用同步接口,那也是调用完代码再往下走);

  • Server 被动接收请求,然后回复是 process()处理完之后主动进行的。

3. 定义 RPC 接口

我们刚才三个小问题怎么定义呢?可以使用protobuf作为接口描述文件:


// [ MyService.proto ]syntax="proto2";
message Request { // request包含了a和b required int32 a = 1; required int32 b = 2;}; message Response { // response包含了ret required int32 ret = 1;};
service MyService { // 服务名,用来区分我们的服务 rpc Sum(Request) returns (Response); // 调用名字为Sum的函数};
复制代码


也可以配合 srpc 小工具的 api 命令,产生一个简单的 protobuf 描述文件,并进行修改。命令参考如下:


./srpc api MyService
复制代码


然后就可以根据提示,打开 MyService.proto 并编辑其中的接口定义。

4. step-0 : client 发出请求

如总图的 step-0,我们想要发出请求,就需要调用上述定义的 RPC 接口Sum( )


// [ client_main.cc ]int main(){                                                        MyService::SRPCClient client("127.0.0.1", 1412);                                                                                                                                                                                                                                     Request req;   // 准备好Request    Response resp; // 准备好Response    RPCSyncContext ctx; // 一些必要的请求上下文,包括调用状态码
req.set_a(1); // 填a req.set_b(2); // 填b client.Sum(&req, &resp, &ctx); // 调用Sum() ...}
复制代码


当然想要框架知道怎么从上述的 protobuf 文件进行调用,我们需要一些代码生成工作。这不是本篇的重点,因此这里仅列出一些命令供大家运行起来。


我们根据刚才 srpc 小工具的示例,通过改好的 proto 文件把项目生成出来:


./srpc rpc my_rpc_project -f MyService.proto -p ./
复制代码


我们打开生成代码MyService.srpc.h,可以看到刚才调用的Sum()函数的异步接口和同步接口,定义如下:


// [ MyService.srpc.h ]class SRPCClient : public srpc::SRPCClient                                         {                                                                                  public:    void Sum(const Request *req, SumDone done);    void Sum(const Request *req, Response *resp, srpc::RPCSyncContext *sync_ctx);    ...};
复制代码

5. step-1:框架为 Client 发出请求

以上,我们作为 RPC 的用户,代码就告一段落了。


接下来交给 RPC 框架干活,它要做的事情包括但不仅限于:


把这个请求内容、以及用户要调用哪个服务(service)的哪个函数(method)告诉远程,并通过网络发送出去。


我们想要了解一个框架如何工作时,首先要了解它是基于什么构建起来的,包括什么语言什么底层网络收发库等。


SRPC 是基于 Workflow 的任务流编程范式开发的,并使用了其携带的网络收发功能,因此我们可以不用手写 I/O 多路复用等事情,但是开发需要遵循 Workflow 的编程规范,即:任务流


我们可以认为对于网络任务来说,一次会话就是一个 task,对于 client 我们的 task 职责就在于把 Request 发给对方收回 Response


继续围观生成代码 MyService.srpc.h,我们看一下最简单的异步接口实现是什么:


// [ MyService.srpc.h ]inline void SRPCClient::Sum(const Request *req, SumDone done)                   {                                                                                   auto *task = this->create_rpc_client_task("Sum", std::move(done));                                                                                              task->serialize_input(req);                                                     task->start();                                                              } 
复制代码


内部会构造出一个RPCClientTask,它被task->start();之后,就可以认为请求交给框架,用户态无需再关心,直到回复时框架通过回调等机制叫醒用户代码。


由于RPCClientTask的定义比较复杂,我们挑重点看:


// [ rc/rpc_task.inl ]// 1. 它派生于Workflow的WFComplexClientTask,//    以REQ,RESP为模版,定义了内部请求与回复的格式,//    我们这里分别是SRPCRequest和SRPCResponse。template<class RPCREQ, class RPCRESP>                                              class RPCClientTask : public WFComplexClientTask<RPCREQ, RPCRESP>{    ...protected:    // 2. SPRC框架重新实现了父类的方法message_out(),    //    用来告诉Workflow网络层面这次发出的请求内容时啥    CommMessageOut *message_out() override;
// 3. 保存了一个rpc_callback, 让网络回复了之后通知SRPC框架 // SRPC框架再去做网络请求到用户Response的格式转换 void rpc_callback(WFNetworkTask<RPCREQ, RPCRESP> *task);};
复制代码


上述的RPCREQ就是我们发出的请求,SRPCRequestSRPCResponse都从SRPCMessage派生:


// [ src/message/rpc_message_srpc.h ]class SRPCRequest : public SRPCMessage{    ...};
class SRPCResponse : public SRPCMessage{ ...};
复制代码


那么谁定义了 SRPCMessage的内存结构呢?就是SRPC协议。下图可以清晰地看到,我们在 SRPC 协议头部就有 meta 部分,上述提到的servicemethod就是填在里边。而后面的 message 就是我们的 Request。


我们把消息按照上述结构,通过SRPCMessage::encode()接口填好。这是 Workflow 的接口,它会在进行网络发送时entry->session->out->encode()被调用。


// [ src/message/rpc_message_srpc.h ]inline int SRPCMessage::encode(struct iovec vectors[], int max, size_t size_limit){    // 这里用上了RPC协议,我们按照协议结构填内容。}
复制代码

6. step-2:与操作系统相关的网络操作

这部分在 Workflow 中实现,涉及到的网络基础知识很多,后续会针对性展开写学习心得,包括:


  • 命名服务

  • 目标选取

  • 负载均衡

  • 连接管理

  • IO 多路复用


等等,现在暂时跳过。

7. server 接收请求

我们切换一下视角,来到上述总图的右半边,server 要接收请求了。


当然 server 作为一个被动接收者,它需要先被用户启动起来。以下是用户代码:


// [ server_main.cc ]int main()                                                                         {    SRPCServer server; // 1. 构造一个server,负责网络请求
MyServiceServiceImpl impl; // 2. 构造一个服务,负责实现Sum server.add_service(&impl); // 3. 把服务实现加到server中 if (server.start(1412) == 0) // 4. 传入端口,把server跑起来 { printf("my_rpc_project SRPC server started, port %u\n", 1412); wait_group.wait(); // 5. server start也是异步的,暂时要卡住主线程不退出 server.stop(); } else perror("server start"); return 0; }
复制代码


然后就可以愉快地按照 SRPC 协议来接受请求了。


这是谁来做的呢?RPCServer 来做的。

8. step-4:框架为 server 接受请求

// [ src/rpc_server.h ]// 1. 从Workflow的WFServer派生//    由RPCTYPE::REQ和RPCTYPE::RESP来指定请求与回复的类型template<class RPCTYPE>class RPCServer : public WFServer<typename RPCTYPE::REQ,                                  typename RPCTYPE::RESP>{...protected:    //  2. 需要实现怎么构造一次会话,即RPCServerTask    CommSession *new_session(long long seq, CommConnection *conn) override;    // 3. 调用具体server接口的地方    void server_process(NETWORKTASK *task) const;    ...};
复制代码


我们的父类 WFServer 是可以帮我们按照某种协议收网络包的,只需要:


  1. 我们实现 new_session(), new 一个 RPCServerTask 给它;

  2. 在模版参中指定的 RPCTYPE::REQ 上实现 append()接口,指引 Workflow 网络层面如何从操作系统收到的数据上切一份完整的 REQ 下来。


其中第一步不是必须的,但我们 SRPC 框架需要,因为我们在本次会话有一些上下文要处理。但本文中我们只需关心 REQ。


这个 REQ 就是SRPCRequest,父类就是SRPCMessage,刚才也有提到过它的encode()实现,现在看看它的append()实现:


// [ src/message/rpc_message_srpc.cc ]int SRPCMessage::append(const void *buf, size_t *size, size_t size_limit){    ... // 把网络收到的一批buf,按照RPC协议保存到我的内存里,    ... // 并通过返回值告知核心我收发完没有,因为内部需要维护状态}
复制代码


Workflow 会不停调用这个append()来把 SRPC 协议图里的消息收完。


我们这里通过返回值来告知 Workflow 的网络层本条消息的接收情况:


  • 1:消息接受完成;

  • 0:未完成,继续收;

  • < 0:错误;


只要返回 1,流程就会继续往下走,也就是到了process()函数。

9. step-5:调用开发者的 rpc 函数

SRPC 框架收完消息之后,需要对 meta 进行一些处理:


  • 根据 meta 里的 service 去找到用户刚才 server.add_service(impl)时的那个 service;

  • 根据 meta 里的 method 去找用户的 impl 里实现的函数;


然后就可以调用 server 端开发者实现的 rpc 函数了。


查找过程很简单,以下是简化的流程:


// [ src/rpc_server.h ]template<class RPCTYPE>                                                            void RPCServer<RPCTYPE>::server_process(NETWORKTASK *task) const{    // 1. 把SRPC协议中的meta信息反序列化出来    req->deserialize_meta();     ...    // 2. 找service对象    auto *service = this->find_service(req->get_service_name());    ...    // 3. 找method对象    auto *rpc = service->find_method(req->get_method_name());    ...    // 4. 进一步处理    status_code = (*rpc)(server_task->worker);    ...}
复制代码


注意上述的进一步处理是因为,我们还需要对 body 进行反序列化:


// [ src/rpc_service.h ]template<class INPUT, class OUTPUT, class SERVICE>                                 static inline int                                                                  ServiceRPCCallImpl(SERVICE *service,                                                                  RPCWorker& worker,                                                                 void (SERVICE::*rpc)(INPUT *, OUTPUT *, RPCContext *))          {    // 1. new一片请求,是一开始定义的包含a和b的Request,它是个ProtobufMessage    auto *in = new INPUT;
// 2. 按照网络包里的body,从req反序列化处出来到in上 int status_code = worker.req->deserialize(in);
// 3. new一片回复,是一开始定义的包含ret的Response,它也是个ProtobufMessage auto *out = new OUTPUT;
// 4. 调用用户代码实现的rpc函数,进行计算 (service->*rpc)(in, out, worker.ctx);}
复制代码


之后就可以交给框架做回复返回的事情了。

10. 对称的回程

我们最后简单看一下用户代码里一般长啥样,也就是刚才 impl 里的 rpc 实现:


// [ server_main.cc ]class MyServiceServiceImpl : public MyService::Service                          {                                                                               public:    void Sum(Request *request, Response *response, srpc::RPCContext *ctx) override    {        // 这里是我们自己实现的加法        response->set_ret(request->a() + request->b());                                             }                                                                           }; 
复制代码


之后,用户无需进行任何代码编写,SRPC 和 Workflow 会进行 step-6 和 step-7,与先前的步骤类似且对称地,把回复填好并发出。


而 client 端又会先从 Workflow 和 SRPC 进行 step-8 和 step-9,同样与上述步骤类型且对称地,把回复收好,并调用到我们的 callback,或者在同步接口中(也就是文中的 Sum 调用示例)填好 Response,此次请求就完整结束了。


// [ client_main.cc ]int main(){    ...     client.Sum(&req, &resp, &ctx);                                                                 if (ctx.success)                                                                       fprintf(stderr, "ret = %d\n", resp.ret());                else                                                                                   fprintf(stderr, "sync status[%d] error[%d] errmsg:%s\n", ctx.status_code, ctx.error, ctx.errmsg.c_str()); 
return 0;}
复制代码


附上从调用模块角度来看的 one round 图:



更多内容参考:https://github.com/sogou/srpc/blob/master/docs/wiki.md

发布于: 15 小时前阅读数: 14
用户头像

1412

关注

鶸鶸的架构师 2018-08-07 加入

专注于异步调度框架开发和分布式存储技术,开源框架Workflow和srpc的作者之一。

评论

发布
暂无评论
一次RPC请求过程_c++_1412_InfoQ写作社区