写点什么

实现 etcd 服务注册与发现

作者:六月的
  • 2022-10-19
    上海
  • 本文字数:7310 字

    阅读完需:约 1 分钟

转载自:实现etcd服务注册与发现

0.1、目录结构

.├── api│   └── main.go├── common│   └── common.go├── docker-compose.yml├── etcd│   └── Dockerfile├── go.mod├── go.sum├── rpc│   ├── courseware│   │   ├── courseware.pb.go│   │   └── courseware_grpc.pb.go│   ├── courseware.proto│   └── main.go└── server    ├── service_discovery.go    └── service_registration.go
复制代码

1、docker-compose 部署一个 3 节点的集群

项目根目录下创建 etcd 目录,并在目录下新增 Dockerfile 文件

FROM bitnami/etcd:latest
LABEL maintainer="liuyuede123 <liufutianoppo@163.com>"
复制代码

项目根目录下新增 docker-compose.yml

version: '3.5'# 网络配置networks:  backend:    driver: bridge
# 服务容器配置services: etcd1: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd1 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "12379:2379" - "12380:2380" networks: - backend restart: always
etcd2: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd2 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "22379:2379" - "22380:2380" networks: - backend restart: always
etcd3: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd3 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "32379:2379" - "32380:2380" networks: - backend restart: always
复制代码


相关参数概念:


  1. ETCD_INITIAL_ADVERTISE_PEER_URLS:该成员节点在整个集群中的通信地址列表,这个地址用来传输集群数据的地址。因此这个地址必须是可以连接集群中所有的成员的。

  2. ETCD_LISTEN_PEER_URLS:该节点与其他节点通信时所监听的地址列表,多个地址使用逗号隔开,其格式可以划分为 scheme://IP:PORT,这里的 scheme 可以是 http、https

  3. ETCD_LISTEN_CLIENT_URLS:该节点与客户端通信时监听的地址列表

  4. ETCD_ADVERTISE_CLIENT_URLS:广播给集群中其他成员自己的客户端地址列表

  5. ETCD_INITIAL_CLUSTER_TOKEN:初始化集群 token

  6. ETCD_INITIAL_CLUSTER:配置集群内部所有成员地址,其格式为:ETCD_NAME=ETCD_INITIAL_ADVERTISE_PEER_URLS,如果有多个使用逗号隔开

  7. ETCD_INITIAL_CLUSTER_STATE:初始化集群状态,new 表示新建

启动集群

docker-compose up -dCreating network "etcd_backend" with driver "bridge"Creating etcd_etcd1_1 ... doneCreating etcd_etcd2_1 ... doneCreating etcd_etcd3_1 ... done
复制代码

测试集群可用性

# 登录其中一个节点docker exec -it 5f97bf0b446f6e6514576fc1eb46c2f60d2c2b3e3f3ee3b1ad6219414fa915c8 /bin/sh# 写入一个键值etcdctl put name "liuyuede"OK# 查看etcdctl get namenameliuyuede
# 登录另外俩个节点docker exec -it a6ccc9b6e5cc81ee7c779e2b9e7235cd6d814e92fbc66b7e4846798acff8ee2a /bin/shetcdctl get namenameliuyuede
docker exec -it 6817fa89e3e9e422628e0049910b672df389c62d41bf2349a0f77e22c99e5270 /bin/shetcdctl get namenameliuyuede
复制代码


etcd 集群采用的是 raft 协议,一般至少为俩个集群,只有一个 master,如果删除到只剩一个节点当前节点也不能提供服务


查看集群情况


etcdctl --endpoints=http://0.0.0.0:12379,0.0.0.0:22379,0.0.0.0:32379 endpoint status --write-out=table
+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+| http://0.0.0.0:12379 | ade526d28b1f92f7 | 3.5.4 | 20 kB | true | false | 3 | 13 | 13 | || 0.0.0.0:22379 | d282ac2ce600c1ce | 3.5.4 | 20 kB | false | false | 3 | 13 | 13 | || 0.0.0.0:32379 | bd388e7810915853 | 3.5.4 | 20 kB | false | false | 3 | 13 | 13 | |+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
复制代码

2、增加服务注册功能

服务注册的流程

  1. 向 etcd 新增一个包含 rpc 服务信息的键值对,并设置租约(比如 5 秒过期)

  2. 利用保活函数 KeepAlive 不断续约


