写点什么

基于内存通信的 gRPC 调用

作者:Robert Lu
  • 2021 年 11 月 25 日
  • 本文字数:5357 字

    阅读完需:约 18 分钟

Apache Dubbo 有 injvm 方式的通信,能够避免网络带来的延迟,同时也不占用本地端口,对测试、本地验证而言,是一种比较方便的 RPC 通信方式。


最近看到 containerd 的代码,发现它也有类似的需求,那么就考察了下 gRPC 有没有类似的,基于内存的通信方式。发现 pipe 非常好用,所以记录了下。

Golang/gRPC 对网络的抽象

首先,我们先看一下 gRPC 一次调用的架构图。当然,这个架构图目前只关注了网络抽象分布。



我们重点关注网络部分。

操作系统系统抽象

首先,在网络包之上,系统抽象出来了socket,代表一条虚拟连接,对于 UDP,这个虚拟连接是不可靠的,对于 TCP,这个链接是尽力可靠的。


对于网络编程而言,仅仅有连接是不够的,还需要告诉开发者如何创建、关闭连接。对于服务端,系统提供了accept方法,用来接收连接。对于客户端,系统提供了connect方法,用于和服务端建立连接。

Golang 抽象

在 Golang 中,socket 对等的概念叫net.Conn,代表了一条虚拟连接。


接下来,对于服务端,accept 这个行为被包装成了net.Listener接口;对于客户端,Golang 则基于 connect 提供了net.Dial方法


type Listener interface {  // 接收来自客户端的网络连接  Accept() (Conn, error)  Close() error  Addr() Addr}
复制代码

gRPC 使用

那么 gRPC 是怎么使用 Listener 和 Dial 的呢?


对于 gRPC 服务端,Serve方法接收一个 Listener,表示在这个 Listener 上提供服务。


对于 gRPC 客户端,网络本质上就是一个能够连接到某个地方的东西就可以,所以只需要一个dialer func(context.Context, string) (net.Conn, error)函数就行了。

什么是 pipe

在操作系统层面,pipe表示一个数据管道,而这个管道两端都在本程序中,可以很好的满足我们的要求:基于内存的网络通信。


Golang 也基于 pipe 提供了net.Pipe()函数创建了一个双向的、基于内存通信的管道,在能力上,能够很好的满足 gRPC 对底层通信的要求。


但是net.Pipe仅仅产生了两个net.Conn,即只产生两个网络连接,没有之前提到的 Listner,也没有 Dial 方法。


于是结合 Golang 的 channel,把net.Pipe包装成了 Listner,也提供了 Dial 方法:


  1. Listener.Accept(),只需要监听一个 channel,客户端连接过来的时候,把连接通过 channel 传递过来即可

  2. Dial方法,调用 Pipe,将一端通过 channel 给服务端(作为服务端连接),另一端作为客户端连接


代码如下:


package main
import ( "context" "errors" "net" "sync" "sync/atomic")
var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)
type PipeListener struct { ch chan net.Conn close chan struct{} done uint32 m sync.Mutex}
func ListenPipe() *PipeListener { return &PipeListener{ ch: make(chan net.Conn), close: make(chan struct{}), }}
// Accept 等待客户端连接func (l *PipeListener) Accept() (c net.Conn, e error) { select { case c = <-l.ch: case <-l.close: e = ErrPipeListenerClosed } return}
// Close 关闭 listener.func (l *PipeListener) Close() (e error) { if atomic.LoadUint32(&l.done) == 0 { l.m.Lock() defer l.m.Unlock() if l.done == 0 { defer atomic.StoreUint32(&l.done, 1) close(l.close) return } } e = ErrPipeListenerClosed return}
// Addr 返回 listener 的地址func (l *PipeListener) Addr() net.Addr { return pipeAddr(0)}func (l *PipeListener) Dial(network, addr string) (net.Conn, error) { return l.DialContext(context.Background(), network, addr)}func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) { // PipeListener是否已经关闭 if atomic.LoadUint32(&l.done) != 0 { e = ErrPipeListenerClosed return }
// 创建pipe c0, c1 := net.Pipe() // 等待连接传递到服务端接收 select { case <-ctx.Done(): e = ctx.Err() case l.ch <- c0: conn = c1 case <-l.close: c0.Close() c1.Close() e = ErrPipeListenerClosed } return}
type pipeAddr int
func (pipeAddr) Network() string { return `pipe`}func (pipeAddr) String() string { return `pipe`}
复制代码

如何用 pipe 作为 gRPC 的 connection

有了上面的包装,我们就可以基于此创建一个 gRPC 的服务器端和客户端,来进行基于内存的 RPC 通信了。


首先,我们简单的创建一个服务,包含了四种调用方式:


syntax = "proto3";
option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";option java_multiple_files = true;option java_package = "io.grpc.examples.helloworld";option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.service Greeter { // unary调用 rpc SayHello(HelloRequest) returns (HelloReply) {}
// 服务端流式调用 rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply);
// 客户端流式调用 rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply);
// 双向流式调用 rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply);}
// The request message containing the user's name.message HelloRequest { string name = 1;}
// The response message containing the greetingsmessage HelloReply { string message = 1;}
复制代码


然后生成相关的 stub 代码:


protoc --go_out=. --go_opt=paths=source_relative \  --go-grpc_out=. --go-grpc_opt=paths=source_relative \  helloworld/helloworld.proto
复制代码


