写点什么

快速读懂 Etcd

作者:Quincy
  • 2023-09-27
    湖南
  • 本文字数:12655 字

    阅读完需:约 42 分钟

快速读懂Etcd

前言

Etcd 是个常用的中间件,网上文章也很丰富,本人也在阅读官网、源码、以及网上文章后,有些自己理解,本文尽量以通俗方式讲解 Etcd,让读者能从宏观角度明白 ETCD 的基本原理,便于深入其源码分析。(本文将在后续陆续完善细节)

ETCD 概念

Etcd 是一款分布式存储结构存储应用,主要用于 KV 数据结构的存储,能够容忍单点故障,应对网络分区。而其之所以可以称作为分布式调度的核心,在于充分运用了 Raft 算法,当把 ETCD 的 Key-Value(KV)的数据结构存储到 Raft 的日志中,使得每个节点都有完整的数据,即自然可完成集群当中的分布式同步的实现。其整体结构如下:

Etcd 的安装也非常简单,由于是 go 语言编写,可以下载源码,go build 成二进制文件安装,或者直接下载其二进制执文件,修改配置文件,运行即可。Etcd 集群中的专业术语如下:

Raft: etcd 所采用的保证分布式系统强一致的算法,分为三部分,Leader 选举,日志复制和安全性;

Node: 一个 Raft 状态实例;

Member: 一个 etcd 实例,管理一个 Node,可以为客户端请求提供服务;

Cluster: 多个 Member 构成的可以协同工作的 etcd 集群;

Peer: 同一个集群中,其他 Member 的称呼;

Client: 向 etcd 集群发送 HTTP 请求的客户端;

WAL: 预写日志,是 etcd 用于持久化存储的日志格式;

Snapshot: etcd 防止 WAL 文件过多而设置的快照,存储 etcd 数据状态;

Proxy: etcd 的一种模式,可以为 etcd 提供反向代理服务;

Leader: Raft 算法中通过竞选而产生的处理所有数据提交的节点;

Follower: Raft 算法中竞选失败的节点,作为从属节点,为算法提供强一致性保证;

Candidate: Follower 超过一定时间接收不到 Leader 节点的心跳的时候,会转变为 Candidate(候选者)开始 Leader 竞选;

Term: 某个节点称为 Leader 到下一次竞选开始的时间周期,称为 Term(任界,任期);

Index: 数据项编号, Raft 中通过 Term 和 Index 来定位数据;

ETCD 初始化

Etcd 启动主要工作有这些方面:配置文件加载,参数校验、初始化监听器、初始化集群信息(初始化内存、加载快照和 WAL、启动监听服务、初始化 raft 对象、加入 raft 集群,启动 raftNode 开始监听服务,初始化用于发送 raft 信息的 http transport 结构体对象)、启动 EtcdServer、开始选举。

Etcd 初始化代码入口为:github.com\coreos\etcd\embed\etcd.go,入口方法为 StartEtcd(inCfg Config)(e Etcd, err error),包括了整个 etcd 启动流程。

