写点什么

java 版 gRPC 实战之六:客户端动态获取服务端地址

作者:程序员欣宸
  • 2022 年 3 月 30 日
  • 本文字数:7592 字

    阅读完需:约 25 分钟

java版gRPC实战之六:客户端动态获取服务端地址

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

客户端为什么要动态获取服务端地址

本文是《java 版 gRPC 实战》系列的第六篇,前面咱们在开发客户端应用时,所需的服务端地址都是按如下步骤设置的:


  • 在 application.yml 中配置,如下图:

  • 在用到 gRPC 的 bean 中,使用注解 GrpcClient 即可将 Stub 类注入到成员变量中:

  • 上述操作方式的优点是简单易用好配置,缺点也很明显:服务端的 IP 地址或者端口一旦有变化,就必须修改 application.yml 并重启客户端应用;

为什么不用注册中心

  • 您一定会想到解决上述问题最简单的方法就是使用注册中心,如 nacos、eureka 等,其实我也是这么想的,直到有一天,由于工作原因,我要在一个已有的 gRPC 微服务环境部署自己的应用,这个微服务环境并非 java 技术栈,而是基于 golang 的,他们都使用了 go-zero 框架( 老扎心了),这个 go-zero 框架没有提供 java 语言的 SDK,因此,我只能服从 go-zero 框架的规则,从 etcd 中取得其他微服务的地址信息,才能调用其他 gRPC 服务端,如下图所示:

  • 如此一来,咱们之前那种在 application.yml 中配置服务端信息的方法就用不上了,本篇咱们来开发一个新的 gRPC 客户端应用,满足以下需求:


  1. 创建 Stub 对象的时候,服务端的信息不再来自注解 GrpcClient,而是来自查询 etcd 的结果;

  2. etcd 上的服务端信息有变化的时候,客户端可以及时更新,而不用重启应用;

本篇概览

  • 本篇要开发名为 get-service-addr-from-etcd 的 springboot 应用,该应用从 etcd 取得 local-server 应用的 IP 和端口,然后调用 local-server 的 sayHello 接口,如下图:

  • local-server 应用是个简单的 gRPC 服务端,其详细信息请参考《java 版 gRPC 实战之二:服务发布和调用》

  • 本篇由以下章节组成:


  1. 开发客户端应用;

  2. 部署 gRPC 服务端应用;

  3. 部署 etcd;

  4. 模拟 go-zero 的规则,将服务端应用的 IP 地址和端口写入 etcd;

  5. 启动客户端应用,验证能否正常调用服务端的服务;

  6. 重启服务端,重启的时候修改端口;

  7. 修改 etcd 中服务端的端口信息;

  8. 调用接口触发客户端重新实例化 Stub 对象;

  9. 验证客户端能否正常调用修改了端口的服务端服务;

源码下载

  • 本篇实战中的完整源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):



  • 这个 git 项目中有多个文件夹,《java 版 gRPC 实战》系列的源码在 grpc-tutorials 文件夹下,如下图红框所示:

  • grpc-tutorials 文件夹下有多个目录,本篇文章对应的客户端代码在 get-service-addr-from-etcd 目录下,如下图:

开发客户端应用

  • 在父工程的 build.gradle 文件中新增一行,这是 etcd 相关的库,如下图红框所示:


  • 在父工程 grpc-turtorials 下面新建名为 get-service-addr-from-etcd 的模块,其 build.gradle 内容如下:


plugins {    id 'org.springframework.boot'}
dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'net.devh:grpc-client-spring-boot-starter' implementation 'io.etcd:jetcd-core' implementation project(':grpc-lib')}
复制代码


  • 配置文件 application.yml,设置自己的 web 端口号和应用名,另外 grpc.etcdendpoints 是 etcd 集群的地址信息:


server:  port: 8084spring:  application:    name: get-service-addr-from-etcd
grpc: # etcd的地址,从此处取得gRPC服务端的IP和端口 etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
复制代码


  • 启动类 DynamicServerAddressDemoApplication.java 的代码就不贴了,普通的 springboot 启动类而已;

  • 新增 StubWrapper.java 文件,这是个 spring bean,要重点关注的是 simpleBlockingStub 方法,当 bean 在 spring 注册的时候 simpleBlockingStub 方法会被执行,这样每当 bean 在 spring 注册时,都会从 etcd 查询 gRPC 服务端信息,然后创建 SimpleBlockingStub 对象:


