写点什么

java 版 gRPC 实战之五:双向流

作者:程序员欣宸
  • 2022 年 3 月 30 日
  • 本文字数:5793 字

    阅读完需:约 19 分钟

java版gRPC实战之五:双向流

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


本篇概览

  • 本文是《java 版 gRPC 实战》系列的第五篇,目标是掌握双向流类型的服务,即请求参数是流的形式,响应的内容也是流的形式;

  • 先来看看官方资料对双向流式 RPC 的介绍:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;

  • 掌握了客户端流和服务端流两种类型的开发后,双向流类型就很好理解了,就是之前两种类型的结合体,请求和响应都按照流的方式处理即可;

  • 今天的实战,咱们来设计一个在线商城的功能:批量减扣库存,即客户端提交多个商品和数量,服务端返回每个商品减扣库存成功和失败的情况;

  • 咱们尽快进入编码环节吧,具体内容如下:


  1. 在 proto 文件中定义双向流类型的 gRPC 接口,再通过 proto 生成 java 代码

  2. 开发服务端应用

  3. 开发客户端应用

  4. 验证

源码下载

  • 本篇实战中的完整源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):


  • 这个 git 项目中有多个文件夹,《java 版 gRPC 实战》系列的源码在 grpc-tutorials 文件夹下,如下图红框所示:

  • grpc-tutorials 文件夹下有多个目录,本篇文章对应的服务端代码在 double-stream-server-side 目录下,客户端代码在 double-stream-client-side 目录下,如下图:

在 proto 文件中定义双向流类型的 gRPC 接口

  • 首先要做的就是定义 gRPC 接口,打开 mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是 BatchDeduct 方法的入参 ProductOrder 和返回值 DeductReply 都添加了 stream 修饰(ProductOrder 是上一章定义的),代表该方法是双向流类型:


// gRPC服务,这是个在线商城的库存服务service StockService {    // 双向流式:批量扣减库存    rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}}
// 扣减库存返回结果的数据结构message DeductReply { // 返回码 int32 code = 1; // 描述信息 string message = 2;}
复制代码


  • 双击下图红框中的 task 即可生成 java 代码:

  • 生成下图红框中的文件,即服务端定义和返回值数据结构:

  • 接下来开发服务端;

开发服务端应用

  • 在父工程 grpc-turtorials 下面新建名为 double-stream-server-side 的模块,其 build.gradle 内容如下:


// 使用springboot插件plugins {    id 'org.springframework.boot'}
dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' // 作为gRPC服务提供方,需要用到此库 implementation 'net.devh:grpc-server-spring-boot-starter' // 依赖自动生成源码的工程 implementation project(':grpc-lib') // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor annotationProcessor 'org.projectlombok:lombok'}
复制代码


  • 配置文件 application.yml:


spring:  application:    name: double-stream-server-side# gRPC有关的配置,这里只需要配置服务端口号grpc:  server:    port: 9901
复制代码


  • 启动类 DoubleStreamServerSideApplication.java 的代码就不贴了,普通的 springboot 启动类而已;

  • 重点是提供 grpc 服务的 GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的 onNext、onCompleted 方法何时被调用是上层框架决定的,另外还准备了成员变量 totalCount,这样就可以记录总数了,由于请求参数是流,因此匿名类的 onNext 会被多次调用,并且由于返回值是流,因此 onNext 中调用了 responseObserver.onNext 方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的 onNext 方法会被多次调用):


package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;import io.grpc.stub.StreamObserver;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService@Slf4jpublic class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {
@Override public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) { // 返回匿名类,给上层框架使用 return new StreamObserver<ProductOrder>() {
private int totalCount = 0;
@Override public void onNext(ProductOrder value) { log.info("正在处理商品[{}],数量为[{}]", value.getProductId(), value.getNumber());
// 增加总量 totalCount += value.getNumber();
int code; String message;
// 假设单数的都有库存不足的问题 if (0 == value.getNumber() % 2) { code = 10000; message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber()); } else { code = 10001; message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber()); }
responseObserver.onNext(DeductReply.newBuilder() .setCode(code) .setMessage(message) .build()); }
@Override public void onError(Throwable t) { log.error("批量减扣库存异常", t); }
@Override public void onCompleted() { log.info("批量减扣库存完成,共计[{}]件商品", totalCount); responseObserver.onCompleted(); } }; }}
复制代码

