写点什么

grpc 双向流究竟是什么情况?2 段代码告诉你

  • 2022 年 3 月 23 日
  • 本文字数:1959 字

    阅读完需:约 6 分钟

本文分享自华为云社区《grpc双向流究竟是什么情况?2段代码告诉你》,作者:breakDawn。


为什么需要 grpc 双向流?

有时候请求调用和返回过程,并不是简单的一问一答形式,可能会涉及一次发送,多次分批返回,或者两边随意互相发送。

因此简单的 restful 模型无法满足上述常见,grpc 双向流应运而生,通过一个 tpc 链接实现了双向的异步 IO 通信。

grpc 双向流

一个双向流式 RPC 是双方使用读写流去发送一个消息序列。

两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如,服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。

  • 可以理解为常见 IO 模型里的异步 IO 的使用

每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。

//AcceptsastreamofRouteNotessentwhilearouteisbeingtraversed,//whilereceivingotherRouteNotes(e.g.fromotherusers).rpcRouteChat(streamRouteNote)returns(streamRouteNote){}
复制代码

客户端的双向流调用

  1. 定义一个 reponseOberserver,即响应观察者,用于定义如何处理服务端返回的消息。一般都是把消息放到一个某个阻塞队列或者单容量队列 SettableFuture 中。

  2. 调用 stub.sendMessage(reponseOberserver),即告诉 grpc 框架,我要用这个 reponseOberserver 去处理 sendMessage 消息的响应。

注意,这个 sendMesage 方法名,取决于我们的 proto 中怎么定义的。

  1. 然后 stub.sendMessage()方法回返回给我们一个 requestObserver,让我们用这个观察者.onNext()去发送请求,可以任意发多次,都是立刻返回的。

  2. 当不需要再发送时,可以调用 onCompleted 告知对方可以结束了


下面是官网摘抄的代码示例:

publicvoidrouteChat()throwsException{info("***RoutChat");finalSettableFuture<Void>finishFuture=SettableFuture.create();//定义了如何处理收到的返回消息观察者	StreamObserverreponseObserver=newStreamObserver<RouteNote>(){@OverridepublicvoidonNext(RouteNotenote){info("Gotmessage\"{0}\"at{1},{2}",note.getMessage(),note.getLocation().getLatitude(),note.getLocation().getLongitude());}
@OverridepublicvoidonError(Throwablet){finishFuture.setException(t);}
@OverridepublicvoidonCompleted(){ //往finishFuture设置空时,说明完成了消息流关闭了finishFuture.set(null);}};
//框架返回给我一个请求流观察者,让我用这个观察者.onNext(message)去发请求,返回结果和我传给他的responseServer绑定了。StreamObserver<RouteNote>requestObserver=asyncStub.routeChat();
try{RouteNote[]requests={newNote("Firstmessage",0,0),newNote("Secondmessage",0,1),newNote("Thirdmessage",1,0),newNote("Fourthmessage",1,1)};
for(RouteNoterequest:requests){info("Sendingmessage\"{0}\"at{1},{2}",request.getMessage(),request.getLocation().getLatitude(),request.getLocation().getLongitude());requestObserver.onNext(request);}requestObserver.onCompleted();
finishFuture.get();info("FinishedRouteChat");}catch(Exceptiont){requestObserver.onError(t);logger.log(Level.WARNING,"RouteChatFailed",t);throwt;}}
复制代码

服务端的处理方式:

  1. 我们建立服务端的时候,需要调用 nettyServer,建立 netty 服务,并绑定一个 xxxServiceImpl 抽象类。这个 xxxServiceImpl 就是我们在 proto 中定义的 server 结构,支持处理我们定义的消息。

  2. xxxServiceImpl 中,有很多需要覆写的方法,需要你定义如何处理收到的请求,以及如何给客户端发送响应。发送响应的动作就是参数里的 requestObserver.onNext(响应消息)

  3. 返回的 xxxService 类,会在第一步提供给 netty 以及 grpc 框架,收到消息时,会通过他的异步机制,分隔网络线程和业务线程,走到这边执行的地方。


下面是官网摘抄的代码示例:

class	xxxServiceextendxxxServiceImpl{@OverridepublicvoidlistFeatures(Rectanglerequest,StreamObserver<Feature>responseObserver){intleft=min(request.getLo().getLongitude(),request.getHi().getLongitude());intright=max(request.getLo().getLongitude(),request.getHi().getLongitude());inttop=max(request.getLo().getLatitude(),request.getHi().getLatitude());intbottom=min(request.getLo().getLatitude(),request.getHi().getLatitude());
for(Featurefeature:features){if(!RouteGuideUtil.exists(feature)){continue;}
intlat=feature.getLocation().getLatitude();intlon=feature.getLocation().getLongitude();if(lon>=left&&lon<=right&&lat>=bottom&&lat<=top){responseObserver.onNext(feature);}}responseObserver.onCompleted();}}
复制代码


点击关注,第一时间了解华为云新鲜技术~


发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
grpc双向流究竟是什么情况?2段代码告诉你_gRPC_华为云开发者社区_InfoQ写作平台