etcd 概述
etcd 的特点
etcd 是一个 Go 言编写的分布式、高可用的一致性键值存储系统,用于提供可靠的分布式键值存储、配置共享和服务发现等功能
etcd 具有以下特点:
简单:
易使用:
易部署:使用 Go 语言编写,跨平台,部署和维护简单。获取地址:http://www.jnpfsoft.com/?from=infoq
可靠:
强一致:使用 Raft 算法充分保证了分布式系统数据的强一致性;
高可用:具有容错能力,假设集群有 n 个节点,当有(n-1)/2 节点发送故障,依然能提供服务;
持久化:数据更新后,会通过 WAL 格式数据持久化到磁盘,支持 Snapshot 快照。
快速:每个实例每秒支持一千次写操作,极限写性能可达 10K QPS。
安全:可选 SSL 客户认证机制。
v3 版本使用 HTTP/2 协议,同一个连接可以同时处理多个请求,摒弃多个请求需要建立多个连接的方式。
etcd 的应用场景
服务发现(service discovery)
消息发布与订阅
负载均衡
分布式通知与协调
分布式锁、分布式队列
集群监控与 Leader 竞选
etcd 架构
etcd 主要分为四个部分:
HTTP Server:用于处理用户发送的 API 请求以及其它 etcd 节点的同步与心跳信息请求。
Store:用于处理 etcd 支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件处理与执行等等,是 etcd 对用户提供的大多数 API 功能的具体实现。
Raft:Raft 强一致性算法的具体实现,是 etcd 的核心。
WAL:Write Ahead Log(预写式日志),是 etcd 的数据存储方式。除了在内存中存有所有数据的状态以及节点的索引以外,etcd 就通过 WAL 进行持久化存储。WAL 中,所有的数据提交前都会事先记录日志。Snapshot 是为了防止数据过多而进行的状态快照;Entry 表示存储的具体日志内容。
Raft 协议
Raft 的基本概念
每个 etcd 节点都维护了一个状态机,并且任何时刻最多存在一个有效的主节点,主节点处理所有来自客户端的写操作,通过 Raft 协议保证写操作对状态机的改动会可靠的同步到其他节点。
Raft 协议一共包含如下 3 类角色:
在进行选举过程中,有几个重要的概念:
Leader Election(领导人选举):简称选举,就是从候选人中选出领袖;
Term(任期):它其实是个单独递增的连续数字,每一次任期就会重新发起一次领导人选举;
Election Timeout(选举超时):就是一个超时时间,当群众超时未收到领袖的心跳时,会重新进行选举。
选主
角色转换
群众 -> 候选人:当开始选举,或者“选举超时”时
候选人 -> 候选人:当“选举超时”,或者开始新的“任期”
候选人 -> 领袖:获取大多数投票时
候选人 -> 群众:其它节点成为领袖,或者开始新的“任期”
领袖 -> 群众:发现自己的任期 ID 比其它节点分任期 ID 小时,会自动放弃领袖位置
选举的特点
Raft 协议是用于维护一组服务节点数据一致性的协议。一组服务节点构成一个集群,并且有一个节点来对外提供服务。当集群初始化,或者主节点挂掉以后,面临选主的问题。集群中每个节点,任意时刻都处于 Leader、Follower、Candidate 这三个角色之一。
当集群初始化时,每个节点都是 Follower 角色
集群中存在至多 1 个有效的主节点,通过心跳与其他节点同步数据
当 Follower 在一定时间内没有收到来自主节点的心跳,会将自己的角色改变为 Candidate,并发起一次选主投票
当收到包括自己在内超过半数节点赞成后,选举成功
当收到票数不足半数选举失败,或者选举超时
若本轮未选出主节点,将进行下一轮选举(出现这种情况,是由于多个节点同时选举,所有节点均为获得超过半数选票)
Candidate 节点收到来自主节点的信息后,会立即终止选举过程,进入 Follower 角色。
为了避免陷入选主失败循环,每个节点未收到心跳发起选举的时间是一定范围内的随机值,这样能够避免 2 个节点同时发起选主。
选举流程示例
1)选举人选举
现在有 A、B、C 三个节点
成为候选人:
选举领导人:
心跳探测:
2)领袖挂掉
当领袖 B 挂掉,群众 A 和 C 的“选举定时器”会一直运行,当群众 A 先超时时,会成为候选人。
后续流程和“领导人选举”流程一样,即通知投票 -> 接收投票 -> 成为领袖 -> 心跳探测。
3)出现多个候选者情况
当出现多个候选者 A 和 D 时,两个候选者会同时发起投票:
当 C 成为新的候选者,此时的任期 Term 为 5,发起新一轮的投票,其它节点发起投票后,会更新自己的任期值,最后选择新的领袖为 C 节点。
脑裂情况
当网络问题导致脑裂,出现双 Leader 情况时,每个网络可以理解为一个独立的网络,因为原先的 Leader 独自在一个区,所以向他提交的数据不可能被复制到大多数节点上,所以数据永远都不会提交
当网络恢复之后,旧的 Leader 发现集群中的新 Leader 的 Term 比自己大,则自动降级为 Follower,并从新 Leader 处同步数据达成集群数据一致
脑裂情况其实只是异常情况的一种,当 Leader 通知 Follower 更新日志、Leader 提交更新时,都存在各种异常情况导致的问题
日志复制
日志复制,是指主节点将每次操作形成日志条目,并持久化到本地磁盘,然后通过网络 IO 发送给其他节点。其他节点根据日志的 逻辑时钟(TERM) 和 日志编号(INDEX) 来判断是否将该日志记录持久化到本地。当主节点收到包括自己在内超过半数节点成功返回,那么认为该日志是可提交的(committed),并将日志输入到状态机,将结果返回给客户端。
每次选主都会形成一个唯一的 TERM 编号,相当于逻辑时钟。每一条日志都有全局唯一的编号。
主节点通过网络 IO 向其他节点追加日志:
若某节点收到日志追加的消息,首先判断该日志的 TERM 是否过期,以及该日志条目的 INDEX 是否比当前以及提交的日志的 INDEX 更早
若已过期,或者比提交的日志更早,那么就拒绝追加,并返回该节点当前的已提交的日志的编号。否则,将日志追加,并返回成功
当主节点收到其他节点关于日志追加的回复后,若发现有拒绝,则根据该节点返回的已提交日志编号,发生其编号下一条日志。
主节点像其他节点同步日志,还作了拥塞控制:主节点发现日志复制的目标节点拒绝了某次日志追加消息,将进入日志探测阶段,一条一条发送日志,直到目标节点接受日志,然后进入快速复制阶段,可进行批量日志追加。
按照日志复制的逻辑,集群中慢节点不影响整个集群的性能。且数据只从主节点复制到 Follower 节点,这样大大简化了逻辑流程。
安全性
截止此刻,选主以及日志复制并不能保证节点间数据一致。当一个某个节点挂掉了,一段时间后再次重启,并当选为主节点。而在其挂掉这段时间内,集群若有超过半数节点存活,集群会正常工作,那么会有日志提交。这些提交的日志无法传递给挂掉的节点。当挂掉的节点再次当选主节点,它将缺失部分已提交的日志。在这样场景下,按 Raft 协议,它将自己日志复制给其他节点,会将集群已经提交的日志给覆盖掉。这显然是不可接受的。
其他协议解决这个问题的办法是,新当选的主节点会询问其他节点,和自己数据对比,确定出集群已提交数据,然后将缺失的数据同步过来。这个方案有明显缺陷,增加了集群恢复服务的时间(集群在选举阶段不可服务),并且增加了协议的复杂度。
Raft 解决的办法是,在选主逻辑中,对能够成为主的节点加以限制,确保选出的节点已定包含了集群已经提交的所有日志。如果新选出的主节点已经包含了集群所有提交的日志,那就不需要从和其他节点比对数据了。简化了流程,缩短了集群恢复服务的时间。
这里存在一个问题,加以这样限制之后,还能否选出主呢?答案是:只要仍然有超过半数节点存活,这样的主一定能够选出。因为已经提交的日志必然被集群中超过半数节点持久化,显然前一个主节点提交的最后一条日志也被集群中大部分节点持久化。当主节点挂掉后,集群中仍有大部分节点存活,那这存活的节点中一定存在一个节点包含了已经提交的日志了。
etcd 集群部署
etcd 安装
环境信息
etcd 二进制安装
etcd 下载:https://github.com/etcd-io/etcd/releases/download/v3.5.4/etcd-v3.5.4-linux-amd64.tar.gz
wget -c https://github.com/etcd-io/etcd/releases/download/v3.5.4/etcd-v3.5.4-linux-amd64.tar.gz
tar zxvf etcd-v3.5.4-linux-amd64.tar.gz -C /usr/local/
ln -s /usr/local/etcd-v3.5.4-linux-amd64/ /usr/local/etcd
sed -i '$aPATH="/usr/local/etcd/:$PATH"' /etc/profile
source /etc/profile
mkdir -pv /data/etcd/{log,data} # data为节点数据存储目录,log为日志目录
复制代码
etcd 集群初始化
在每台 etcd 节点上进行集群初始化
# 在每台etcd节点上进行集群初始化
# 注意:在etcd2和etcd3上操作时注意修改--name并且将对应的IP地址改成对应节点的IP
#!/bin/sh
etcd --name etcd0 \
--data-dir /data/etcd/data \
--advertise-client-urls http://10.0.0.80:2379,http://10.0.0.80:4001 \
--listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
--initial-advertise-peer-urls http://10.0.0.80:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd0=http://10.0.0.80:2380,etcd1=http://10.0.0.81:2380,etcd2=http://10.0.0.82:2380 \
--initial-cluster-state new > /data/etcd/log/etcd.log 2>&1
# 以后台进程方式运行在后台
# 将以上命令保存到 ./etcd0.sh
chmod +x etcd0.sh
nohup ./etcd0.sh &
# 注意:初始化只是在集群初始化时运行一次,后续服务重启的话,需要去掉所有的initial参数,否则会报错
# 集群初始化完毕后的启动命令:
etcd --name etcd0 \
--data-dir /data/etcd/data \
--advertise-client-urls http://10.0.0.80:2379,http://10.0.0.80:4001 \
--listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
--listen-peer-urls http://0.0.0.0:2380
复制代码
etcd 参数说明
检查 etcd 集群状态
# 检查集群状态
[root@etcd0 ~]# etcdctl member list --write-out=table --endpoints=http://10.0.0.80:2379
+------------------+---------+-------+-----------------------+---------------------------------------------+------------+
| ID | STATUS | NAME | PEER ADDRS | CLIENT ADDRS | IS LEARNER |
+------------------+---------+-------+-----------------------+---------------------------------------------+------------+
| 31eb17a19c60d188 | started | etcd0 | http://10.0.0.80:2380 | http://10.0.0.80:2379,http://10.0.0.80:4001 | false |
| d23e97e2884bcaba | started | etcd1 | http://10.0.0.81:2380 | http://10.0.0.81:2379,http://10.0.0.81:4001 | false |
| f0a7d39c34766dd7 | started | etcd2 | http://10.0.0.82:2380 | http://10.0.0.82:2379,http://10.0.0.82:4001 | false |
+------------------+---------+-------+-----------------------+---------------------------------------------+------------+
# 检查节点健康状态
[root@etcd0 ~]# etcdctl endpoint health --cluster --endpoints="http://10.0.0.80:2379"
http://10.0.0.80:4001 is healthy: successfully committed proposal: took = 5.78219ms
http://10.0.0.82:2379 is healthy: successfully committed proposal: took = 7.975032ms
http://10.0.0.81:2379 is healthy: successfully committed proposal: took = 6.516737ms
http://10.0.0.80:2379 is healthy: successfully committed proposal: took = 7.419094ms
http://10.0.0.82:4001 is healthy: successfully committed proposal: took = 5.000158ms
http://10.0.0.81:4001 is healthy: successfully committed proposal: took = 8.251767ms
# 检查集群详细状态
[root@etcd0 ~]# etcdctl endpoint status --endpoints="http://10.0.0.80:2379,http://10.0.0.81:2379,http://10.0.0.82:2379" --write-out=table
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| http://10.0.0.80:2379 | 31eb17a19c60d188 | 3.5.4 | 20 kB | true | false | 2 | 23 | 23 | |
| http://10.0.0.81:2379 | d23e97e2884bcaba | 3.5.4 | 20 kB | false | false | 2 | 23 | 23 | |
| http://10.0.0.82:2379 | f0a7d39c34766dd7 | 3.5.4 | 20 kB | false | false | 2 | 23 | 23 | |
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
复制代码
etcd 节点迁移
当遇到硬件故障发生的时候,需要快速恢复节点。ETCD 集群可以做到在不丢失数据的,并且不改变节点 ID 的情况下,迁移节点:
停止待迁移节点上的 etc 进程;
将数据目录打包复制到新的节点;
更新该节点对应集群中 peer url,让其指向新的节点;
使用相同的配置,在新的节点上启动 etcd 进程;
etcd 操作
写入数据
# 使用put写入键值对
[root@etcd0 ~]# etcdctl put mytest "hello" --endpoints=http://10.0.0.80:2379
OK
[root@etcd0 ~]# etcdctl put mytest1 "hello 001" --endpoints=http://10.0.0.80:2379
OK
[root@etcd0 ~]# etcdctl put mytest2 "hello 002" --endpoints=http://10.0.0.80:2379
OK
复制代码
获取数据
# 使用get来通过具体的键获取数据
[root@etcd0 ~]# etcdctl get mytest --endpoints=http://10.0.0.80:2379
mytest
hello
# 通过前缀来获取数据
[root@etcd0 ~]# etcdctl get mytest --prefix --endpoints=http://10.0.0.80:2379
mytest
hello
mytest1
hello 001
mytest2
hello 002
复制代码
删除数据
[root@etcd0 ~]# etcdctl del mytest --endpoints=http://10.0.0.80:2379
1 # 删除了一条数据
[root@etcd0 ~]# etcdctl del mytest --prefix --endpoints=http://10.0.0.80:2379
2 # 删除了两条数据
复制代码
监听数据
watch 用于获取监听信息的更改,其可以持续监听
# 先开一个窗口监听
[root@redis01 ~]# etcdctl watch mytest --endpoints=http://10.0.0.80:2379
# 然后再开一个窗口写入
[root@redis01 ~]# etcdctl put mytest 001 --endpoints=http://10.0.0.80:2379
OK
# 先前的窗口会受到更新信息,并且会持续监听
[root@redis01 ~]# etcdctl watch mytest --endpoints=http://10.0.0.80:2379
PUT
mytest
001
复制代码
go 语言操作 etcd
连接 etcd
连接 k8s 中的 etcd
# 操作k8s中的etcd需要指定对应的ca,以及对应的证书和私钥,这里使用了apiserver的证书和私钥来连接etcd
etcdctl --endpoints=https://10.0.0.10:2379 --cacert="/etc/kubernetes/pki/etcd/ca.crt" --cert="/etc/kubernetes/pki/apiserver-etcd-client.crt" --key="/etc/kubernetes/pki/apiserver-etcd-client.key"
复制代码
go 语言设置 TLS 调用 etcd:
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"go.etcd.io/etcd/clientv3"
"io/ioutil"
)
func main() {
// 设置客户端证书和私钥
cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
fmt.Println("Error loading client key pair:", err)
return
}
// 加载根证书
caCert, err := ioutil.ReadFile("ca.crt")
if err != nil {
fmt.Println("Error reading CA certificate:", err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// 创建 TLS 配置
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
// 创建 etcd 客户端连接
endpoint := "localhost:2379"
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{endpoint},
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
fmt.Println("Error creating etcd client:", err)
return
}
// 在连接上设置上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 执行 etcd 操作
resp, err := client.Put(ctx, "example_key", "example_value")
if err != nil {
fmt.Println("Error putting value:", err)
return
}
fmt.Println(resp)
}
复制代码
PUT 和 GET 操作
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"
"go.etcd.io/etcd/clientv3"
)
// use etcd/clientv3
func main() {
// 连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"10.0.0.80:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put操作
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value := `[{"path":"./my.log","topic":"web_log"},{"path":"./xxx.log","topic":"xxx_conf"}]`
_, err = cli.Put(ctx, "/logagent/192.168.10.2/collect_config", value)
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get操作
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "/logagent/192.168.10.2/collect_config")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
复制代码
watch 操作
watch 操作示例:
// watch传入的key是否有变化,如果有变化,将其获取后创建成配置对象发往taillog的新配置通道
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
ch := cli.Watch(context.Background(), key) // watch对应的key,第一个参数需要传入一个context对象
for wresp := range ch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// key有变换,进行通知,先判断操作的类型
var newConf []*LogEntry
if ev.Type != clientv3.EventTypeDelete {
// 如果不是删除操作, 则创建一个newConf对象传入taillog的新配置通道
err := json.Unmarshal(ev.Kv.Value, &newConf)
if err != nil {
fmt.Println("json unmarshal failed , err: ", err)
continue
}
}
fmt.Printf("get new conf: %v\n", newConf)
newConfCh <- newConf
}
}
}
复制代码
文章转载自:Praywu
原文链接:https://www.cnblogs.com/hgzero/p/17232679.html
评论