写点什么

java 版 gRPC 实战之四:客户端流

作者:程序员欣宸
  • 2022 年 3 月 30 日
  • 本文字数:5666 字

    阅读完需:约 19 分钟

java版gRPC实战之四:客户端流

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


本篇概览

  • 本文是《java 版 gRPC 实战》系列的第四篇,前文掌握了服务端流,适合从服务端获取大量数据的场景,今天的目标是掌握客户端流类型的服务,包括服务提供方和使用方两侧的开发;

  • 先来看看官方资料对客户端流式 RPC 的介绍:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;

  • 本文由以下几部分组成:


  1. 提前小结几个重要的知识点,稍后开发过程中要重点关注这几个地方;

  2. 在 proto 文件中定义客户端流类型的 gRPC 接口,再通过 proto 生成 java 代码;

  3. 开发服务端应用;

  4. 开发客户端应用;

  5. 验证;

提前小结

为了突出重点,这里将几个关键的知识点提前给出:


  1. 客户端流的特点,是请求方以流的形式提交数据到响应方;

  2. 一次 RPC 请求中,请求方可以通过流的方式源源不断的提交数据,直到调用了 StreamObserver 的 onCompleted 方法,才算提交数据完成;

  3. 平时咱们调用方法时,方法内部用到的数据是通过入参传进来的,但这里不一样,客户端要传给服务端的数据和 gRPC 方法的入参没有关系,而是和方法的返回对象有关(执行返回对象的 onNext 方法可以将数据传给服务端);

  4. 客户端在 A 线程上传完数据后,服务端的响应是在另一个线程 B 执行的,因此,如果 A 线程拿到服务端响应,就要 B 线程的异步响应方法执行完毕,等待的方法有多种,我用的是 CountDownLatch;

  5. 在服务端,开发者要编写的代码和以往 web 开发不同,不是将数据处理好返回,而是返回一个 StreamObserver 实例给上层框架,由框架负责处理的逻辑,开发者专注开发 StreamObserver 的实现即可,例如重写 onNext 方法,客户端通过流每上传一笔数据,onNext 方法都会被外层框架执行一次;

  6. 如果您用的是 IDEA,记得勾选下图红框中的选框,否则运行应用的时候可能遇到 lombok 相关的问题:


  • 上面提到的这些,会在接下来的开发过程中充分体现出来;

源码下载

  • 本篇实战中的完整源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):



  • 这个 git 项目中有多个文件夹,《java 版 gRPC 实战》系列的源码在 grpc-tutorials 文件夹下,如下图红框所示:

  • grpc-tutorials 文件夹下有多个目录,本篇文章对应的服务端代码在 client-stream-server-side 目录下,客户端代码在 client-stream-client-side 目录下,如下图:

在 proto 文件中定义客户端流类型的 gRPC 接口

  • 首先要做的就是定义 gRPC 接口,打开 mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是 AddToCart 方法的入参 ProductOrder 前面添加了 stream 修饰,代表该方法是客户端流类型:


// gRPC服务,这是个在线商城的购物车服务service CartService {    // 客户端流式:添加多个商品到购物车    rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}}
// 提交购物车时的产品信息message ProductOrder { // 商品ID int32 productId = 1; // 商品数量 int32 number = 2;}
// 提交购物车返回结果的数据结构message AddCartReply { // 返回码 int32 code = 1; // 描述信息 string message = 2;}
复制代码


  • 双击下图红框中的 task 即可生成 java 代码:

  • 生成下图红框中的文件:

  • 接下来开发服务端;

开发服务端应用

  • 在父工程 grpc-turtorials 下面新建名为 client-stream-server-side 的模块,其 build.gradle 内容如下:


// 使用springboot插件plugins {    id 'org.springframework.boot'}
dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' // 作为gRPC服务提供方,需要用到此库 implementation 'net.devh:grpc-server-spring-boot-starter' // 依赖自动生成源码的工程 implementation project(':grpc-lib') // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor annotationProcessor 'org.projectlombok:lombok'}
复制代码


  • 配置文件 application.yml:


spring:  application:    name: client-stream-server-side# gRPC有关的配置,这里只需要配置服务端口号grpc:  server:    port: 9900
复制代码


  • 启动类 ClientStreamServerSideApplication.java 的代码就不贴了,普通的 springboot 启动类而已;

  • 重点是提供 grpc 服务的 GrpcServerService.java,请结合前面小结的第五点来阅读代码,咱们要做的就是给上层框架返回一个匿名类,至于里面的 onNext、onCompleted 方法何时被调用是上层框架决定的,另外还准备了成员变量 totalCount,这样就可以记录总数了:


