写点什么

写给 go 开发者的 gRPC 教程 - 通信模式

  • 2023-02-06
    北京
  • 本文字数:6143 字

    阅读完需:约 20 分钟

写给go开发者的gRPC教程-通信模式

本篇为【写给 go 开发者的 gRPC 教程系列】第二篇




上一篇介绍了如何编写 protobuf 的 idl,并使用 idl 生成了 gRPC 的代码,现在来看看如何编写客户端和服务端的代码

Simple RPC (Unary RPC)

syntax = "proto3";
package ecommerce;
import "google/protobuf/wrappers.proto";
option go_package = "ecommerce/";
message Order { string id = 1; repeated string items = 2; string description = 3; float price = 4; string destination = 5;}
service OrderManagement { rpc getOrder(google.protobuf.StringValue) returns (Order);}
复制代码


定义如上的 idl,需要关注几个事项


  • 使用protobuf最新版本syntax = "proto3";

  • protoc-gen-go要求 pb 文件必须指定 go 包的路径。即option go_package = "ecommerce/";

  • 定义的method仅能有一个入参和出参数。如果需要传递多个参数需要定义成message

  • 使用import引用另外一个文件的 pb。google/protobuf/wrappers.proto是 google 内置的类型


生成 go 和 grpc 的代码


$ protoc -I ./pb \  --go_out ./ecommerce --go_opt paths=source_relative \  --go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \  ./pb/product.proto
复制代码


ecommerce├── product.pb.go└── product_grpc.pb.gopb└── product.proto
复制代码

server 实现

1、由 pb 文件生成的 gRPC 代码中包含了 service 的接口定义,它和我们定义的 idl 是吻合的


service OrderManagement {  rpc getOrder(google.protobuf.StringValue) returns (Order);}
复制代码


type OrderManagementServer interface {  GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)  mustEmbedUnimplementedOrderManagementServer()}
复制代码


2、我们的业务逻辑就是实现这个接口


package main
import ( "context" "log"
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/wrapperspb")
var _ pb.OrderManagementServer = &OrderManagementImpl{}
var orders = make(map[string]pb.Order)
type OrderManagementImpl struct { pb.UnimplementedOrderManagementServer}
// Simple RPCfunc (s *OrderManagementImpl) GetOrder(ctx context.Context, orderId *wrapperspb.StringValue) (*pb.Order, error) { ord, exists := orders[orderId.Value] if exists { return &ord, status.New(codes.OK, "").Err() }
return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)}
复制代码


3、在实现完业务逻辑之后,我们可以创建并启动服务