开发客户端应用

  • 在父工程 grpc-turtorials 下面新建名为 double-stream-server-side 的模块,其 build.gradle 内容如下:


plugins {    id 'org.springframework.boot'}
dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'net.devh:grpc-client-spring-boot-starter' implementation project(':grpc-lib')}
复制代码


  • 配置文件 application.yml,设置自己的 web 端口号和服务端地址:


server:  port: 8082spring:  application:    name: double-stream-client-side
grpc: client: # gRPC配置的名字,GrpcClient注解会用到 double-stream-server-side: # gRPC服务端地址 address: 'static://127.0.0.1:9901' enableKeepAlive: true keepAliveWithoutCalls: true negotiationType: plaintext
复制代码


  • 启动类 DoubleStreamClientSideApplication.java 的代码就不贴了,普通的 springboot 启动类而已;

  • 正常情况下我们都是用 StreamObserver 处理服务端响应,这里由于是异步响应,需要额外的方法从 StreamObserver 中取出业务数据,于是定一个新接口,继承自 StreamObserver,新增 getExtra 方法可以返回 String 对象,详细的用法稍后会看到:


package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> { String getExtra();}
复制代码


  • 重头戏来了,看看如何远程调用双向流类型的 gRPC 接口,代码中已经添加详细注释:


package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;import io.grpc.stub.StreamObserver;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.client.inject.GrpcClient;import org.springframework.stereotype.Service;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;
@Service@Slf4jpublic class GrpcClientService {
@GrpcClient("double-stream-server-side") private StockServiceGrpc.StockServiceStub stockServiceStub;
/** * 批量减库存 * @param count * @return */ public String batchDeduct(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted会在另一个线程中被执行, // ExtendResponseObserver继承自StreamObserver ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {
// 用stringBuilder保存所有来自服务端的响应 private StringBuilder stringBuilder = new StringBuilder();
@Override public String getExtra() { return stringBuilder.toString(); }
/** * 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应, * 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容 * @param value */ @Override public void onNext(DeductReply value) { log.info("batch deduct on next"); // 放入匿名类的成员变量中 stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage())); }
@Override public void onError(Throwable t) { log.error("batch deduct gRPC request error", t); stringBuilder.append("batch deduct gRPC error, " + t.getMessage()); countDownLatch.countDown(); }
/** * 服务端确认响应完成后,这里的onCompleted方法会被调用 */ @Override public void onCompleted() { log.info("batch deduct on complete"); // 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了, // 会继续往下执行 countDownLatch.countDown(); } };
// 远程调用,此时数据还没有给到服务端 StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);
for(int i=0; i<count; i++) { // 每次执行onNext都会发送一笔数据到服务端, // 服务端的onNext方法都会被执行一次 requestObserver.onNext(build(101 + i, 1 + i)); }
// 客户端告诉服务端:数据已经发完了 requestObserver.onCompleted();
try { // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行, // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了, // await的超时时间设置为2秒 countDownLatch.await(2, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("countDownLatch await error", e); }
log.info("service finish"); // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得 return responseObserver.getExtra(); }
/** * 创建ProductOrder对象 * @param productId * @param num * @return */ private static ProductOrder build(int productId, int num) { return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build(); }}
复制代码


  • 最后做个 web 接口,可以通过 web 请求验证远程调用:


package grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
@RestControllerpublic class GrpcClientController {
@Autowired private GrpcClientService grpcClientService;
@RequestMapping("/") public String printMessage(@RequestParam(defaultValue = "1") int count) { return grpcClientService.batchDeduct(count); }}
复制代码


  • 编码完成,开始验证;

验证

  • 启动服务端 DoubleStreamServerSideApplication:

  • 启动客户端 DoubleStreamClientSideApplication:

  • 这里要改:浏览器输入 http://localhost:8083/?count=10,响应如下,可见远程调用 gRPC 服务成功,流式响应的每一笔返回都被客户端收到:

  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

  • 下面是客户端日志,可见由于 CountDownLatch 的作用,发起 gRPC 请求的线程一直等待 responseObserver.onCompleted 在另一个线程被执行完后,才会继续执行:

  • 至此,四种类型的 gRPC 服务及其客户端开发就完成了,一般的业务场景咱们都能应付自如,接下来的文章咱们会继续深入学习,了解复杂场景下的 gRPC 操作;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 2
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
java版gRPC实战之五:双向流_gRPC_程序员欣宸_InfoQ写作平台