etcd 实现分布式锁
- 2022-10-19 上海
本文字数:4957 字
阅读完需:约 1 分钟
转载自:etcd实现分布式锁
当并发的访问共享资源的时候,如果没有加锁的话,无法保证共享资源安全性和正确性。这个时候就需要用到锁
1、需要具备的特性
需要保证互斥访问(分布式环境需要保证不同节点、不同线程的互斥访问)
需要有超时机制,防止锁意外未释放,其他节点无法获取到锁;也要保证任务能够正常执行完成,不能超时了任务还没结束,导致任务执行一般被释放锁
需要有阻塞和非阻塞两种请求锁的接口
2、本地锁
当业务执行在同一个线程内,也就是我初始化一个本地锁,其他请求也认这把锁。一般是服务部署在单机环境下。
我们可以看下下面的例子,开 1000 个 goroutine 并发的给 Counter 做自增操作,结果会是什么样的呢?
package main
import ( "fmt" "sync")
var sg sync.WaitGroup
type Counter struct { count int}
// 自增操作func (m *Counter) Incr() { m.count++}
// 获取总数func (m *Counter) Count() int { return m.count}
func main() { c := &Counter{} for i := 0; i < 1000; i++ { sg.Add(1) // 模拟并发请求 go func() { c.Incr() sg.Done() }() } sg.Wait()
fmt.Println(c.Count())}
结果是 count 的数量并不是预想中的 1000,而是下面这样,每次打印出的结果都不一样,但是接近 1000
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go953 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go982 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go984
出现这个问题的原因就是没有给自增操作加锁
下面我们修改代码如下,在 Incr 中加上 go 的 mutex 互斥锁
package main
import ( "fmt" "sync")
var sg sync.WaitGroup
type Counter struct { count int mu sync.Mutex}
func (m *Counter) Incr() { // 每次写之前先加锁,写完之后释放锁 m.mu.Lock() defer m.mu.Unlock() m.count++}
func (m *Counter) Count() int { return m.count}
func main() { c := &Counter{} for i := 0; i < 1000; i++ { sg.Add(1) go func() { c.Incr() sg.Done() }() } sg.Wait()
fmt.Println(c.Count())}
可以看到现在 count 正常输出 1000 了
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go1000 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go1000 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go1000
3、etcd 分布式锁
简单部署一个 etcd 集群
├── docker-compose.yml├── etcd│ └── Dockerfile
Dockerfile 文件内容
FROM bitnami/etcd:latest
LABEL maintainer="liuyuede123 <liufutianoppo@163.com>"
Docker-compose.yml 内容
version: '3.5'# 网络配置networks: backend: driver: bridge
# 服务容器配置services: etcd1: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd1 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "12379:2379" - "12380:2380" networks: - backend restart: always
etcd2: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd2 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "22379:2379" - "22380:2380" networks: - backend restart: always
etcd3: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd3 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "32379:2379" - "32380:2380" networks: - backend restart: always
执行docker-compose up -d启动 etcd 服务,可以看到 docker 中已经启动了 3 个服务
实现互斥访问
package main
import ( "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync")
var sg sync.WaitGroup
type Counter struct { count int}
func (m *Counter) Incr() { m.count++}
func (m *Counter) Count() int { return m.count}
func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} // 初始化etcd客户端 client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close()
counter := &Counter{}
sg.Add(100) for i := 0; i < 100; i++ { go func() { // 这里会生成租约,默认是60秒 session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close()
locker := concurrency.NewLocker(session, "/my-test-lock") locker.Lock() counter.Incr() locker.Unlock() sg.Done() }() } sg.Wait()
fmt.Println("count:", counter.Count())}
执行结果:
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.gocount: 100 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.gocount: 100 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.gocount: 100
实现超时机制
当某个客户端持有锁时,由于某些原因导致锁未释放,就会导致这个客户端一直持有这把锁,其他客户端一直获取不到锁。所以需要分布式锁实现超时机制,当锁未释放时,会因为 etcd 的租约会到期而释放锁。当业务正常处理时,租约到期之前会继续续约,知道业务处理完毕释放锁。
package main
import ( "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync" "time")
var sg sync.WaitGroup
type Counter struct { count int}
func (m *Counter) Incr() { m.count++}
func (m *Counter) Count() int { return m.count}
func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close()
counter := &Counter{}
session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close()
locker := concurrency.NewLocker(session, "/my-test-lock") fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05")) locker.Lock() fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05")) // 模拟业务 time.Sleep(100 * time.Second) counter.Incr() locker.Unlock() fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("count:", counter.Count())}
命令行开 2 个窗口,第一个窗口执行程序并获取锁,之后模拟意外退出并没有调用 unlock 方法
go run main.golocking... 2022-09-03 23:41:48 # 租约生成时间locked... 2022-09-03 23:41:48^Csignal: interrupt
第二个窗口,在第一个窗口退出之前尝试获取锁,此时是阻塞状态。第一个窗口退出之后由于租约还没到期,第二个窗口还是获取锁的状态。等到第一个窗口租约到期(默认 60 秒),第二个获取锁成功
locking... 2022-09-03 23:41:52locked... 2022-09-03 23:42:48 # 第一个租约60秒到期,获取锁成功released... 2022-09-03 23:44:28count: 1
实现阻塞和非阻塞接口
上面的例子中已经实现了阻塞接口,即当前有获取到锁的请求,则其他请求阻塞等待锁释放
非阻塞的方式就是尝试获取锁,如果失败立即返回。etcd 中是实现了 tryLock 方法
// TryLock locks the mutex if not already locked by another session.// If lock is held by another session, return immediately after attempting necessary cleanup// The ctx argument is used for the sending/receiving Txn RPC.func (m *Mutex) TryLock(ctx context.Context) error {
具体看下面的例子
package main
import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync" "time")
var sg sync.WaitGroup
type Counter struct { count int}
func (m *Counter) Incr() { m.count++}
func (m *Counter) Count() int { return m.count}
func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close()
counter := &Counter{}
session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close()
// 此处使用newMutex初始化 locker := concurrency.NewMutex(session, "/my-test-lock") fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05")) err = locker.TryLock(context.Background()) // 获取锁失败就抛错 if err != nil { fmt.Println("lock failed", err) return } fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05")) time.Sleep(100 * time.Second) counter.Incr() err = locker.Unlock(context.Background()) if err != nil { fmt.Println("unlock failed", err) return } fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("count:", counter.Count())}
窗口 1、窗口 2 执行结果
go run main.golocking... 2022-09-04 00:00:21locked... 2022-09-04 00:00:21released... 2022-09-04 00:02:01count: 1
go run main.golocking... 2022-09-04 00:00:27lock failed mutex: Locked by another session
六月的
还未添加个人签名 2019-07-23 加入
还未添加个人简介










评论