java 版 gRPC 实战之四:客户端流
欢迎访问我的 GitHub
这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
本篇概览
本文是《java 版 gRPC 实战》系列的第四篇,前文掌握了服务端流,适合从服务端获取大量数据的场景,今天的目标是掌握客户端流类型的服务,包括服务提供方和使用方两侧的开发;
先来看看官方资料对客户端流式 RPC 的介绍:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;
本文由以下几部分组成:
提前小结几个重要的知识点,稍后开发过程中要重点关注这几个地方;
在 proto 文件中定义客户端流类型的 gRPC 接口,再通过 proto 生成 java 代码;
开发服务端应用;
开发客户端应用;
验证;
提前小结
为了突出重点,这里将几个关键的知识点提前给出:
客户端流的特点,是请求方以流的形式提交数据到响应方;
一次 RPC 请求中,请求方可以通过流的方式源源不断的提交数据,直到调用了 StreamObserver 的 onCompleted 方法,才算提交数据完成;
平时咱们调用方法时,方法内部用到的数据是通过入参传进来的,但这里不一样,客户端要传给服务端的数据和 gRPC 方法的入参没有关系,而是和方法的返回对象有关(执行返回对象的 onNext 方法可以将数据传给服务端);
客户端在 A 线程上传完数据后,服务端的响应是在另一个线程 B 执行的,因此,如果 A 线程拿到服务端响应,就要 B 线程的异步响应方法执行完毕,等待的方法有多种,我用的是 CountDownLatch;
在服务端,开发者要编写的代码和以往 web 开发不同,不是将数据处理好返回,而是返回一个 StreamObserver 实例给上层框架,由框架负责处理的逻辑,开发者专注开发 StreamObserver 的实现即可,例如重写 onNext 方法,客户端通过流每上传一笔数据,onNext 方法都会被外层框架执行一次;
如果您用的是 IDEA,记得勾选下图红框中的选框,否则运行应用的时候可能遇到 lombok 相关的问题:
上面提到的这些,会在接下来的开发过程中充分体现出来;
源码下载
本篇实战中的完整源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
这个 git 项目中有多个文件夹,《java 版 gRPC 实战》系列的源码在 grpc-tutorials 文件夹下,如下图红框所示:
grpc-tutorials 文件夹下有多个目录,本篇文章对应的服务端代码在 client-stream-server-side 目录下,客户端代码在 client-stream-client-side 目录下,如下图:
在 proto 文件中定义客户端流类型的 gRPC 接口
首先要做的就是定义 gRPC 接口,打开 mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是 AddToCart 方法的入参 ProductOrder 前面添加了 stream 修饰,代表该方法是客户端流类型:
双击下图红框中的 task 即可生成 java 代码:
生成下图红框中的文件:
接下来开发服务端;
开发服务端应用
在父工程 grpc-turtorials 下面新建名为 client-stream-server-side 的模块,其 build.gradle 内容如下:
配置文件 application.yml:
启动类 ClientStreamServerSideApplication.java 的代码就不贴了,普通的 springboot 启动类而已;
重点是提供 grpc 服务的 GrpcServerService.java,请结合前面小结的第五点来阅读代码,咱们要做的就是给上层框架返回一个匿名类,至于里面的 onNext、onCompleted 方法何时被调用是上层框架决定的,另外还准备了成员变量 totalCount,这样就可以记录总数了:
开发客户端应用
在父工程 grpc-turtorials 下面新建名为 client-stream-server-side 的模块,其 build.gradle 内容如下:
配置文件 application.yml,设置自己的 web 端口号和服务端地址:
启动类 ClientStreamClientSideApplication.java 的代码就不贴了,普通的 springboot 启动类而已;
正常情况下我们都是用 StreamObserver 处理服务端响应,这里由于是异步响应,需要额外的方法从 StreamObserver 中取出业务数据,于是定一个新接口,继承自 StreamObserver,新增 getExtra 方法可以返回 String 对象,详细的用法稍后会看到:
重头戏来了,看看如何远程调用客户端流类型的 gRPC 接口,前面小结提到的 2、3、4 点都会涉及到,代码中已经添加详细注释:
最后做个 web 接口,可以通过 web 请求验证远程调用:
编码完成,开始验证;
验证
启动服务端 ClientStreamServerSideApplication:
启动客户端 ClientStreamClientSideApplication:
浏览器输入 http://localhost:8082/?count=100,响应如下,可见远程调用 gRPC 服务成功:
下面是服务端日志,可见逐一处理了客户端的每一笔数据:
下面是客户端日志,可见由于 CountDownLatch 的作用,发起 gRPC 请求的线程一直等待 responseObserver.onCompleted 在另一个线程被执行完后,才会继续执行:
至此,客户端流类型的 gRPC 服务及其客户端开发就完成了,这种异步操作与咱们平时开发同步类型的 web 接口还是有差别的,希望本文能给您带来一些参考,下一篇咱们实战最后一种类型:双向流式;
欢迎关注 InfoQ:程序员欣宸
版权声明: 本文为 InfoQ 作者【程序员欣宸】的原创文章。
原文链接:【http://xie.infoq.cn/article/f3dcc9e67db089f7df807198c】。文章转载请联系作者。
评论