package main
import ( "net"
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce" "google.golang.org/grpc")
func main() { s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
lis, err := net.Listen("tcp", ":8009") if err != nil { panic(err) }
if err := s.Serve(lis); err != nil { panic(err) }}
复制代码


服务端代码实现的流程如下


client 实现

1、由 pb 文件生成的 gRPC 代码中包含了 client 的实现,它和我们定义的 idl 也是吻合的


service OrderManagement {  rpc getOrder(google.protobuf.StringValue) returns (Order);}
复制代码


type orderManagementClient struct {  cc grpc.ClientConnInterface}
func NewOrderManagementClient(cc grpc.ClientConnInterface) OrderManagementClient { return &orderManagementClient{cc}}
func (c *orderManagementClient) GetOrder(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error) { out := new(Order) err := c.cc.Invoke(ctx, "/ecommerce.OrderManagement/getOrder", in, out, opts...) if err != nil { return nil, err } return out, nil}
复制代码


2、直接使用 client 来进行 rpc 调用


package main
import ( "context" "log" "time"
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/wrapperspb")
func main() { conn, err := grpc.Dial("127.0.0.1:8009", grpc.WithInsecure()) if err != nil { panic(err) } defer conn.Close()
client := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel()
// Get Order retrievedOrder, err := client.GetOrder(ctx, &wrapperspb.StringValue{Value: "101"}) if err != nil { panic(err) }
log.Print("GetOrder Response -> : ", retrievedOrder)}
复制代码


客户端代码实现的流程如下


小总结

✨ 前文提到过protobuf协议是平台无关的。演示的客户端和服务端都是 golang 的,即使客户端和服务端不同语言也是类似的可以通信的


✨ 对于上面介绍的的这种类似于http1.x的模式:客户端发送请求,服务端响应请求,一问一答的模式在 gRPC 里叫做Simple RPC (也称Unary RPC)。gRPC 同时也支持其他类型的交互方式。


Server-Streaming RPC 服务器端流式 RPC

服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求


简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:


pb 定义

syntax = "proto3";
package ecommerce;
option go_package = "ecommerce/";
import "google/protobuf/wrappers.proto";
message Order { string id = 1; repeated string items = 2; string description = 3; float price = 4; string destination = 5;}
service OrderManagement { rpc searchOrders(google.protobuf.StringValue) returns (stream Order);}
复制代码

server 实现

✨ 注意与Simple RPC的区别:因为我们的服务端是流式响应的,因此对于服务端来说函数入参了一个stream OrderManagement_SearchOrdersServer参数用来写入多个响应,可以把它看作是客户端的对象


✨ 可以通过调用这个流对象的Send(...),来往客户端写入数据


✨ 通过返回nil或者error来表示全部数据写完了


func (s *server) SearchOrders(query *wrapperspb.StringValue,                              stream pb.OrderManagement_SearchOrdersServer) error {  for _, order := range orders {    for _, str := range order.Items {      if strings.Contains(str, query.Value) {        err := stream.Send(&order)        if err != nil {          return fmt.Errorf("error send: %v", err)        }      }    }  }
return nil}
复制代码

client 实现

✨ 注意与Simple RPC的区别:因为我们的服务端是流式响应的,因此 RPC 函数返回值stream是一个流,可以把它看作是服务端的对象


✨ 使用streamRecv函数来不断从服务端接收数据


✨ 当Recv返回io.EOF代表流已经结束


c := pb.NewOrderManagementClient(conn)ctx, cancelFn := context.WithCancel(context.Background())defer cancelFn()
stream, err := c.SearchOrders(ctx, &wrapperspb.StringValue{Value: "Google"})if err != nil{ panic(err)}
for{ order, err := stream.Recv() if err == io.EOF{ break }
log.Println("Search Result: ", order)}
复制代码

小总结

Client-Streaming RPC 客户端流式 RPC

客户端流式 RPC,显然也是单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,大致如图:


服务端没有必要等到客户端发送完所有请求再响应,可以在收到部分请求之后就响应


pb 定义

syntax = "proto3";
package ecommerce;
option go_package = "ecommerce/";
import "google/protobuf/wrappers.proto";
message Order { string id = 1; repeated string items = 2; string description = 3; float price = 4; string destination = 5;}
service OrderManagement { rpc updateOrders(stream Order) returns (google.protobuf.StringValue);}
复制代码

server 实现

✨ 注意与Simple RPC的区别:因为我们的客户端是流式请求的,因此请求参数stream OrderManagement_UpdateOrdersServer就是流对象


✨ 可以从stream OrderManagement_UpdateOrdersServerRecv函数读取消息


✨ 当Recv返回io.EOF代表流已经结束


✨ 使用stream OrderManagement_UpdateOrdersServerSendAndClose函数关闭并发送响应


// 在这段程序中,我们对每一个 Recv 都进行了处理// 当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recvfunc (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {  ordersStr := "Updated Order IDs : "  for {    order, err := stream.Recv()    if err == io.EOF {      // Finished reading the order stream.      return stream.SendAndClose(        &wrapperspb.StringValue{Value: "Orders processed " + ordersStr})    }    // Update order    orders[order.Id] = *order
log.Println("Order ID ", order.Id, ": Updated") ordersStr += order.Id + ", " }}
复制代码

Client 实现

✨ 注意与Simple RPC的区别:因为我们的客户端是流式响应的,因此 RPC 函数返回值stream是一个流


✨ 可以通过调用这个流对象的Send(...),来往这个对象写入数据


✨ 使用streamCloseAndRecv函数关闭并发送响应