func StartEtcd(inCfg *Config) (e *Etcd, err error) {	//开始校验配置	if err = inCfg.Validate(); err != nil {		return nil, err	}	serving := false	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}	cfg := &e.cfg  defer func() {		if e == nil || err == nil {			return		}		if !serving {			// errored before starting gRPC server for serveCtx.serversC			for _, sctx := range e.sctxs {				close(sctx.serversC)			}		}		e.Close()		e = nil	}()  //用配置初始化PeerListener同伴管道监听  if e.Peers, err = configurePeerListeners(cfg); err != nil {		return e, err	}   //用配置初始化客户端监听器	if e.sctxs, err = configureClientListeners(cfg); err != nil {		return e, err	}	//将client url 汇聚成一个切片	for _, sctx := range e.sctxs {		e.Clients = append(e.Clients, sctx.l)	}  var (		urlsmap types.URLsMap		token   string	)	memberInitialized := true  //判断是否已经初始化成员	if !isMemberInitialized(cfg) {		memberInitialized = false		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")		if err != nil {			return e, fmt.Errorf("error setting up initial cluster: %v", err)		}	}
// AutoCompactionRetention defaults to "0" if not set. if len(cfg.AutoCompactionRetention) == 0 { cfg.AutoCompactionRetention = "0" } // 由于ETCD数据存储多版本数据,随着写入的主键增加历史版本需要定时清理,  // 默认的历史数据是不会清理的,数据达到2G就不能写入,必须要清理压缩历史数据才能继续写入 autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention) if err != nil { return e, err } // 解析Bolt存储类型 backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) // 定义配置 srvcfg := etcdserver.ServerConfig{ //.... } // 根据配置创建Etcd Server if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { return e, err }
// buffer channel so goroutines on closed connections won't wait forever e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// newly started member ("memberInitialized==false") // does not need corruption check if memberInitialized { if err = e.Server.CheckInitialHashKV(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) e.Server = nil return e, err } } // 启动Etcd服务端 e.Server.Start() // 启动peers监听 if err = e.servePeers(); err != nil { return e, err } // 启动客户端监听 if err = e.serveClients(); err != nil { return e, err } // 启动Prometheus 监控指标 if err = e.serveMetrics(); err != nil { return e, err } // 设置serving 为true代表启动完成 serving = true return e, nil}
复制代码

启动初始化

通过上面入口方法的源码,我们可以分析处,启动 etcd 时,流程大概做了下面几件事:

  • 加载配置

  • 参数校验,方法入口 inCfg.Validate(),其中采用 checkBindURLs(cfg.xxx), 该方法主要校验 peer-urls schem 、client-urls schem 、metrics、地址等相关信息, 且在 3.1 版本之后不允许使用域名作为 url 来进行绑定操作。

  • 初始化 PeerListeners,ClientListeners 等监听器,用于监听 peers 间及 client 端发送的 http 请求。

  • PeerListeners:作为 etcd member 之间进行通信使用的 listeners,为了性能考量,建议内部试用 schema:http,由 flag "listen-peer-urls"确定

  • ClientListeners:作为接受外部请求的 listerners,一般为了安全性考量,一般使用 schema:https,由 flag "listen-client-urls"确定

  • 处理 MVCC 数据存储历史版本

启动 ETCD

启动 etcd 的方法,e.Server.Start(),这个函数主要做了下面这些操作:

  • 获取初始化后的 PeerURLsMap 以及 cluster token,用于集群通信

  • 根据上述信息生成 new etcdServer 所需的的 ServerConfig 结构体

  • 根据封装后的结构体,调用 NewServer(cfg ServerConfig)初始化 etcdServer

newServer 方法主要做下面几件事:

  1. 检测是否有 WAL 日志

  2. 加载快照文件

  3. 根据 WAL 是否存在,flag "initial-cluster-state" 的状态,分别启动当前节点,分为三种情况:

  • 没有 WAL,不是新的集群,说明加入已有的集群

  • 没有 WAL,新的集群

  • 有 WAL,是否强制成为一个新的集群

这每种情况,都会启动一个 startNode 方法来启动 node,该启动节点方法主要是初始化 raft 节点,加入 raft 集群,做了下面几个步骤:

  • newRaft(): 初始化 Raft 对象,所有关系 Raft 协议执行周期内的事项都被包装到了 Raft 对象中

  • becomeFollower(): 初始化节点身份为 Follower

  • 追加配置变更的日志记录(和 Message 一样也属于一种 Entry)到 raft 的 raftLog 中

  • 将获取到的初始化的日志标记为提交状态

  • newNode(): 构造节点对象(Node),这个过程会初始化一个定时器 r.ticker.C

  • n.run(raft): 通过一个 goroutine 启动,主要是监听作用

  1. 封装 etcd Server 结构体基本信息,返回 etcdServer 对象

  2. 封装用于 peer 间发送及接收 raft 消息的 rafthttp transport

  3. 启动 etcdServer,同时为每个 client 的 url 、peer 的 url 启动一个 client sever 的 goroutine,以提供监听服务,这个动作在 raft http transport 后,发送 transport 开始通信。

  4. 启动后,整个节点就开始节点选举阶段。初始化时,每个选举都有个随机的选举定时器,先执行完成,就会进行通信,开始竞选 leader 流程。

问题

集群第一次初始化时,出于兼容考虑,默认采用的 API 版本是 2 系列,当集群中的所有节点都加入了集群,确认所有节点都支持 v3 接口时,才提升集群版本到 v3。

当初始化时,有一个节点启动失败,通过 v3 的接口访问,会报告 Error: Etcdserver: not capable 错误。这个只有第一次初始化集群的时候会遇到,如果集群已经初始化完毕,再挂掉节点,或者集群关闭重启(关闭重启的时候会从持久化数据中加载集群 API 版本),都不会有影响。

ETCD 工作流程

一般 etcd 由多个节点组成,集群中至少有 2N+1 个节点,因为构造一个集群需要奇数个节点,这样才可能构造成大多数,5 个节点大多数是 3 个,3 个节点大多数是 2 个,依次推算。现在好比有 3 个节点,其中大多数占 2 个,如下:


整个模型可以分为三个阶段。第一个阶段,调用者将写入请求发送给 leader,leader 会把日志向 follower 做实时的复制,也就是说调用者向 leader 请求写入一个 key-value 存储数据,leader 不会立即反馈响应给调度者,而是将请求进行复制传播。

第二个阶段,当日志复制给 N+1 个节点后,即可进行本地提交,并向客户端做返回应答,通知调度者写入成功。

实时日志复制会有一个性能影响,需要等待 follower 复制结束,所以写入性能比较差,大约每秒写入 1000 次。

第三阶段,当 leader 完成提交后,会周期性地把自己的提交信息周期性告诉给所有的 follower,这个通知异步完成,从而并不会影响性能,这样所有的 follower 也会完成各自的本地提交。

分布式调度系统的核心在于 etcd 的功能,需要结合 Raft 算法来进行状态的存储,当一个客户端向 leader 进行写入请求时,leader 先将提交的状态进行本地日志的填写,然后将数据存储到本地的 KV 引擎中,因为 etcd 运用的是 raft 的日志来做分布式 Key-Value 存储状态,当状态机将信息提交后,是需要把当前真实的状态数据写入到 Key-Value 的实际引擎中去生效。

ETCD 分布式协议 Raft

Raft 协议

Raft 是一个分布式协议,用于强一致性的集群日志同步算法,其本身可以看作是进行不断地写日志操作,可以分为主从两部分,日志在主节点,即 leader 上生成,然后同步到从节点,即 follower 中。Leader 的任职周期由 term 决定,term 即为任期,每一任的 leader 的任职周期 term 是递增的。日志格式如下:


图中我们假定总共五个节点,其中一个节点为 leader,其余四个为 follower,即一个主四个从,日志是通过请求顺序往后追加的,方块中的数字代表着当前 leader 的任职周期,即该数据是第几任 leader 所提交的数据。五个节点的大多数集群是 3 个节点,从中我们可以看到 1 到 7 的日志已经被复制到了 1、3、4 三个节点上,所以 1 到 7 的日志是一定可以被提交的,且不会被丢失。而其余节点即使没复制完全,仍不会影响本地的提交。

关于 Raft 异常安全,选举 leader 需要半数以上的节点参与,节点 commit 日志最多的允许选举为 leader,commit 日志同样多,则 term、index 越大的允许为 leader。

竞选流程

Etcd 竞选流程是 raft 协议实现的最佳实践,如果集群中的某一个 Follower 节点长时间内没有收到来自 Leader 的心跳请求,当前节点就会通过 MsgHup 消息进入预选举或者选举的流程。

func (r *raft) Step(m pb.Message) error {  // ...	switch m.Type {	case pb.MsgHup:		if r.state != StateLeader {			if r.preVote {				r.campaign(campaignPreElection)			} else {				r.campaign(campaignElection)			}		} else {			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)		}	}  // ...  return nil}
复制代码

如果收到 MsgHup 消息的节点不是 Leader 状态,就会根据当前集群的配置选择进入 PreElection 或者 Election 阶段,PreElection 阶段并不会真正增加当前节点的 Term,它的主要作用是得到当前集群能否成功选举出一个 Leader 的答案,如果当前集群中只有两个节点而且没有预选举阶段,那么这两个节点的 Term 会无休止的增加,预选举阶段就是为了解决这一问题而出现的。我们看其 campaign 方法,如下:

func (r *raft) campaign(t CampaignType) {	r.becomeCandidate()		if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {		r.becomeLeader()		return	}	for id := range r.prs {		if id == r.id {			continue		}
r.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) }}
复制代码

当前节点会立刻调用 becomeCandidate 将当前节点的 Raft 状态变成候选人;在这之后,它会将票投给自己,如果当前集群只有一个节点,该节点就会直接成为集群中的 Leader 节点。

如果集群中存在了多个节点,就会向集群中的其他节点发出 MsgVote 消息,请求其他节点投票,在 Step 函数中包含不同状态的节点接收到消息时的响应:

func (r *raft) Step(m pb.Message) error {  // ...
switch m.Type { case pb.MsgVote, pb.MsgPreVote: canVote := r.Vote == m.From || (r.Vote == None && r.lead == None) if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp}) r.electionElapsed = 0 r.Vote = m.From } else { r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true}) }
} // ... return nil}
复制代码


