写点什么

浅谈 etcd 服务注册与发现

作者:Barry Yan
  • 2022-10-15
    北京
  • 本文字数:5522 字

    阅读完需:约 1 分钟

浅谈etcd服务注册与发现

Hello 朋友们,在之前参加云原生活动的时候曾写过一篇文章《浅谈云原生技术组件—etcd》,在其中我主要说明了 etcd 在基于 Kubernetes 云原生微服务框架中的定位,主要是用来做服务的远程配置、KV 存储等等,那么今天就来简要的补充讲解下 etcd 的另一个重要的作用——服务注册和发现,没错,正是和 Zookeeper、Eureka、Consul 等拥有一样角色的开源微服务组件,且毫不逊色于这些,那么我们就开始进行讲解。

1 基于 etcd 的服务注册与发现逻辑架构

1.1 服务注册中心抽象


(图片来自网络)


  • Service Registry(服务注册表,通常也成为服务注册中心):内部拥有一个数据结构,用于存储已发布服务的配置信息。注册中心的作用一句话概括就是存放和调度服务的配置,实现服务和注册中心,服务和服务之间的相互通信,可以说是微服务中的”通讯录“,它记录了服务和服务地址的映射关系。

  • Service Requestor(服务调用者):根据服务注册中心调用已有服务。

  • Service Provider(服务提供者):提供服务到服务注册中心。

1.2 etcd 服务注册发现简易版

2 代码实现

2.1 总体流程

服务提供者


(1)监听网络


(2)创建 gRPC 服务端,并将具体的服务进行注册


(3)利用服务地址、服务名等注册 etcd 服务配置


(4)gRPC 监听服务


服务消费者


(1)注册 etcd 解析器


(2)连接 etcd 服务


(3)获取 gRPC 客户端


(4)调用 gRPC 服务

2.2 代码