package server
import ( "context" "encoding/json" "errors" clientv3 "go.etcd.io/etcd/client/v3" "time")
type ServiceInfo struct { Name string Ip string}
type Service struct { ServiceInfo ServiceInfo stop chan error leaseId clientv3.LeaseID client *clientv3.Client}
func NewService(serviceInfo ServiceInfo, endpoints []string) (service *Service, err error) { client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: time.Second * 10, }) if err != nil { return nil, err }
service = &Service{ ServiceInfo: serviceInfo, client: client, } return}
func (s *Service) Start(ctx context.Context) (err error) { alive, err := s.KeepAlive(ctx) if err != nil { return }
for { select { case err = <-s.stop: // 服务端关闭返回错误 return err case <-s.client.Ctx().Done(): // etcd关闭 return errors.New("server closed") case _, ok := <-alive: if !ok { // 保活通道关闭 return s.revoke(ctx) } } }}
func (s *Service) KeepAlive(ctx context.Context) (<-chan *clientv3.LeaseKeepAliveResponse, error) { info := s.ServiceInfo key := s.getKey() val, _ := json.Marshal(info)
// 创建租约 leaseResp, err := s.client.Grant(ctx, 5) if err != nil { return nil, err }
// 写入etcd _, err = s.client.Put(ctx, key, string(val), clientv3.WithLease(leaseResp.ID)) if err != nil { return nil, err }
s.leaseId = leaseResp.ID return s.client.KeepAlive(ctx, leaseResp.ID)}
// 取消租约func (s *Service) revoke(ctx context.Context) error { _, err := s.client.Revoke(ctx, s.leaseId) return err}
func (s *Service) getKey() string { return s.ServiceInfo.Name + "/" + s.ServiceInfo.Ip}
复制代码

3、增加服务发现

服务发现流程


  1. 实现 grpc 中 resolver.Builder 接口的 Build 方法

  2. 通过 etcdclient 获取并监听 grpc 服务(是否有新增或者删除)

  3. 更新到 resolver.State,State 包含与 ClientConn 相关的当前 Resolver 状态,包括 grpc 的地址 resolver.Address


package server
import ( "context" "encoding/json" "fmt" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/resolver")
type Discovery struct { endpoints []string service string client *clientv3.Client clientConn resolver.ClientConn}
func NewDiscovery(endpoints []string, service string) resolver.Builder { return &Discovery{ endpoints: endpoints, service: service, }}
func (d *Discovery) ResolveNow(rn resolver.ResolveNowOptions) {
}
func (d *Discovery) Close() {
}
func (d *Discovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { var err error d.client, err = clientv3.New(clientv3.Config{ Endpoints: d.endpoints, }) if err != nil { return nil, err }
d.clientConn = cc
go d.watch(d.service)
return d, nil}
func (d *Discovery) Scheme() string { return "etcd"}
func (d *Discovery) watch(service string) { addrM := make(map[string]resolver.Address) state := resolver.State{}
update := func() { addrList := make([]resolver.Address, 0, len(addrM)) for _, address := range addrM { addrList = append(addrList, address) } state.Addresses = addrList err := d.clientConn.UpdateState(state) if err != nil { fmt.Println("更新地址出错:", err) } } resp, err := d.client.Get(context.Background(), service, clientv3.WithPrefix()) if err != nil { fmt.Println("获取地址出错:", err) } else { for i, kv := range resp.Kvs { info := &ServiceInfo{} err = json.Unmarshal(kv.Value, info) if err != nil { fmt.Println("解析value失败:", err) } addrM[string(resp.Kvs[i].Key)] = resolver.Address{ Addr: info.Ip, ServerName: info.Name, } } }
update()
dch := d.client.Watch(context.Background(), service, clientv3.WithPrefix(), clientv3.WithPrevKV()) for response := range dch { for _, event := range response.Events { switch event.Type { case mvccpb.PUT: info := &ServiceInfo{} err = json.Unmarshal(event.Kv.Value, info) if err != nil { fmt.Println("监听时解析value报错:", err) } else { addrM[string(event.Kv.Key)] = resolver.Address{Addr: info.Ip} } fmt.Println(string(event.Kv.Key)) case mvccpb.DELETE: delete(addrM, string(event.Kv.Key)) fmt.Println(string(event.Kv.Key)) } } update() }}
复制代码

4、grpc 课件服务

common 参数


package common
const CoursewareRpc = "rpc.courseware"
var Endpoints = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}
复制代码


生成课件服务 grpc


syntax = "proto3";
package rpc;option go_package = "./courseware";
message GetRequest { uint64 Id = 1;}message GetResponse { uint64 Id = 1; string Code = 2; string Name = 3; uint64 Type = 4;}

service Courseware { rpc Get(GetRequest) returns(GetResponse);}
复制代码


protoc --go_out=./ --go-grpc_out=./ courseware.proto
复制代码


课件服务入口


package main
import ( "context" "fmt" "go-demo/etcd/common" "go-demo/etcd/rpc/courseware" "go-demo/etcd/server" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "net" "os" "strings" "time")
var Port string
type service struct { courseware.UnsafeCoursewareServer}
func (s *service) Get(ctx context.Context, req *courseware.GetRequest) (res *courseware.GetResponse, err error) { fmt.Println("获取课件详情 port:", Port, " time:", time.Now()) return &courseware.GetResponse{ Id: 1, Code: "HD4544", Name: "多媒体课件", Type: 4, }, nil}func main() { args := os.Args[1:] if len(args) == 0 { panic("缺少port参数:port=8400") } for _, arg := range args { ports := strings.Split(arg, "=") if len(ports) < 2 || ports[0] != "port" { panic("port参数格式错误:port=8400") } Port = ports[1] } listen, err := net.Listen("tcp", ":"+Port) if err != nil { fmt.Println("failed to listen", err) return } s := grpc.NewServer() courseware.RegisterCoursewareServer(s, &service{})
reflection.Register(s)
// 注册到etcd newService, err := server.NewService(server.ServiceInfo{ Name: common.CoursewareRpc, Ip: "127.0.0.1:" + Port, }, common.Endpoints) if err != nil { fmt.Println("添加到etcd失败:", err) return }
go func() { err = newService.Start(context.Background()) if err != nil { fmt.Println("开启服务注册失败:", err) } }()
if err = s.Serve(listen); err != nil { fmt.Println("开启rpc服务失败:", err) }}
复制代码

5、api 服务

package main
import ( "context" "fmt" "go-demo/etcd/common" "go-demo/etcd/rpc/courseware" "go-demo/etcd/server" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" "time")
func main() { d := server.NewDiscovery(common.Endpoints, common.CoursewareRpc) resolver.Register(d)
for { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // 通过etcd注册中心和grpc服务建立连接 conn, err := grpc.DialContext(ctx, fmt.Sprintf(d.Scheme()+":///"+common.CoursewareRpc), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), grpc.WithBlock(), ) if err != nil { fmt.Println("和rpc建立连接失败:", err) return }
client := courseware.NewCoursewareClient(conn) get, err := client.Get(ctx, &courseware.GetRequest{Id: 1}) if err != nil { fmt.Println("获取课件失败:", err) return }
fmt.Println(get)
time.Sleep(3 * time.Second) cancel() }
}
复制代码


6、测试


开启 3 个服务,可以看到客户端通过负载均衡随机到一个服务请求


go run main.go port=8400获取课件详情 port: 8400  time: 2022-08-25 18:47:43.784942 +0800 CST m=+78.228450885获取课件详情 port: 8400  time: 2022-08-25 18:47:52.925858 +0800 CST m=+87.369721731获取课件详情 port: 8400  time: 2022-08-25 18:48:02.001177 +0800 CST m=+96.445393312获取课件详情 port: 8400  time: 2022-08-25 18:48:05.060066 +0800 CST m=+99.504401028获取课件详情 port: 8400  time: 2022-08-25 18:48:14.154148 +0800 CST m=+108.598836458go run main.go port=8500获取课件详情 port: 8500  time: 2022-08-25 18:47:46.832479 +0800 CST m=+62.822399701获取课件详情 port: 8500  time: 2022-08-25 18:47:49.844536 +0800 CST m=+65.834573960获取课件详情 port: 8500  time: 2022-08-25 18:47:55.955638 +0800 CST m=+71.945912584获取课件详情 port: 8500  time: 2022-08-25 18:48:17.168293 +0800 CST m=+93.159391485获取课件详情 port: 8500  time: 2022-08-25 18:48:20.182787 +0800 CST m=+96.174002796go run main.go port=8600获取课件详情 port: 8600  time: 2022-08-25 18:47:58.968283 +0800 CST m=+1.317052360获取课件详情 port: 8600  time: 2022-08-25 18:48:08.106493 +0800 CST m=+10.455617422获取课件详情 port: 8600  time: 2022-08-25 18:48:11.125212 +0800 CST m=+13.474453269
复制代码


用户头像

六月的

关注

还未添加个人签名 2019-07-23 加入

还未添加个人简介

评论

发布
暂无评论
实现etcd服务注册与发现_Docker-compose_六月的_InfoQ写作社区