如果当前节点投的票就是消息的来源或者当前节点没有投票也没有 Leader,那么就会向来源的节点投票,否则就会通知该节点当前节点拒绝投票。

在 stepCandidate 方法中,候选人节点会处理来自其他节点的投票响应消息,也就是 MsgVoteResp:

func stepCandidate(r *raft, m pb.Message) error {	switch m.Type {	// ...	case pb.MsgVoteResp:		gr := r.poll(m.From, m.Type, !m.Reject)		switch r.quorum() {		case gr:			r.becomeLeader()			r.bcastAppend()		// ...		}	}	return nil}
复制代码

每当收到一个 MsgVoteResp 类型的消息时,就会设置当前节点持有的 votes 数组,更新其中存储的节点投票状态并返回投『同意』票的人数,如果获得的票数大于法定人数 quorum,当前节点就会成为集群的 Leader 并向其他的节点发送当前节点当选的消息,通知其余节点更新 Raft 结构体中的 Term 等信息。

ETCD 存储

Etcd 的存储分为内存存储和持久化(硬盘)存储两部分,底层存储采用 BoltDB 数据库,其中 KV 有序排列会将相同前缀的 key 排列在一起,可以顺序遍历,即获取子目录内容,正因为 key 有序,所以其天然支持按目录结构高效遍历。

假若有如下几个 key,其结构分别为:/a/1,/a/2,/a/3,这样只需要 seek 到不大于/a/的第一个 key,开始向后 SCAN 即可,通俗点就是利用有序的 key 模拟一个目录的效果。

另外 etcd 支持复杂事务,在 etcd 中每一个请求都有一个 revision 版本号去标识,所以它是单调递增的,即每一个请求都是唯一的。同样的一个 key 维护多个历史版本,即多版本控制,用于实现 watch 机制。历史版本过多时,可以通过 compact 命令删除。通过 etcd 修改某一个 key 的 value,其他监听的应用既可感知其修改,可以用来做一些消息的分发。

  内存中的存储除了顺序化的记录下所有用户对节点数据变更的记录外,还会对用户数据进行索引、建堆等方便查询的操作。

  持久化则使用预写式日志(WAL:Write Ahead Log)进行记录存储。在 WAL 的体系中,所有的数据在提交之前都会进行日志记录。

  在 etcd 的持久化存储目录中,有两个子目录。一个是 WAL,存储着所有事务的变化记录;另一个则是 snapshot,用于存储某一个时刻 etcd 所有目录的数据。通过 WAL 和 snapshot 相结合的方式,etcd 可以有效的进行数据存储和节点故障恢复等操作。

既然有了 WAL 实时存储了所有的变更,为什么还需要 snapshot 呢?

  随着使用量的增加,WAL 存储的数据会暴增,为了防止磁盘很快就爆满,etcd 默认每 10000 条记录做一次 snapshot,经过 snapshot 以后的 WAL 文件就可以删除。而通过 API 可以查询的历史 etcd 操作默认为 1000 条。首次启动时,etcd 会把启动的配置信息存储到 data-dir 参数指定的数据目录中。配置信息包括本地节点的 ID、集群 ID 和初始时集群信息。

用户需要避免 etcd 从一个过期的数据目录中重新启动,因为使用过期的数据目录启动的节点会与集群中的其他节点产生不一致(如:之前已经记录并同意 Leader 节点存储某个信息,重启后又向 Leader 节点申请这个信息)。所以,为了最大化集群的安全性,一旦有任何数据损坏或丢失的可能性,你就应该把这个节点从集群中移除,然后加入一个不带数据目录的新节点。

预写式日志(WAL)

WAL(Write Ahead Log)最大的作用是记录了整个数据变化的全部历程。在 etcd 中,所有数据的修改在提交前,都要先写入到 WAL 中。

  使用 WAL 进行数据的存储使得 etcd 拥有两个重要功能:

  • 故障快速恢复: 当你的数据遭到破坏时,就可以通过执行所有 WAL 中记录的修改操作,快速从最原始的数据恢复到数据损坏前的状态。

  • 数据回滚(undo)/重做(redo):因为所有的修改操作都被记录在 WAL 中,需要回滚或重做,只需要方向或正向执行日志中的操作即可。

WAL 与 snapshot 在 etcd 中的命名规则

在 etcd 的数据目录中,WAL 文件以 $seq-$index.wal 的格式存储。最初始的 WAL 文件是 0000000000000000-0000000000000000.wal,表示是所有 WAL 文件中的第 0 个,初始的 Raft 状态编号为 0。运行一段时间后可能需要进行日志切分,把新的条目放到一个新的 WAL 文件中。

  假设,当集群运行到 Raft 状态为 20 时,需要进行 WAL 文件的切分时,下一份 WAL 文件就会变为 0000000000000001-0000000000000021.wal。如果在 10 次操作后又进行了一次日志切分,那么后一次的 WAL 文件名会变为 0000000000000002-0000000000000031.wal。可以看到-符号前面的数字是每次切分后自增 1,而-符号后面的数字则是根据实际存储的 Raft 起始状态来定。

Snapshot 的存储命名则比较容易理解,以 $term-$index.wal 格式进行命名存储。term 和 index 就表示存储 snapshot 时数据所在的 raft 节点状态,当前的任期编号以及数据项位置信息。

BoltDB 数据存储

etcd 目前支持 V2 和 V3 两个大版本,这两个版本在实现上有比较大的不同,一方面是对外提供接口的方式,另一方面就是底层的存储引擎,V2 版本的实例是一个纯内存的实现,所有的数据都没有存储在磁盘上,而 V3 版本的实例就支持了数据的持久化。

etcd 底层默认使用的是开源的嵌入式键值存储数据库 bolt,但是这个项目目前的状态已经是归档不再维护了,如果想要使用这个项目可以使用 CoreOS 的 bbolt 版本。

后端

Boltdb 里面定义了内部的 backend 结构体,这是一个实现了 Backend 接口的结构:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L80-104type backend struct {	size int64	sizeInUse int64	commits int64
mu sync.RWMutex db *bolt.DB
batchInterval time.Duration batchLimit int batchTx *batchTxBuffered
readTx *readTx
stopc chan struct{} donec chan struct{}
lg *zap.Logger}
复制代码


从结构体的成员 db 我们就可以看出,它使用了 BoltDB 作为底层存储,另外的两个 readTx 和 batchTx 分别实现了 ReadTx 和 BatchTx 接口:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L30-36type ReadTx interface {	Lock()	Unlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L28-38type BatchTx interface { ReadTx UnsafeCreateBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) UnsafeDelete(bucketName []byte, key []byte) Commit() CommitAndStop()}
复制代码

从这两个接口的定义,我们不难发现它们能够对外提供数据库的读写操作,而 Backend 就能对这两者提供的方法进行封装,为上层屏蔽存储的具体实现:

每当我们使用 newBackend 创建一个新的 backend 结构时,都会创建一个 readTx 和 batchTx 结构体,这两者一个负责处理只读请求,一个负责处理读写请求:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L137-176func newBackend(bcfg BackendConfig) *backend {	bopts := &bolt.Options{}	bopts.InitialMmapSize = bcfg.mmapSize()	db, _ := bolt.Open(bcfg.Path, 0600, bopts)
b := &backend{ db: db, batchInterval: bcfg.BatchInterval, batchLimit: bcfg.BatchLimit, readTx: &readTx{ buf: txReadBuffer{ txBuffer: txBuffer{make(map[string]*bucketBuffer)}, }, buckets: make(map[string]*bolt.Bucket), }, stopc: make(chan struct{}), donec: make(chan struct{}), } b.batchTx = newBatchTxBuffered(b) go b.run() return b}
复制代码

当我们在 newBackend 中进行了初始化 BoltDB、事务等工作后,就会开一个 goroutine 异步的对所有批量读写事务进行定时提交:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L289-305func (b *backend) run() {	defer close(b.donec)	t := time.NewTimer(b.batchInterval)	defer t.Stop()	for {		select {		case <-t.C:		case <-b.stopc:			b.batchTx.CommitAndStop()			return		}		if b.batchTx.safePending() != 0 {			b.batchTx.Commit()		}		t.Reset(b.batchInterval)	}}
复制代码

对于上层来说,backend 其实只是对底层存储的一个抽象,很多时候并不会直接跟它打交道,往往都是使用它持有的 ReadTx 和 BatchTx 与数据库进行交互。

索引

BoltDb 存储的是一个 BTree 索引结构,本身对于每一个键值对都有一个 revision 的概念,键值对的每一次变化都会被 BoltDB 单独记录下来,所以想要在存储引擎中获取某一个 Key 对应的值,要先获取 revision,再通过它才能找到对应的值,在里我们想要介绍的其实是 etcd 如何管理和存储一个 Key 的多个 revision 记录。

在 etcd 服务中有一个用于存储所有的键值对 revision 信息的 btree,我们可以通过 index 的 Get 接口获取一个 Key 对应 Revision 的值:

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {	keyi := &keyIndex{key: key}	if keyi = ti.keyIndex(keyi); keyi == nil {		return revision{}, revision{}, 0, ErrRevisionNotFound	}	return keyi.get(ti.lg, atRev)}
复制代码

上述方法通过 keyIndex 方法查找 Key 对应的 keyIndex 结构体,这里使用的内存结构体 btree 是 Google 实现的一个版本:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L84-89func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {	if item := ti.tree.Get(keyi); item != nil {		return item.(*keyIndex)	}	return nil}
复制代码

可以看到这里的实现非常简单,只是从 treeIndex 持有的成员 btree 中查找 keyIndex,将结果强制转换成 keyIndex 类型后返回;获取 Key 对应 revision 的方式也非常简单:

func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {	g := ki.findGeneration(atRev)	if g.isEmpty() {		return revision{}, revision{}, 0, ErrRevisionNotFound	}
n := g.walk(func(rev revision) bool { return rev.main > atRev }) if n != -1 { return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil }
return revision{}, revision{}, 0, ErrRevisionNotFound}
复制代码

KeyIndex

在我们具体介绍方法实现的细节之前,首先我们需要理解 keyIndex 包含的字段以及管理同一个 Key 不同版本的方式:

每一个 keyIndex 结构体中都包含当前键的值以及最后一次修改对应的 revision 信息,其中还保存了一个 Key 的多个 generation,每一个 generation 都会记录当前 Key『从生到死』的全部过程,每当一个 Key 被删除时都会调用 timestone 方法向当前的 generation 中追加一个新的墓碑版本:

func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {	if ki.generations[len(ki.generations)-1].isEmpty() {		return ErrRevisionNotFound	}	ki.put(lg, main, sub)	ki.generations = append(ki.generations, generation{})	return nil}
复制代码

这个 tombstone 版本标识这当前的 Key 已经被删除了,但是在每次删除一个 Key 之后,就会在当前的 keyIndex 中创建一个新的 generation 结构用于存储新的版本信息,其中 ver 记录当前 generation 包含的修改次数,created 记录创建 generation 时的 revision 版本,最后的 revs 用于存储所有的版本信息。

存储

etcd 的 mvcc 模块对外直接提供了两种不同的访问方式,一种是键值存储 kvstore,另一种是 watchableStore 它们都实现了包内公开的 KV 接口

type KV interface {	ReadView	WriteView
Read() TxnRead Write() TxnWrite
Hash() (hash uint32, revision int64, err error) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
Compact(rev int64) (<-chan struct{}, error) Commit() Restore(b backend.Backend) error Close() error}
复制代码

对于 kvstore 来说,其实没有太多值得展开介绍的地方,它利用底层的 BoltDB 等基础设施为上层提供最常见的增伤改查,它组合了下层的 readTx、batchTx 等结构体,将一些线程不安全的操作变成线程安全的。

func (s *store) Read() TxnRead {	s.mu.RLock()	tx := s.b.ReadTx()	s.revMu.RLock()	tx.Lock()	firstRev, rev := s.compactMainRev, s.currentRev	s.revMu.RUnlock()	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})}
复制代码

