写点什么

etcd 实现分布式锁

作者:六月的
  • 2022-10-19
    上海
  • 本文字数:4957 字

    阅读完需:约 1 分钟

转载自:etcd实现分布式锁


当并发的访问共享资源的时候,如果没有加锁的话,无法保证共享资源安全性和正确性。这个时候就需要用到锁

1、需要具备的特性

  1. 需要保证互斥访问(分布式环境需要保证不同节点、不同线程的互斥访问)

  2. 需要有超时机制,防止锁意外未释放,其他节点无法获取到锁;也要保证任务能够正常执行完成,不能超时了任务还没结束,导致任务执行一般被释放锁

  3. 需要有阻塞和非阻塞两种请求锁的接口

2、本地锁

当业务执行在同一个线程内,也就是我初始化一个本地锁,其他请求也认这把锁。一般是服务部署在单机环境下。


我们可以看下下面的例子,开 1000 个 goroutine 并发的给 Counter 做自增操作,结果会是什么样的呢?


package main
import ( "fmt" "sync")
var sg sync.WaitGroup
type Counter struct { count int}
// 自增操作func (m *Counter) Incr() { m.count++}
// 获取总数func (m *Counter) Count() int { return m.count}
func main() { c := &Counter{} for i := 0; i < 1000; i++ { sg.Add(1) // 模拟并发请求 go func() { c.Incr() sg.Done() }() } sg.Wait()
fmt.Println(c.Count())}
复制代码


结果是 count 的数量并不是预想中的 1000,而是下面这样,每次打印出的结果都不一样,但是接近 1000


user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go953 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go982 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go984
复制代码


出现这个问题的原因就是没有给自增操作加锁


下面我们修改代码如下,在 Incr 中加上 go 的 mutex 互斥锁


package main
import ( "fmt" "sync")
var sg sync.WaitGroup
type Counter struct { count int mu sync.Mutex}
func (m *Counter) Incr() { // 每次写之前先加锁,写完之后释放锁 m.mu.Lock() defer m.mu.Unlock() m.count++}
func (m *Counter) Count() int { return m.count}
func main() { c := &Counter{} for i := 0; i < 1000; i++ { sg.Add(1) go func() { c.Incr() sg.Done() }() } sg.Wait()
fmt.Println(c.Count())}
复制代码


可以看到现在 count 正常输出 1000 了


user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go1000 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go1000 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go1000
复制代码

3、etcd 分布式锁

简单部署一个 etcd 集群

├── docker-compose.yml├── etcd│   └── Dockerfile
复制代码


Dockerfile 文件内容


FROM bitnami/etcd:latest
LABEL maintainer="liuyuede123 <liufutianoppo@163.com>"
复制代码


Docker-compose.yml 内容


version: '3.5'# 网络配置networks:  backend:    driver: bridge
# 服务容器配置services: etcd1: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd1 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "12379:2379" - "12380:2380" networks: - backend restart: always
etcd2: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd2 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "22379:2379" - "22380:2380" networks: - backend restart: always
etcd3: # 自定义容器名称 build: context: etcd # 指定构建使用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd3 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "32379:2379" - "32380:2380" networks: - backend restart: always
复制代码


执行docker-compose up -d启动 etcd 服务,可以看到 docker 中已经启动了 3 个服务


实现互斥访问

package main
import ( "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync")
var sg sync.WaitGroup
type Counter struct { count int}
func (m *Counter) Incr() { m.count++}
func (m *Counter) Count() int { return m.count}
func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} // 初始化etcd客户端 client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close()
counter := &Counter{}
sg.Add(100) for i := 0; i < 100; i++ { go func() { // 这里会生成租约,默认是60秒 session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close()
locker := concurrency.NewLocker(session, "/my-test-lock") locker.Lock() counter.Incr() locker.Unlock() sg.Done() }() } sg.Wait()
fmt.Println("count:", counter.Count())}
复制代码


执行结果:


user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.gocount: 100 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.gocount: 100 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.gocount: 100
复制代码

实现超时机制

当某个客户端持有锁时,由于某些原因导致锁未释放,就会导致这个客户端一直持有这把锁,其他客户端一直获取不到锁。所以需要分布式锁实现超时机制,当锁未释放时,会因为 etcd 的租约会到期而释放锁。当业务正常处理时,租约到期之前会继续续约,知道业务处理完毕释放锁。


package main
import ( "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync" "time")
var sg sync.WaitGroup
type Counter struct { count int}
func (m *Counter) Incr() { m.count++}
func (m *Counter) Count() int { return m.count}
func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close()
counter := &Counter{}
session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close()
locker := concurrency.NewLocker(session, "/my-test-lock") fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05")) locker.Lock() fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05")) // 模拟业务 time.Sleep(100 * time.Second) counter.Incr() locker.Unlock() fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("count:", counter.Count())}
复制代码


命令行开 2 个窗口,第一个窗口执行程序并获取锁,之后模拟意外退出并没有调用 unlock 方法


go run main.golocking... 2022-09-03 23:41:48 # 租约生成时间locked... 2022-09-03 23:41:48^Csignal: interrupt
复制代码


第二个窗口,在第一个窗口退出之前尝试获取锁,此时是阻塞状态。第一个窗口退出之后由于租约还没到期,第二个窗口还是获取锁的状态。等到第一个窗口租约到期(默认 60 秒),第二个获取锁成功


locking... 2022-09-03 23:41:52locked... 2022-09-03 23:42:48 # 第一个租约60秒到期,获取锁成功released... 2022-09-03 23:44:28count: 1
复制代码

实现阻塞和非阻塞接口

上面的例子中已经实现了阻塞接口,即当前有获取到锁的请求,则其他请求阻塞等待锁释放


非阻塞的方式就是尝试获取锁,如果失败立即返回。etcd 中是实现了 tryLock 方法


// TryLock locks the mutex if not already locked by another session.// If lock is held by another session, return immediately after attempting necessary cleanup// The ctx argument is used for the sending/receiving Txn RPC.func (m *Mutex) TryLock(ctx context.Context) error {
复制代码


具体看下面的例子


package main
import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync" "time")
var sg sync.WaitGroup
type Counter struct { count int}
func (m *Counter) Incr() { m.count++}
func (m *Counter) Count() int { return m.count}
func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close()
counter := &Counter{}
session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close()
// 此处使用newMutex初始化 locker := concurrency.NewMutex(session, "/my-test-lock") fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05")) err = locker.TryLock(context.Background()) // 获取锁失败就抛错 if err != nil { fmt.Println("lock failed", err) return } fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05")) time.Sleep(100 * time.Second) counter.Incr() err = locker.Unlock(context.Background()) if err != nil { fmt.Println("unlock failed", err) return } fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("count:", counter.Count())}
复制代码


窗口 1、窗口 2 执行结果


go run main.golocking... 2022-09-04 00:00:21locked... 2022-09-04 00:00:21released... 2022-09-04 00:02:01count: 1
复制代码


go run main.golocking... 2022-09-04 00:00:27lock failed mutex: Locked by another session
复制代码


用户头像

六月的

关注

还未添加个人签名 2019-07-23 加入

还未添加个人简介

评论

发布
暂无评论
etcd实现分布式锁_分布式锁_六月的_InfoQ写作社区