大家好,我是「云舒编程」,今天我们来聊聊 Golang 锁结构:Mutex。
文章首发于微信公众号:云舒编程
关注公众号获取:1、大厂项目分享 2、各种技术原理分享 3、部门内推
一、前言
   Golang 的 Mutex 算是在日常开发中最常见的组件了,并且关于锁的知识也是面试官最喜欢问的。
   曾经在一次腾讯面试中,被面试官问得体无完肤。
   虽然 Golang Mutex 只有短短的 200 多行,但是已经是一个极其丰富、精炼的组件,有极其复杂的状态控制。我已经看过很多次 Mutex 的源码,但是总是过段时间就会又处于懵逼状态,不得其道。分析下来,猜测是缺少“历史背景”,一上来就看到的是已经经过好几轮优化的代码,但是不清楚这么优化的背景,同时也缺少一些场景,就会导致无法理解一些设计。
   其实如果我们去追溯 Mutex 的演进历史,会发现,Mutex 最开始是一个非常简单的实现,简单到难以置信的地步,是 Go 开发者们经过了好几轮的优化才变成了现在这么一个非常复杂的数据结构,这是一个逐步完善的过程。
   于是我想如果我们是设计者,我们会怎么去设计去优化一个锁的实现呢?
   下面我将结合我曾经的腾讯面试经历 加上 代入“设计者”的角度出发,结合 Mutex 的演进历史,去分析如何设计一个功能完备的锁。希望经过本文的分析,你也可以从零设计出属于你的「Mutex」。
   友情提醒:文章很长,但是绝对值得一读。
二、面试中遇到 Mutex
   这里大致还原下之前的面试流程,由于当初并没有答上来,所以会加入一些下来学习后的思考,从而让剧情可以顺利开展。
   面试官:对 Mutex 熟悉吗?
   我:熟悉,Mutex 是 Golang 中的锁,主要是控制并发访问资源,保护共享资源。
   面试官:那如果让你去实现 Mutex,你会怎么设计?
   我:简单,定义一个 int 变量,0 代表没有加锁,1 代表加锁了,初始状态为 0。然后使用一个 For 循环去尝试加锁,加锁时使用 Atomic 的 CAS 尝试将值从 0 改为 1,如果成功了就获取锁,如果没有成功就继续 For 循环也就是自旋尝试获取锁,直到成功为止。拿到锁的线程想解锁的话也是通过 Atomic 的 CAS 尝试将值从 1 改为 0。
 type Mutex struct {    key  int32}
func (m *Mutex) Lock() {    for {        if atomic.CompareAndSwapInt32(&m.key, 0, 1) {            return        }    }}
func (m *Mutex) Unlock() {    atomic.CompareAndSwapInt32(&m.key, 1, 0)}
   复制代码
 
   面试官:那你觉得你的这个设计有没有问题?
   我:性能会比较差,由于加锁是通过自旋实现的,如果有很多 Goroutine 在竞争锁的话,会导致 CPU 资源被打满,同时也是浪费资源。
   面试官:那怎么优化呢?
   我:可以利用信号量去实现 Goroutine 的睡眠和唤醒,避免自旋浪费 CPU。
 type Mutex struct {    key  int32    sema uint32}