它也负责对内存中 btree 索引的维护以及压缩一些无用或者不常用的数据,几个对外的接口 Read、Write 就是对 readTx、batchTx 等结构体的组合并将它们的接口暴露给其他的模块

另外一个比较有意思的存储就是 watchableStore 了,它是 mvcc 模块为外界提供 Watch 功能的接口,它负责了注册、管理以及触发 Watcher 的功能

type watchableStore struct {	*store
mu sync.RWMutex
unsynced watcherGroup synced watcherGroup
stopc chan struct{} wg sync.WaitGroup}
复制代码

每一个 watchableStore 其实都组合了来自 store 结构体的字段和方法,除此之外,还有两个 watcherGroup 类型的字段,其中 unsynced 用于存储未同步完成的实例,synced 用于存储已经同步完成的实例。

在初始化一个新的 watchableStore 时,我们会创建一个用于同步 watcherGroup 的 Goroutine,在 syncWatchersLoop 这个循环中会每隔 100ms 调用一次 syncWatchers 方法,将所有未通知的事件通知给所有的监听者,这可以说是整个模块的核心:

func (s *watchableStore) syncWatchers() int {	curRev := s.store.currentRev	compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: curRev + 1}, maxBytes)
tx := s.store.b.ReadTx() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) evs := kvsToEvents(nil, wg, revs, vs)
wb := newWatcherBatch(wg, evs) for w := range wg.watchers { w.minRev = curRev + 1
eb, ok := wb[w] if !ok { s.synced.add(w) s.unsynced.delete(w) continue }
w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev})
s.synced.add(w) s.unsynced.delete(w) }
return s.unsynced.size()}
复制代码

