写点什么

gRPC 三种客户端类型实践【Java 版】

作者:FunTester
  • 2022 年 5 月 11 日
  • 本文字数:4635 字

    阅读完需:约 15 分钟

本文承袭Grpc服务开发和接口测试初探【Java】内容,学会了基本的 gRPC 的基本 Demo 之后,自然要开始了各类客户端的学习。由于服务端的代码都是由开发写好的,所以作为新手测试来说,我觉得学好客户端的代码优先级更高一些。


书接上文,gRPC 客户端有三种实现方式,其实就是从io.grpc.ManagedChannel创建客户端Stub的过程。三种方式分别为:newBlockingStubnewStubnewFutureStub。下面通过代码演示来分享三种的区别和优劣。


gRPC 客户端目前用起来跟 HTTP 协议一样,调用方式跟 HttpClient 调用一样。分成了阻塞、异步和future,有兴趣可以移步HTTP异步连接池和多线程实践

服务端

服务端是上期进行改造,主要是增加了响应等待时间和时间信息,方便后面验证不同客户端功能。代码如下:


package com.funtester.grpc;
import com.funtester.frame.SourceCode;import com.funtester.fungrpc.HelloRequest;import com.funtester.fungrpc.HelloResponse;import com.funtester.fungrpc.HelloServiceGrpc;import com.funtester.utils.Time;import io.grpc.stub.StreamObserver;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
private static final Logger logger = LogManager.getLogger(HelloServiceImpl.class);
@Override public void executeHi(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { HelloResponse response = HelloResponse.newBuilder() .setMsg("你好 " + request.getName()+ Time.getDate()) .build(); SourceCode.sleep(1.0); logger.info("用户{}来了",request.getName()); responseObserver.onNext(response); responseObserver.onCompleted(); }
}
复制代码

newBlockingStub

顾名思义,这个是阻塞调用的 gRPC 客户端类型,实际使用中跟 HTTP 接口请求->响应一样,代码如下:


package com.funtest.grpc
import com.funtester.frame.SourceCodeimport com.funtester.fungrpc.HelloRequestimport com.funtester.fungrpc.HelloResponseimport com.funtester.fungrpc.HelloServiceGrpcimport io.grpc.ManagedChannelimport io.grpc.ManagedChannelBuilder
import java.util.concurrent.ExecutionException
class BlockClient extends SourceCode {
public static void main(String[] args) throws ExecutionException, InterruptedException { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080) .usePlaintext() .build()
HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel).withCompression("gzip")
HelloRequest helloRequest = HelloRequest.newBuilder() .setName("FunTester") .build() 5.times { HelloResponse orderResponse = helloServiceBlockingStub.executeHi(helloRequest) output("收到响应: " + orderResponse.getMsg()) } managedChannel.shutdown() }
}
复制代码


控制台输出:


20:46:04.664 main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.1620:46:04.675 main   ###### #     #  #    # ####### ######  #####  ####### ######  #####  #      #     #  ##   #    #    #       #         #    #       #    #  ####   #     #  # #  #    #    ####    #####     #    ####    #####  #      #     #  #  # #    #    #            #    #    #       #   #  #       #####   #    #    #    ######  #####     #    ######  #    #
20:46:06.517 main 收到响应: 你好 FunTester2022-05-09 20:46:0520:46:07.521 main 收到响应: 你好 FunTester2022-05-09 20:46:0620:46:08.531 main 收到响应: 你好 FunTester2022-05-09 20:46:0720:46:09.542 main 收到响应: 你好 FunTester2022-05-09 20:46:0820:46:10.552 main 收到响应: 你好 FunTester2022-05-09 20:46:09
进程已结束,退出代码0
复制代码


比较简单,这里不多做介绍了。

newStub

看名字有点猜不出来,这是个纯异步调用客户端。写上去代码可能比较多,但是如果把io.grpc.stub.StreamObserver对象拆开看就会比较容易懂一些。代码如下:


package com.funtest.grpc
import com.funtester.frame.SourceCodeimport com.funtester.fungrpc.HelloRequestimport com.funtester.fungrpc.HelloResponseimport com.funtester.fungrpc.HelloServiceGrpcimport io.grpc.ManagedChannelimport io.grpc.ManagedChannelBuilderimport io.grpc.stub.StreamObserver
import java.util.concurrent.ExecutionException
class SyncClient extends SourceCode {
public static void main(String[] args) throws ExecutionException, InterruptedException { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080) .usePlaintext() .build()
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel).withCompression("gzip")
HelloRequest helloRequest = HelloRequest.newBuilder() .setName("FunTester") .build()