2.2.1 服务提供方
var (   cli         *clientv3.Client   Schema      = "ns"   Host        = "127.0.0.1"   Port        = 3000              //端口   ServiceName = "api_log_service" //服务名称   EtcdAddr    = "127.0.0.1:2379"  //etcd地址)
type ApiLogServer struct{}
func (api *ApiLogServer) GetApiLogByUid(ctx context.Context, req *proto.ApiLogRequest) (*proto.ApiLogResponse, error) { resp := &proto.ApiLogResponse{ Msg: "ok", Data: "Hello", } return resp, nil}
//将服务地址注册到etcd中func register(etcdAddr, serviceName, serverAddr string, ttl int64) error { var err error if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ";"), DialTimeout: 50 * time.Second, }) if err != nil { fmt.Printf("connection server err : %s\n", err) return err } } //与etcd建立长连接,并保证连接不断(心跳检测) ticker := time.NewTicker(time.Second * time.Duration(ttl)) go func() { key := "/" + Schema + "/" + serviceName + "/" + serverAddr for { resp, err := cli.Get(context.Background(), key) if err != nil { fmt.Printf("get server address err : %s", err) } else if resp.Count == 0 { //尚未注册 err = keepAlive(serviceName, serverAddr, ttl) if err != nil { fmt.Printf("keepAlive err : %s", err) } } <-ticker.C } }() return nil}
//保持服务器与etcd的长连接func keepAlive(serviceName, serverAddr string, ttl int64) error { //创建租约 leaseResp, err := cli.Grant(context.Background(), ttl) if err != nil { fmt.Printf("create grant err : %s\n", err) return err } //将服务地址注册到etcd中 key := "/" + Schema + "/" + serviceName + "/" + serverAddr _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Printf("register service err : %s", err) return err } //建立长连接 ch, err := cli.KeepAlive(context.Background(), leaseResp.ID) if err != nil { fmt.Printf("KeepAlive err : %s\n", err) return err } //清空keepAlive返回的channel go func() { for { <-ch } }() return nil}
//取消注册func unRegister(serviceName, serverAddr string) { if cli != nil { key := "/" + Schema + "/" + serviceName + "/" + serverAddr cli.Delete(context.Background(), key) }}
func RunApiLog() { //监听网络 listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", Port)) if err != nil { fmt.Println("Listen network err :", err) return } defer listener.Close() //创建grpc srv := grpc.NewServer() defer srv.GracefulStop() //注册到grpc服务中 proto.RegisterApiLogServiceServer(srv, &ApiLogServer{}) //将服务地址注册到etcd中 serverAddr := fmt.Sprintf("%s:%d", Host, Port) fmt.Printf("rpc server address: %s\n", serverAddr) register(EtcdAddr, ServiceName, serverAddr, 10) //关闭信号处理 ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT) go func() { s := <-ch unRegister(ServiceName, serverAddr) if i, ok := s.(syscall.Signal); ok { os.Exit(int(i)) } else { os.Exit(0) } }() //监听服务 err = srv.Serve(listener) if err != nil { fmt.Println("rpc server err : ", err) return }}
复制代码
2.2.2 服务消费方
var (   cli         *clientv3.Client   Schema      = "ns"   ServiceName = "api_log_service" //服务名称   EtcdAddr    = "127.0.0.1:2379"  //etcd地址)
type EtcdResolver struct { etcdAddr string clientConn resolver.ClientConn}
func NewEtcdResolver(etcdAddr string) resolver.Builder { return &EtcdResolver{etcdAddr: etcdAddr}}
func (r *EtcdResolver) Scheme() string { return Schema}
//ResolveNow watch有变化调用func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) { fmt.Println(rn)}
//Close 解析器关闭时调用func (r *EtcdResolver) Close() { fmt.Println("Close")}
//Build 构建解析器 grpc.Dial()时调用func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { var err error //构建etcd client if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(r.etcdAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { fmt.Printf("connect etcd err : %s\n", err) return nil, err } } r.clientConn = clientConn go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/") return r, nil}
//watch机制:监听etcd中某个key前缀的服务地址列表的变化func (r *EtcdResolver) watch(keyPrefix string) { //初始化服务地址列表 var addrList []resolver.Address resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("get service list err : ", err) } else { for i := range resp.Kvs { addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) } } r.clientConn.NewAddress(addrList) //监听服务地址列表的变化 rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, ev := range n.Events { addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exists(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) r.clientConn.NewAddress(addrList) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s r.clientConn.NewAddress(addrList) } } } }}
func exists(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false}
func RunClient() { //注册etcd解析器 r := NewEtcdResolver(EtcdAddr) resolver.Register(r) //连接服务器,同步调用r.Build() conn, err := grpc.Dial(r.Scheme()+"://author/"+ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure()) if err != nil { fmt.Printf("connect err : %s", err) } defer conn.Close() //获得gRPC客户端 c := proto.NewApiLogServiceClient(conn) //调用服务 resp, err := c.GetApiLogByUid( context.Background(), &proto.ApiLogRequest{UId: 0}, ) if err != nil { fmt.Printf("call service err : %s", err) return } fmt.Printf("resp : %s , data : %s", resp.Msg, resp.Data)}
复制代码
2.2.3 公共组件
syntax = "proto3";  package proto; 
option go_package = "../api_log";
service ApiLogService { rpc GetApiLogByUid(ApiLogRequest) returns (ApiLogResponse){}}
message ApiLogRequest{ int32 u_id = 1;}
message ApiLogResponse{ int64 code = 1; string msg = 2; int64 count = 3; string data = 4;}
复制代码


注意要在编译后进行使用哈

2.3 注意事项

在我编写代码进行实现的过程中遇到过种种问题,但是最让人记忆深刻的就是 etcd 与 gRPC 版本不兼容的问题,用了很长时间才搞定,在这里记录下吧:


原因是 etcd3.x 版本不支持 grpc1.27 版本以上,但是 grpc1.27 以下编译成的中间代码又不支持新版本的 proto buffer,这就陷入了一个两难的处境,最后通过 Stack Overflow 才查到:


https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc


解决,在 go.mod 中加入这几行代码:


replace (   github.com/coreos/etcd => github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible   google.golang.org/grpc => google.golang.org/grpc v1.27.0)
复制代码

3 细节剖析

3.1 服务生产端 keepAlive

keepAlive 是一个老生常谈的问题了,下到 TCP/IP、HTTP 连接,上到 Redis 集群、MySQL 集群,都会有该机制,那么 etcd 的 keepAlive 是怎么搞的呢?


下面我们来看下


etcd 使用 LeaseKeepAlive API 调用创建的双向流来刷新租约。当客户端希望刷新租约时,它通过流发送一个 leasekeepaliverrequest:


message LeaseKeepAliveRequest {  int64 ID = 1;}
复制代码


  • ID :keepAlive 有效的租约 ID。


LeaseKeepAliveResponse作为 keepAlive 的响应:


message LeaseKeepAliveResponse {  ResponseHeader header = 1;  int64 ID = 2;  int64 TTL = 3;}
复制代码


  • ID :用新的 TTL 刷新的租约。

  • TTL :新的生存时间,以秒为单位,租约剩余的时间。

3.2 服务消费端 watch 机制

Watch API 提供了一个基于事件的接口,用于异步监视服务 key 的更改。etcd3 watch 通过持续观察给定的修订(当前的或历史的)来等待键的更改,并将键更新流回客户端。


对每个键的每次更改都用“Event”消息表示。Event 消息提供了更新的数据和更新的类型:


message Event {  enum EventType {    PUT = 0;    DELETE = 1;  }  EventType type = 1;  KeyValue kv = 2;  KeyValue prev_kv = 3;}
复制代码


  • type:PUT 类型表示新数据的更新,DELETE 表示 key 的删除。

  • kv:与事件相关的键值 PUT 事件包含 kv。

  • prev_kv:事件发生前修改版本的密钥的键值对。为了节省带宽,它只在 watch 显式启用的情况下填写。


watch 流:


watch 是长时间运行的请求,并使用 gRPC 流来流化事件数据。watch 流是双向的;客户端写入流来建立监视,读取流来接收监视事件。通过使用每个 watch 标识符来标记事件,单个 watch 流可以将多个不同的手表组合在一起。这种多路复用有助于减少核心 etcd 集群上的内存占用和连接开销。

4 总结

微服务是当今互联网领域的广泛概念,也是一种架构演进的结果,微服务的存在让架构设计更加的解耦合,让人员的分工更加明确,当然他的落地实现也并不止步与某一两种方式,在云原生领域的 Kubernetes+etcd,互联网领域常用的 Spring Cloud 全家桶以及 Dubbo 等都是微服务的具体实现,而 etcd 也仅仅是微服务中服务注册中心组件角色的一个代表而已。


参考:


https://etcd.io/docs/v3.5/dev-guide/grpc_naming/


https://www.jianshu.com/p/217d0e3a8d0f


https://www.cnblogs.com/wujuntian/p/12838041.html


https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc


https://blog.csdn.net/fly910905/article/details/103545120

发布于: 刚刚阅读数: 5
用户头像

Barry Yan

关注

做兴趣使然的Hero 2021-01-14 加入

Just do it.

评论

发布
暂无评论
浅谈etcd服务注册与发现_Go_Barry Yan_InfoQ写作社区