简化后的 syncWatchers 方法中总共做了三件事情,首先是根据当前的版本从未同步的 watcherGroup 中选出一些待处理的任务,然后从 BoltDB 中后去当前版本范围内的数据变更并将它们转换成事件,事件和 watcherGroup 在打包之后会通过 send 方法发送到每一个 watcher 对应的 Channel 中。

ETCD 伸缩节点

Etcd 集群启动完毕后,可以在运行的过程中对集群进行重构,包括核心节点的增加、删除、迁移、替换等。运行时重构使得 etcd 集群无须重启即可改变集群的配置,这也是新版 etcd 区别于旧版包含的新特性。

只有当集群中多数节点正常的情况下,你才可以进行运行时的配置管理。因为配置更改的信息也会被 etcd 当成一个信息存储和同步,如果集群多数节点损坏,集群就失去了写入数据的能力。所以在配置 etcd 集群数量时,强烈推荐至少配置 3 个核心节点。

节点迁移、替换

当你节点所在的机器出现硬件故障,或者节点出现如数据目录损坏等问题,导致节点永久性的不可恢复时,就需要对节点进行迁移或者替换。

当一个节点失效以后,必须尽快修复,因为 etcd 集群正常运行的必要条件是集群中多数节点都正常工作。迁移一个节点需要进行四步操作:

  1. 暂停正在运行着的节点程序进程

  2. 把数据目录从现有机器拷贝到新机器

  3. 使用 api 更新 etcd 中对应节点指向机器的 url 记录更新为新机器的 ip

  4. 使用同样的配置项和数据目录,在新的机器上启动 etcd。