StreamObserver<HelloResponse> helloResponseStreamObserver = new StreamObserver<HelloResponse>() {
@Override void onNext(HelloResponse value) { output(value.getMsg()) }
@Override void onError(Throwable t) { output(t.getMessage()) }
@Override void onCompleted() {
} } 5.times {helloServiceStub.executeHi(helloRequest, helloResponseStreamObserver)} sleep(2000) managedChannel.shutdown() }
}
复制代码


由于都是异步,所以相当于自动多线程了。控制台输出如下:



20:50:59.053 main ###### # # # # ####### ###### ##### ####### ###### ##### # # # ## # # # # # # # # #### # # # # # # #### ##### # #### ##### # # # # # # # # # # # # # # ##### # # # ###### ##### # ###### # #
20:51:00.816 grpc-default-executor-4 你好 FunTester2022-05-09 20:50:5920:51:00.816 grpc-default-executor-1 你好 FunTester2022-05-09 20:50:5920:51:00.816 grpc-default-executor-3 你好 FunTester2022-05-09 20:50:5920:51:00.816 grpc-default-executor-0 你好 FunTester2022-05-09 20:50:5920:51:00.816 grpc-default-executor-2 你好 FunTester2022-05-09 20:50:59
进程已结束,退出代码0
复制代码


可以看到,所有请求响应的结果时间都是一样的,说明请求到达服务端时间是一样的。

newFutureStub

这种客户端也是异步的,之所以放在最后将是因为它具有同步客户端的属性,在实际使用中,既可以当做异步客户端使用也可以当做一个同步的客户端使用。下面是演示代码:


package com.funtest.grpc
import com.funtester.frame.SourceCodeimport com.funtester.fungrpc.HelloRequestimport com.funtester.fungrpc.HelloResponseimport com.funtester.fungrpc.HelloServiceGrpcimport com.google.common.util.concurrent.ListenableFutureimport io.grpc.ManagedChannelimport io.grpc.ManagedChannelBuilder
import java.util.concurrent.ExecutionException
class FutureClient extends SourceCode {
public static void main(String[] args) throws ExecutionException, InterruptedException { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080) .usePlaintext() .build()
HelloServiceGrpc.HelloServiceFutureStub helloServiceFutureStub = HelloServiceGrpc.newFutureStub(managedChannel).withCompression("gzip") HelloRequest helloRequest = HelloRequest.newBuilder() .setName("FunTester") .build() //同步客户端的使用方式 5.times { ListenableFuture<HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.executeHi(helloRequest) HelloResponse helloResponse = helloResponseListenableFuture.get() output(helloResponse.getMsg()) } //异步客户端的使用方式 def res = [] 5.times { ListenableFuture<HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.executeHi(helloRequest) res << helloResponseListenableFuture }
res.each { output(it.get().getMsg()) }
managedChannel.shutdown() }
}
复制代码


控制台输出:


21:03:07.312 main   ###### #     #  #    # ####### ######  #####  ####### ######  #####  #      #     #  ##   #    #    #       #         #    #       #    #  ####   #     #  # #  #    #    ####    #####     #    ####    #####  #      #     #  #  # #    #    #            #    #    #       #   #  #       #####   #    #    #    ######  #####     #    ######  #    #
21:03:09.226 main 你好 FunTester2022-05-09 21:03:0821:03:10.232 main 你好 FunTester2022-05-09 21:03:0921:03:11.238 main 你好 FunTester2022-05-09 21:03:1021:03:12.247 main 你好 FunTester2022-05-09 21:03:1121:03:13.255 main 你好 FunTester2022-05-09 21:03:1221:03:14.262 main 你好 FunTester2022-05-09 21:03:1321:03:14.281 main 你好 FunTester2022-05-09 21:03:1321:03:14.281 main 你好 FunTester2022-05-09 21:03:1321:03:14.282 main 你好 FunTester2022-05-09 21:03:1321:03:14.282 main 你好 FunTester2022-05-09 21:03:13
进程已结束,退出代码0
复制代码


可以看到,前面五个请求是串行的,后面的五个请求是并行的。在实际工作中,使用到异步调用又要处理结果的地方也是这种类型使用较多,而使用 Java 的线程同步类,往往比较麻烦也不够优雅。

Have Fun ~ Tester !

发布于: 刚刚阅读数: 2
用户头像

FunTester

关注

公众号:FunTester,750篇原创,欢迎关注 2020.10.20 加入

公众号FunTester,坚持原创文章的测试人,一个有趣的灵魂。

评论

发布
暂无评论
gRPC三种客户端类型实践【Java版】_FunTester_InfoQ写作社区