c := pb.NewOrderManagementClient(conn)ctx, cancelFn := context.WithCancel(context.Background())defer cancelFn()
stream, err := c.UpdateOrders(ctx)if err != nil { panic(err)}
if err := stream.Send(&pb.Order{ Id: "00", Items: []string{"A", "B"}, Description: "A with B", Price: 0.11, Destination: "ABC",}); err != nil { panic(err)}
if err := stream.Send(&pb.Order{ Id: "01", Items: []string{"C", "D"}, Description: "C with D", Price: 1.11, Destination: "ABCDEFG",}); err != nil { panic(err)}
res, err := stream.CloseAndRecv()if err != nil { panic(err)}
log.Printf("Update Orders Res : %s", res)
复制代码

小总结

Bidirectional-Streaming RPC 双向流式 RPC

双向流式 RPC,顾名思义是双向流。由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求


首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)


假设该双向流是按顺序发送的话,大致如图:


pb 定义

syntax = "proto3";
package ecommerce;
option go_package = "ecommerce/";
import "google/protobuf/wrappers.proto";
message Order { string id = 1; repeated string items = 2; string description = 3; float price = 4; string destination = 5;}
message CombinedShipment { string id = 1; string status = 2; repeated Order orderList = 3;}
service OrderManagement { rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);}
复制代码

server 实现

✨ 函数入参OrderManagement_ProcessOrdersServer是用来写入多个响应和读取多个消息的对象引用


✨ 可以通过调用这个流对象的Send(...),来往这个对象写入响应


✨ 可以通过调用这个流对象的Recv(...)函数读取消息,当Recv返回io.EOF代表流已经结束


✨ 通过返回nil或者error表示全部数据写完了


func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
batchMarker := 1 var combinedShipmentMap = make(map[string]pb.CombinedShipment) for { orderId, err := stream.Recv() log.Printf("Reading Proc order : %s", orderId) if err == io.EOF { log.Printf("EOF : %s", orderId) for _, shipment := range combinedShipmentMap { if err := stream.Send(&shipment); err != nil { return err } } return nil } if err != nil { log.Println(err) return err }
destination := orders[orderId.GetValue()].Destination shipment, found := combinedShipmentMap[destination]
if found { ord := orders[orderId.GetValue()] shipment.OrderList = append(shipment.OrderList, &ord) combinedShipmentMap[destination] = shipment } else { comShip := pb.CombinedShipment{Id: "cmb - " + (orders[orderId.GetValue()].Destination), Status: "Processed!"} ord := orders[orderId.GetValue()] comShip.OrderList = append(shipment.OrderList, &ord) combinedShipmentMap[destination] = comShip log.Print(len(comShip.OrderList), comShip.GetId()) }
if batchMarker == orderBatchSize { for _, comb := range combinedShipmentMap { log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrderList)) if err := stream.Send(&comb); err != nil { return err } } batchMarker = 0 combinedShipmentMap = make(map[string]pb.CombinedShipment) } else { batchMarker++ } }}
复制代码

Client 实现

✨ 函数返回值OrderManagement_ProcessOrdersClient是用来获取多个响应和写入多个消息的对象引用


✨ 可以通过调用这个流对象的Send(...),来往这个对象写入响应


✨ 可以通过调用这个流对象的Recv(...)函数读取消息,当Recv返回io.EOF代表流已经结束


c := pb.NewOrderManagementClient(conn)ctx, cancelFn := context.WithCancel(context.Background())defer cancelFn()
stream, err := c.ProcessOrders(ctx)if err != nil { panic(err)}
go func() { if err := stream.Send(&wrapperspb.StringValue{Value: "101"}); err != nil { panic(err) }
if err := stream.Send(&wrapperspb.StringValue{Value: "102"}); err != nil { panic(err) }
if err := stream.CloseSend(); err != nil { panic(err) }}()
for { combinedShipment, err := stream.Recv() if err == io.EOF { break } log.Println("Combined shipment : ", combinedShipment.OrderList)}
复制代码

小总结

双向流相对还是比较复杂的,大部分场景都是使用事件机制进行异步交互,需要精心的设计


示例代码

https://github.com/liangwt/grpc-example




✨ 微信公众号【凉凉的知识库】同步更新,欢迎关注获取最新最有用的后端知识 ✨



发布于: 刚刚阅读数: 6
用户头像

微信搜:凉凉的知识库 2018-08-20 加入

大厂资深研发,专注golang、微服务、服务治理领域

评论

发布
暂无评论
写给go开发者的gRPC教程-通信模式_golang_凉凉的知识库_InfoQ写作社区