package com.bolingcavalry.dynamicrpcaddr;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;import io.etcd.jetcd.ByteSequence;import io.etcd.jetcd.Client;import io.etcd.jetcd.KV;import io.etcd.jetcd.kv.GetResponse;import io.grpc.Channel;import io.grpc.ManagedChannelBuilder;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.Arrays;import static com.google.common.base.Charsets.UTF_8;
/** * @author will (zq2599@gmail.com) * @version 1.0 * @description: 包装了SimpleBlockingStub实例的类,发起gRPC请求时需要用到SimpleBlockingStub实例 * @date 2021/5/8 19:34 */@Component("stubWrapper")@Data@Slf4j@ConfigurationProperties(prefix = "grpc")public class StubWrapper {
/** * 这是etcd中的一个key,该key对应的值是grpc服务端的地址信息 */ private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server";
/** * 配置文件中写好的etcd地址 */ private String etcdendpoints;
private SimpleGrpc.SimpleBlockingStub simpleBlockingStub;
/** * 从etcd查询gRPC服务端的地址 * @return */ public String[] getGrpcServerInfo() { // 创建client类 KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient();
GetResponse response = null;
// 去etcd查询/grpc/local-server这个key的值 try { response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get(); } catch (Exception exception) { log.error("get grpc key from etcd error", exception); }
if (null==response || response.getKvs().isEmpty()) { log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY); return null; }
// 从response中取得值 String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8);
// rawAddrInfo是“192.169.0.1:8080”这样的字符串,即一个IP和一个端口,用":"分割, // 这里用":"分割成数组返回 return null==rawAddrInfo ? null : rawAddrInfo.split(":"); }
/** * 每次注册bean都会执行的方法, * 该方法从etcd取得gRPC服务端地址, * 用于实例化成员变量SimpleBlockingStub */ @PostConstruct public void simpleBlockingStub() { // 从etcd获取地址信息 String[] array = getGrpcServerInfo();
log.info("create stub bean, array info from etcd {}", Arrays.toString(array));
// 数组的第一个元素是gRPC服务端的IP地址,第二个元素是端口 if (null==array || array.length<2) { log.error("can not get valid grpc address from etcd"); return; }
// 数组的第一个元素是gRPC服务端的IP地址 String addr = array[0]; // 数组的第二个元素是端口 int port = Integer.parseInt(array[1]);
// 根据刚才获取的gRPC服务端的地址和端口,创建channel Channel channel = ManagedChannelBuilder .forAddress(addr, port) .usePlaintext() .build();
// 根据channel创建stub simpleBlockingStub = SimpleGrpc.newBlockingStub(channel); }}
复制代码


  • GrpcClientService 是封装了 StubWrapper 的服务类:


package com.bolingcavalry.dynamicrpcaddr;
import com.bolingcavalry.grpctutorials.lib.HelloReply;import com.bolingcavalry.grpctutorials.lib.HelloRequest;import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;import io.grpc.StatusRuntimeException;import lombok.Setter;import net.devh.boot.grpc.client.inject.GrpcClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;
@Servicepublic class GrpcClientService {
@Autowired(required = false) @Setter private StubWrapper stubWrapper;
public String sendMessage(final String name) { // 很有可能simpleStub对象为null if (null==stubWrapper) { return "invalid SimpleBlockingStub, please check etcd configuration"; }
try { final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build()); return response.getMessage(); } catch (final StatusRuntimeException e) { return "FAILED with " + e.getStatus().getCode().name(); } }}
复制代码


  • 新增一个 controller 类 GrpcClientController,提供一个 http 接口,里面会调用 GrpcClientService 的方法,最终完成远程 gRPC 调用:


package com.bolingcavalry.dynamicrpcaddr;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
@RestControllerpublic class GrpcClientController {
@Autowired private GrpcClientService grpcClientService;
@RequestMapping("/") public String printMessage(@RequestParam(defaultValue = "will") String name) { return grpcClientService.sendMessage(name); }}
复制代码


  • 接下来新增一个 controller 类 RefreshStubInstanceController,对外提供一个 http 接口 refreshstub,作用是删掉 stubWrapper 这个 bean,再重新注册一次,这样每当外部调用 refreshstub 接口,就可以从 etcd 取得服务端信息再重新实例化 SimpleBlockingStub 成员变量,这样就达到了客户端动态获取服务端地址的效果:


package com.bolingcavalry.dynamicrpcaddr;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;import org.springframework.beans.BeansException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.support.AbstractBeanDefinition;import org.springframework.beans.factory.support.BeanDefinitionBuilder;import org.springframework.beans.factory.support.BeanDefinitionRegistry;import org.springframework.beans.factory.support.DefaultListableBeanFactory;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
@RestControllerpublic class RefreshStubInstanceController implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Autowired private GrpcClientService grpcClientService;
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }
@RequestMapping("/refreshstub") public String refreshstub() {
String beanName = "stubWrapper";
//获取BeanFactory DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
// 删除已有bean defaultListableBeanFactory.removeBeanDefinition(beanName);
//创建bean信息. BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class);
//动态注册bean. defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
// 更新引用关系(注意,applicationContext.getBean方法很重要,会触发StubWrapper实例化操作) grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class));
return "Refresh success"; }}
复制代码


  • 编码完成,开始验证;

部署 gRPC 服务端应用

部署 gRPC 服务端应用很简单,启动 local-server 应用即可:


部署 etcd

  • 为了简化操作,我这里的 etcd 集群是用 docker 部署的,对应的 docker-compose.yml 文件内容如下:


version: '3'services:  etcd1:    image: "quay.io/coreos/etcd:v3.4.7"    entrypoint: /usr/local/bin/etcd    command:      - '--name=etcd1'      - '--data-dir=/etcd_data'      - '--initial-advertise-peer-urls=http://etcd1:2380'      - '--listen-peer-urls=http://0.0.0.0:2380'      - '--listen-client-urls=http://0.0.0.0:2379'      - '--advertise-client-urls=http://etcd1:2379'      - '--initial-cluster-token=etcd-cluster'      - '--heartbeat-interval=250'      - '--election-timeout=1250'      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'      - '--initial-cluster-state=new'    ports:      - 2379:2379    volumes:      - ./store/etcd1/data:/etcd_data  etcd2:    image: "quay.io/coreos/etcd:v3.4.7"    entrypoint: /usr/local/bin/etcd    command:      - '--name=etcd2'      - '--data-dir=/etcd_data'      - '--initial-advertise-peer-urls=http://etcd2:2380'      - '--listen-peer-urls=http://0.0.0.0:2380'      - '--listen-client-urls=http://0.0.0.0:2379'      - '--advertise-client-urls=http://etcd2:2379'      - '--initial-cluster-token=etcd-cluster'      - '--heartbeat-interval=250'      - '--election-timeout=1250'      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'      - '--initial-cluster-state=new'    ports:      - 2380:2379    volumes:      - ./store/etcd2/data:/etcd_data  etcd3:    image: "quay.io/coreos/etcd:v3.4.7"    entrypoint: /usr/local/bin/etcd    command:      - '--name=etcd3'      - '--data-dir=/etcd_data'      - '--initial-advertise-peer-urls=http://etcd3:2380'      - '--listen-peer-urls=http://0.0.0.0:2380'      - '--listen-client-urls=http://0.0.0.0:2379'      - '--advertise-client-urls=http://etcd3:2379'      - '--initial-cluster-token=etcd-cluster'      - '--heartbeat-interval=250'      - '--election-timeout=1250'      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'      - '--initial-cluster-state=new'    ports:      - 2381:2379    volumes:      - ./store/etcd3/data:/etcd_data
复制代码


  • 准备好上述文件后,执行 docker-compose up -d 即可创建集群;

将服务端应用的 IP 地址和端口写入 etcd

  • 我这边 local-server 所在服务器 IP 是 192.168.50.5,端口 9898,所以执行以下命令将 local-server 信息写入 etcd:


docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898
复制代码

启动客户端应用

  • 打开 DynamicServerAddressDemoApplication.java,点击下图红框位置,即可启动客户端应用:

  • 注意下图红框中的日志,该日志证明客户端应用从 etcd 获取服务端信息成功:

  • 浏览器访问应用 get-service-addr-from-etcd 的 http 接口,成功收到响应,证明 gRPC 调用成功:

  • 去看 local-server 的控制台,如下图红框,证明远程调用确实执行了:

重启服务端,重启的时候修改端口

  • 为了验证动态获取服务端信息是否有效,咱们先把 local-server 应用的端口改一下,如下图红框,改成 9899

  • 改完重启 local-server,如下图红框,可见 gRPC 端口已经改为 9899:

  • 这时候再访问 get-service-addr-from-etcd 的 http 接口,由于 get-service-addr-from-etcd 不知道 local-server 的监听端口发生了改变,因此还是去访问 9898 端口,毫无意外的返回了失败:

修改 etcd 中服务端的端口信息

现在执行以下命令,将 etcd 中的服务端信息改为正确的:


docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899
复制代码

调用接口触发客户端重新实例化 Stub 对象

  • 聪明的您一定知道接下来要做的事情了:让 StubWrapper 的 bean 重新在 spring 环境注册,也就是调用 RefreshStubInstanceController 提供的 http 接口 refreshstub:

  • 查看 get-service-addr-from-etcd 应用的控制台,如下图红框,StubWrapper 已经重新注册了,并且从 etcd 取得了最新的服务端信息:

验证客户端能否正常调用修改了端口的服务端服务

  • 再次访问 get-service-addr-from-etcd 应用的 web 接口,如下图,gRPC 调用成功:

  • 至此,在不修改配置不重启服务的情况下,客户端也可以适应服务端的变化了,当然了,本文只是提供基本的操作参考,实际上的微服务环境会更复杂,例如 refreshstub 接口可能被其他服务调用,这样服务端有了变化可以更加及时地被更新,还有客户端本身也肯能是 gRPC 服务提供方,那也要把自己注册到 etcd 上去,还有利用 etcd 的 watch 功能监控指定的服务端是否一直存活,以及同一个 gRPC 服务的多个实例如何做负载均衡,等等,这些都要根据您的实际情况来定制;

  • 本篇内容过多,可见对于这些官方不支持的微服务环境,咱们自己去做注册发现的适配很费时费力的,如果设计和选型能自己做主,我们更倾向于使用现成的注册中心,接下来的文章,咱们就一起尝试使用 eureka 为 gRPC 提供注册发现服务;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 2
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
java版gRPC实战之六:客户端动态获取服务端地址_gRPC_程序员欣宸_InfoQ写作平台