节点增加

增加节点可以让 etcd 的高可用性更强。举例来说,如果你有 3 个节点,那么最多允许 1 个节点失效;当你有 5 个节点时,就可以允许有 2 个节点失效。同时,增加节点还可以让 etcd 集群具有更好的读性能。因为 etcd 的节点都是实时同步的,每个节点上都存储了所有的信息,所以增加节点可以从整体上提升读的吞吐量。增加一个节点需要进行两步操作:

  1. 在集群中添加这个节点的 url 记录,同时获得集群的信息。

  2. 使用获得的集群信息启动新 etcd 节点。

节点移除

有时你不得不在提高 etcd 的写性能和增加集群高可用性上进行权衡。Leader 节点在提交一个写记录时,会把这个消息同步到每个节点上,当得到多数节点的同意反馈后,才会真正写入数据。所以节点越多,写入性能越差。

  在节点过多时,你可能需要移除一个或多个。移除节点非常简单,只需要一步操作,就是把集群中这个节点的记录删除。然后对应机器上的该节点就会自动停止。

强制性重启集群

当集群超过半数的节点都失效时,就需要通过手动的方式,强制性让某个节点以自己为 Leader,利用原有数据启动一个新集群。

此时你需要进行两步操作。

  1. 备份原有数据到新机器;

  2. 使用-force-new-cluster 加备份的数据重新启动节点。


参考文献

高可用分布式存储 etcd 的实现原理 https://draveness.me/etcd-introduction/


发布于: 刚刚阅读数: 5
用户头像

Quincy

关注

路过岁月,慢慢生活 2018-08-01 加入

一个心怀远方的搬砖懒汉

评论

发布
暂无评论
快速读懂Etcd_golang_Quincy_InfoQ写作社区