package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.AddCartReply;import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import io.grpc.stub.StreamObserver;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService@Slf4jpublic class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {
@Override public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) { // 返回匿名类,给上层框架使用 return new StreamObserver<ProductOrder>() {
// 记录处理产品的总量 private int totalCount = 0;
@Override public void onNext(ProductOrder value) { log.info("正在处理商品[{}],数量为[{}]", value.getProductId(), value.getNumber());
// 增加总量 totalCount += value.getNumber(); }
@Override public void onError(Throwable t) { log.error("添加购物车异常", t); }
@Override public void onCompleted() { log.info("添加购物车完成,共计[{}]件商品", totalCount); responseObserver.onNext(AddCartReply.newBuilder() .setCode(10000) .setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount)) .build()); responseObserver.onCompleted(); } }; }}
复制代码

开发客户端应用

  • 在父工程 grpc-turtorials 下面新建名为 client-stream-server-side 的模块,其 build.gradle 内容如下:


plugins {    id 'org.springframework.boot'}
dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'net.devh:grpc-client-spring-boot-starter' implementation project(':grpc-lib')}
复制代码


  • 配置文件 application.yml,设置自己的 web 端口号和服务端地址:


server:  port: 8082spring:  application:    name: client-stream-client-side
grpc: client: # gRPC配置的名字,GrpcClient注解会用到 client-stream-server-side: # gRPC服务端地址 address: 'static://127.0.0.1:9900' enableKeepAlive: true keepAliveWithoutCalls: true negotiationType: plaintext
复制代码


  • 启动类 ClientStreamClientSideApplication.java 的代码就不贴了,普通的 springboot 启动类而已;

  • 正常情况下我们都是用 StreamObserver 处理服务端响应,这里由于是异步响应,需要额外的方法从 StreamObserver 中取出业务数据,于是定一个新接口,继承自 StreamObserver,新增 getExtra 方法可以返回 String 对象,详细的用法稍后会看到:


package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> { String getExtra();}
复制代码


  • 重头戏来了,看看如何远程调用客户端流类型的 gRPC 接口,前面小结提到的 2、3、4 点都会涉及到,代码中已经添加详细注释:


package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.AddCartReply;import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import io.grpc.stub.StreamObserver;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.client.inject.GrpcClient;import org.springframework.stereotype.Service;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;
@Service@Slf4jpublic class GrpcClientService {
@GrpcClient("client-stream-server-side") private CartServiceGrpc.CartServiceStub cartServiceStub;
public String addToCart(int count) { CountDownLatch countDownLatch = new CountDownLatch(1); // responseObserver的onNext和onCompleted会在另一个线程中被执行, // ExtendResponseObserver继承自StreamObserver ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {
String extraStr;
@Override public String getExtra() { return extraStr; }
private int code;
private String message;
@Override public void onNext(AddCartReply value) { log.info("on next"); code = value.getCode(); message = value.getMessage(); }
@Override public void onError(Throwable t) { log.error("gRPC request error", t); extraStr = "gRPC error, " + t.getMessage(); countDownLatch.countDown(); }
@Override public void onCompleted() { log.info("on complete"); extraStr = String.format("返回码[%d],返回信息:%s" , code, message); countDownLatch.countDown(); } }; // 远程调用,此时数据还没有给到服务端 StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver); for(int i=0; i<count; i++) { // 发送一笔数据到服务端 requestObserver.onNext(build(101 + i, 1 + i)); }
// 客户端告诉服务端:数据已经发完了 requestObserver.onCompleted();
try { // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行, // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了, // await的超时时间设置为2秒 countDownLatch.await(2, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("countDownLatch await error", e); }
log.info("service finish"); // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得 return responseObserver.getExtra(); }
/** * 创建ProductOrder对象 * @param productId * @param num * @return */ private static ProductOrder build(int productId, int num) { return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build(); }}
复制代码


  • 最后做个 web 接口,可以通过 web 请求验证远程调用:


package com.bolingcavalry.grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import java.util.List;
@RestControllerpublic class GrpcClientController {
@Autowired private GrpcClientService grpcClientService;
@RequestMapping("/") public String printMessage(@RequestParam(defaultValue = "1") int count) { return grpcClientService.addToCart(count); }}
复制代码


  • 编码完成,开始验证;

验证

  • 启动服务端 ClientStreamServerSideApplication:

  • 启动客户端 ClientStreamClientSideApplication:

  • 浏览器输入 http://localhost:8082/?count=100,响应如下,可见远程调用 gRPC 服务成功:

  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

  • 下面是客户端日志,可见由于 CountDownLatch 的作用,发起 gRPC 请求的线程一直等待 responseObserver.onCompleted 在另一个线程被执行完后,才会继续执行:

  • 至此,客户端流类型的 gRPC 服务及其客户端开发就完成了,这种异步操作与咱们平时开发同步类型的 web 接口还是有差别的,希望本文能给您带来一些参考,下一篇咱们实战最后一种类型:双向流式;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 2
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
java版gRPC实战之四:客户端流_gRPC_程序员欣宸_InfoQ写作平台