func (m *Mutex) Lock() {    if atomic.AddInt32(&m.key, 1) ==  1 {  //值从0变为1,则证明加锁成功        return    }    runtime.Semacquire(&m.sema) //等待信号量}
func (m *Mutex) Unlock() {    if atomic.AddInt32(&m.key, -1) == 0 { //值-1后,变为0证明没有协程等待锁了,可以直接返回        return    }    runtime.Semrelease(&m.sema) //唤起信号量上的协程}
   复制代码
 
   面试官:回答不错,那你觉得这版的实现还有其他问题吗?
   我:首先这版的实现解决了自旋的性能问题,其次由于信号量有一个先进先出的等待队列,所以也可以保证竞争锁的 Goroutine 是先到先得的,保证了公平。但是这样的公平其实在调度层面又是效率不高的。
   面试官:这里的「效率不高」从何说起?
   我:假设有三个协程分别是 G1,G2,G3。G1 首先加锁成功了,然后执行业务逻辑。期间 G2 想加锁发现加不上,就进入了信号量的等待队列,这个时候 G2 可能已经被调度器从 M 上调走了。然后 G1 解锁,这个时候 G3 想加锁发现由于 G2 在他前面进行了等待,所以导致 G3 加不上。这种情况由于 G2 没有获得 CPU 时间片,但是 G3 已经获得了 CPU 时间片,所以直接把锁给 G3 从整体上来说,效率会更加高些。
   面试官:非常好,那应该怎么实现呢?
   我:这里的本质是想给处于运行态的 Goroutine 直接获得锁的机会,将上面的代码修改为:
这样新来的 Goroutine 可以有一次直接获取锁的机会
 type Mutex struct {    key  int32    sema uint32}
func (m *Mutex) Lock() {    if atomic.CompareAndSwapInt32(&m.key, 0, 1) {        return    }    for {        if atomic.CompareAndSwapInt32(&m.key, 0, 1) {            return        }        runtime.Semacquire(&m.sema)    }}
func (m *Mutex) Unlock() {    if atomic.CompareAndSwapInt32(&m.key, 1, 0) {        runtime.Semrelease(&m.sema)    }}
   复制代码
 
   面试官:这样写,有一个明显的问题,你知道「惊群效应」吗?
   我:哦对,假设先有三个协程 G1,G2,G3。G1 加锁成功,G2,G3 进入了信号量的等待队列。然后 G1 解锁,G2 被唤醒,这个时候来了个新协程 G4,G2 和 G4 一起竞争锁,G4 竞争成功。然后 G4 很快执行完进行解锁,然后 G3 就被唤醒了,这个时候就存在 G2,G3 一起竞争锁的场景。如果在高并发场景,这样的唤醒竞争还会更加激烈。同时也违背了 G2,G3 在信号量上先来先得的设计。
   面试官:对,那应该怎么解决呢?
   我:可以考虑增加一个标识,如果发现已经有协程被唤醒了,后来的协程就只是解锁,但是不唤醒协程。
   面试官:还有一个问题,如果是最后一个协程,那他解锁的时候还需要进行 runtime.Semrelease(&m.sema)吗?
   我:哦对,这里也需要考虑。总结下我们的锁还需要以下额外能力:
         1、信号量:阻塞唤醒协程
         2、知道当前有多少协程阻塞在信号量上
         3、避免「惊群效应」
   面试官:嗯,差不多了。说说你的设计思路吧
   我:按照一般的设计我们需要使用三个变量控制,类似下面这样
 type Mutex struct {    key  int32  //控制是否加锁成功,1:加锁 0:未加锁    woken bool  //是否已经有协程被唤醒了    waiter int32 //记录当前有多少协程阻塞在信号量上    sema uint32 //信号量}
   复制代码
 
   面试官:可以这样设计,不过可以优化下吗?很多框架组件设计都会充分利用变量的高位低位,你这里可以这样设计吗?
   我:哦对,我再优化下,定义 int32 的变量 state 将其分为三部分:
 type Mutex struct {    state int32    sema  uint32}
   复制代码
 
那么加锁逻辑就变为:
 const (    mutexLocked      = 1 // mutex is locked    mutexWoken       = 2     mutexWaiterShift = 2)
type Mutex struct {    state int32    sema  uint32}
func (m *Mutex) Lock() {    //给新来的协程直接加锁的机会    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {        return    }
    //上面没有加锁成功,尝试在接下来的唤醒中去竞争锁    awoke := false //表示当前协程是不是被唤醒的    for {        old := m.state        new := old | mutexLocked // 设置锁标志位为1        if old&mutexLocked != 0 {            new = old + 1<<mutexWaiterShift //锁没有释放,当前协程可能会阻塞在信号量上,先将waiter+1        }        if awoke { //尝试清除唤醒标志            new &^= mutexWoken        }
        //这里尝试将state从old设置为new。如果代码能够执行到这步,代表了可能发生以下几种情况的一种或者多种        //1、当前协程尝试加锁        //2、waiter+1        //3、清除唤醒标志        if atomic.CompareAndSwapInt32(&m.state, old, new) {            if old & mutexLocked == 0{                //成功获取到锁了,返回                break            }                        //没有获取到锁,阻塞在信号量上            runtime.Semacquire(&m.sema)            awoke = true //执行到这步,证明是被信号量唤醒的,设置唤醒标志        }    }}
   复制代码
 
对上面的一些运算进行解释:
1、new := old | mutexLocked 将 state 的最低位设置为 1,即代表想加锁;
2、new = old + 1<<mutexWaiterShift,首先会执行 1<<mutexWaiterShift,即将 1 左移两位,移位后的值在加上 old。1 左移两位即避开了 mutexLocked 和 mutexWoken,然后再跟 old 相加,就可以实现 waiter+1。
3、new &^= mutexWoken,&^ 是一个位清除运算符(bit clear operator,它的作用是将第一个操作数(左操作数)中的位与第二个操作数(右操作数)相对应的位进行比较。如果右操作数的位为 1,则将左操作数的相应位清零(设置为 0)。如果右操作数的位为 0,则左操作数的相应位保持不变。
   面试官:写的不错,解锁的部分呢?
   我:别急,马上补上
 func (m *Mutex) Unlock() {    new := atomic.AddInt32(&m.state, -mutexLocked)    if (new+mutexLocked)&mutexLocked == 0 {        panic("sync: unlock of unlocked mutex")    }        old := new    for{        //以下情况都直接结束,不继续往下:        //1、如果没有人阻塞在信号量上了        //2、其他人加锁了        //3、已经有人唤醒协程了        if old>>mutexWaiterShift == 0 || old & (mutexLocked|mutexWoken) != 0{            return        }                new = (old - 1<<mutexWaiterShift) | mutexWoken //waiter-1 并且将唤醒标志置为1        if atomic.CompareAndSwapInt32(&m.state,old,new){            //如果cas执行成功就唤醒一个协程            runtime.Semacquire(&m.sema)            return        }        old = m.state    }}
   复制代码
 
   面试官:不错,你的这个设计就是 2011 年 Russ Cox 的第二版的 Mutex 实现逻辑。
三、golang 原版 Mutex
上面的分析过程就是 golang Mutex 的演进历史,给大家看下 golang 原版 Mutex 代码。
V1 版本极其简单:github地址
 // Copyright 2009 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.
package sync
package func cas(val *int32, old, new int32) bool
export type Mutex struct {    key int32;    sema int32;}
func xadd(val *int32, delta int32) (new int32) {    for {        v := *val;        if cas(val, v, v+delta) {            return v+delta;        }    }    panic("unreached")}
func (m *Mutex) Lock() {    if xadd(&m.key, 1) == 1 {        // changed from 0 to 1; we hold lock        return;    }    sys.semacquire(&m.sema);}
func (m *Mutex) Unlock() {    if xadd(&m.key, -1) == 0 {        // changed from 1 to 0; no contention        return;    }    sys.semrelease(&m.sema);}
   复制代码
 
V2 版本增加了亿点细节:github地址
 // Copyright 2009 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.
// Package sync provides basic synchronization primitives such as mutual// exclusion locks.  Other than the Once and WaitGroup types, most are intended// for use by low-level library routines.  Higher-level synchronization is// better done via channels and communication.//// Values containing the types defined in this package should not be copied.package sync
import "sync/atomic"
// A Mutex is a mutual exclusion lock.// Mutexes can be created as part of other structures;// the zero value for a Mutex is an unlocked mutex.type Mutex struct {    state int32    sema  uint32}
// A Locker represents an object that can be locked and unlocked.type Locker interface {    Lock()    Unlock()}
const (    mutexLocked = 1 << iota // mutex is locked    mutexWoken    mutexWaiterShift = iota)
// Lock locks m.// If the lock is already in use, the calling goroutine// blocks until the mutex is available.func (m *Mutex) Lock() {    // Fast path: grab unlocked mutex.    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {        return    }
    awoke := false    for {        old := m.state        new := old | mutexLocked        if old&mutexLocked != 0 {            new = old + 1<<mutexWaiterShift        }        if awoke {            // The goroutine has been woken from sleep,            // so we need to reset the flag in either case.            new &^= mutexWoken        }        if atomic.CompareAndSwapInt32(&m.state, old, new) {            if old&mutexLocked == 0 {                break            }            runtime_Semacquire(&m.sema)            awoke = true        }    }}
// Unlock unlocks m.// It is a run-time error if m is not locked on entry to Unlock.//// A locked Mutex is not associated with a particular goroutine.// It is allowed for one goroutine to lock a Mutex and then// arrange for another goroutine to unlock it.func (m *Mutex) Unlock() {    // Fast path: drop lock bit.    new := atomic.AddInt32(&m.state, -mutexLocked)    if (new+mutexLocked)&mutexLocked == 0 {        panic("sync: unlock of unlocked mutex")    }
    old := new    for {        // If there are no waiters or a goroutine has already        // been woken or grabbed the lock, no need to wake anyone.        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {            return        }        // Grab the right to wake someone.        new = (old - 1<<mutexWaiterShift) | mutexWoken        if atomic.CompareAndSwapInt32(&m.state, old, new) {            runtime_Semrelease(&m.sema)            return        }        old = m.state    }}
   复制代码
 四、结尾
Golang Mutex 一共优化了四版,我们上面已经分析了 V1 和 V2 版,后续会继续分析为什么还需要 V3,V4 版,他们又分别解决了什么问题。
最后我想出几个问题,知道答案的兄弟们可以在评论区回复:
1、为什么不用 old = atomic.LoadInt32(&m.state),而直接使用 old := m.state,在并发情况下不会出现不一致吗?
2、Unlock 中,为啥不直接这么判断
 if m.state != mutexLocked {    panic("sync: unlock of unlocked mutex")}
   复制代码
 
反而要这么判断:
 new := atomic.AddInt32(&m.state, -mutexLocked)    if (new+mutexLocked)&mutexLocked == 0 {        panic("sync: unlock of unlocked mutex")}
   复制代码
 
3、如果 G1 通过 Lock 获取了锁, 在持有锁的期间,G2 调用 Unlock 释放这个锁, 会出现什么现象?
推荐阅读
1、原来阿里字节员工简历长这样
2、一条SQL差点引发离职
3、MySQL并发插入导致死锁
如果你也觉得我的分享有价值,记得点赞或者收藏哦!你的鼓励与支持,会让我更有动力写出更好的文章哦!
更多精彩内容,请关注公众号「云舒编程」
评论