然后开始写服务端代码,基本逻辑就是实现一个 demo 版本的服务端就好:


package main
import ( "context" "log"
"github.com/robberphex/grpc-in-memory/helloworld" pb "github.com/robberphex/grpc-in-memory/helloworld")
// helloworld.GreeterServer 的实现type server struct { // 为了后面代码兼容,必须聚合UnimplementedGreeterServer // 这样以后在proto文件中新增加一个方法的时候,这段代码至少不会报错 pb.UnimplementedGreeterServer}
// unary调用的服务端代码func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil}
// 客户端流式调用的服务端代码// 接收两个req,然后返回一个respfunc (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error { req, err := streamServer.Recv() if err != nil { log.Printf("error receiving: %v", err) return err } log.Printf("Received: %v", req.GetName()) req, err = streamServer.Recv() if err != nil { log.Printf("error receiving: %v", err) return err } log.Printf("Received: %v", req.GetName()) streamServer.SendAndClose(&pb.HelloReply{Message: "Hello " + req.GetName()}) return nil}
// 服务端流式调用的服务端代码// 接收一个req,然后发送两个respfunc (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error { log.Printf("Received: %v", req.GetName()) err := streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()}) if err != nil { log.Printf("error Send: %+v", err) return err } err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName() + "_dup"}) if err != nil { log.Printf("error Send: %+v", err) return err } return nil}
// 双向流式调用的服务端代码func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error { req, err := streamServer.Recv() if err != nil { log.Printf("error receiving: %+v", err) // 及时将错误返回给客户端,下同 return err } log.Printf("Received: %v", req.GetName()) err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()}) if err != nil { log.Printf("error Send: %+v", err) return err } // 离开这个函数后,streamServer会关闭,所以不推荐在单独的goroute发送消息 return nil}
// 新建一个服务端实现func NewServerImpl() *server { return &server{}}
复制代码


然后我们创建一个基于 pipe 连接的客户端来调用服务端。


包含如下几个步骤:


  1. 创建服务端实现

  2. 基于 pipe 创建 listener,然后基于它创建 gRPC server

  3. 基于 pipe 创建客户端连接,然后创建 gRPC client,调用服务


代码如下:


package main
import ( "context" "fmt" "log" "net"
pb "github.com/robberphex/grpc-in-memory/helloworld" "google.golang.org/grpc")
// 将一个服务实现转化为一个客户端func serverToClient(svc *server) pb.GreeterClient { // 创建一个基于pipe的Listener pipe := ListenPipe()
s := grpc.NewServer() // 注册Greeter服务到gRPC pb.RegisterGreeterServer(s, svc) if err := s.Serve(pipe); err != nil { log.Fatalf("failed to serve: %v", err) } // 客户端指定使用pipe作为网络连接 clientConn, err := grpc.Dial(`pipe`, grpc.WithInsecure(), grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) { return pipe.DialContext(c, `pipe`, s) }), ) if err != nil { log.Fatalf("did not connect: %v", err) } // 基于pipe连接,创建gRPC客户端 c := pb.NewGreeterClient(clientConn) return c}
func main() { svc := NewServerImpl() c := serverToClient(svc)
ctx := context.Background()
// unary调用 for i := 0; i < 5; i++ { r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage()) }
// 客户端流式调用 for i := 0; i < 5; i++ { streamClient, err := c.SayHelloRequestStream(ctx) if err != nil { log.Fatalf("could not SayHelloRequestStream: %v", err) } err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)}) if err != nil { log.Fatalf("could not Send: %v", err) } err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)}) if err != nil { log.Fatalf("could not Send: %v", err) } reply, err := streamClient.CloseAndRecv() if err != nil { log.Fatalf("could not Recv: %v", err) } log.Println(reply.GetMessage()) }
// 服务端流式调用 for i := 0; i < 5; i++ { streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)}) if err != nil { log.Fatalf("could not SayHelloReplyStream: %v", err) } reply, err := streamClient.Recv() if err != nil { log.Fatalf("could not Recv: %v", err) } log.Println(reply.GetMessage()) reply, err = streamClient.Recv() if err != nil { log.Fatalf("could not Recv: %v", err) } log.Println(reply.GetMessage()) }
// 双向流式调用 for i := 0; i < 5; i++ { streamClient, err := c.SayHelloBiStream(ctx) if err != nil { log.Fatalf("could not SayHelloStream: %v", err) } err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)}) if err != nil { log.Fatalf("could not Send: %v", err) } reply, err := streamClient.Recv() if err != nil { log.Fatalf("could not Recv: %v", err) } log.Println(reply.GetMessage()) }}
复制代码

总结

当然,作为基于内存的 RPC 调用,还可以有更好的方式,比如直接将对象传递到服务端,直接通过本地调用方式来通信。但这种方式破坏了很多约定,比如对象地址、比如 gRPC 连接参数不生效等等。


本文介绍的,基于 Pipe 的通信方式,除了网络层走了内存传递之外,其他都和正常 RPC 通信行为一致,比如同样经历了序列化、经历了 HTTP/2 的流控制等。当然,性能上比原生调用也会差一点,但是好在对于测试、验证场景,行为上的一致比较重要些。


本文代码已经托管到了 GitHub https://github.com/robberphex/grpc-in-memory

发布于: 2 小时前阅读数: 6
用户头像

Robert Lu

关注

还未添加个人签名 2015.04.06 加入

阿里云高级开发工程师

评论

发布
暂无评论
基于内存通信的gRPC调用