写点什么

Sync 包 Mutex 的原理阐述

作者:Jack
  • 2023-03-30
    广东
  • 本文字数:3957 字

    阅读完需:约 13 分钟

一、Mutex 的架构演进

“初版”的 Mutex 使用一个 flag 来表示锁是否被持有,实现比较简单;后来照顾到新来的 goroutine,所以会让新的 goroutine 也尽可能地先获取到锁,这是第二个阶段,我把它叫作“给新人机会”;那么,接下来就是第三阶段“多给些机会”,照顾新来的和被唤醒的 goroutine;但是这样会带来饥饿问题,所以目前又加入了饥饿的解决方案,也就是第四阶段“解决饥饿”。下图展示的是演进路线:



现阶段的 Mutex 结构体包含两个字段:


  • 字段 state :是一个复合型的字段,一个字段包含多个意义,这样可以通过尽可能少的内存来实现互斥锁。这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。所以,state 这一个字段被分成了三部分,代表三个数据。

  • 字段 sema:是个信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒。


二、sync 的 Lock 方法实现

   type Mutex struct {        state int32        sema  uint32    }        const (        mutexLocked = 1 << iota // mutex is locked        mutexWoken        mutexStarving // 从state字段中分出一个饥饿标记        mutexWaiterShift = iota            starvationThresholdNs = 1e6    )        func (m *Mutex) Lock() {        // Fast path: 幸运之路,一下就获取到了锁        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {            return        }        // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争        m.lockSlow()    }        func (m *Mutex) lockSlow() {        var waitStartTime int64        starving := false // 此goroutine的饥饿标记        awoke := false // 唤醒标记        iter := 0 // 自旋次数        old := m.state // 当前的锁的状态        for {            // 锁是非饥饿状态,锁还没被释放,尝试自旋            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {                    awoke = true                }                runtime_doSpin()                iter++                old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了                continue            }            new := old            if old&mutexStarving == 0 {                new |= mutexLocked // 非饥饿状态,加锁            }            if old&(mutexLocked|mutexStarving) != 0 {                new += 1 << mutexWaiterShift // waiter数量加1            }            if starving && old&mutexLocked != 0 {                new |= mutexStarving // 设置饥饿状态            }            if awoke {                if new&mutexWoken == 0 {                    throw("sync: inconsistent mutex state")                }                new &^= mutexWoken // 新状态清除唤醒标记            }            // 成功设置新状态            if atomic.CompareAndSwapInt32(&m.state, old, new) {                // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回                if old&(mutexLocked|mutexStarving) == 0 {                    break // locked the mutex with CAS                }                // 处理饥饿状态
// 如果以前就在队列里面,加入到队列头 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 阻塞等待 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 唤醒之后检查锁是否应该处于饥饿状态 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 如果锁已经处于饥饿状态,直接抢到锁,返回 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 有点绕,加锁并且将waiter数减1 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记 } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
复制代码


首先是通过 CAS 检测 state 字段中的标志,如果没有 goroutine 持有锁,也没有等待持有锁的 gorutine,那么,当前的 goroutine 就很幸运,可以直接获得锁,这也是注释中的 Fast path 的意思。如果不够幸运,state 不是零值,那么就通过一个循环进行检查。


我们先前知道,如果想要获取锁的 goroutine 没有机会获取到锁,就会进行休眠,但是在锁释放唤醒之后,它并不能像先前一样直接获取到锁,还是要和正在请求锁的 goroutine 进行竞争。


这会给后来请求锁的 goroutine 一个机会,也让 CPU 中正在执行的 goroutine 有更多的机会获取到锁,在一定程度上提高了程序的性能。for 循环是不断尝试获取锁,如果获取不到,就通过 runtime.Semacquire(&m.sema) 休眠,休眠醒来之后 awoke 置为 true,尝试争抢锁。代码中的第 10 行将当前的 flag 设置为加锁状态,如果能成功地通过 CAS 把这个新值赋予 state,就代表抢夺锁的操作成功了。

三、sync 的 Unlock 方法

 func (m *Mutex) Unlock() {        // Fast path: drop lock bit.        new := atomic.AddInt32(&m.state, -mutexLocked)        if new != 0 {            m.unlockSlow(new)        }    }        func (m *Mutex) unlockSlow(new int32) {        if (new+mutexLocked)&mutexLocked == 0 {            throw("sync: unlock of unlocked mutex")        }        if new&mutexStarving == 0 {            old := new            for {                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {                    return                }                new = (old - 1<<mutexWaiterShift) | mutexWoken                if atomic.CompareAndSwapInt32(&m.state, old, new) {                    runtime_Semrelease(&m.sema, false, 1)                    return                }                old = m.state            }        } else {            runtime_Semrelease(&m.sema, true, 1)        }    }
复制代码


跟之前的实现相比,当前的 Mutex 最重要的变化,就是增加饥饿模式。将饥饿模式的最大等待时间阈值设置成了 1 毫秒,这就意味着,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,


通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的 goroutine 获取锁的公平性,对于我们使用锁的业务代码来说,不会有业务一直等待锁不被处理。

四、Mutex 的正常模式和饥饿模式

Mutex 可能处于两种操作模式下:正常模式和饥饿模式。


请求锁时调用的 Lock 方法中一开始是 fast path,这是一个幸运的场景,当前的 goroutine 幸运地获得了锁,没有竞争,直接返回,否则就进入了 lockSlow 方法。


正常模式下,waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要和新来的 goroutine 进行竞争。新来的 goroutine 有先天的优势,它们正在 CPU 中运行,可能它们的数量还不少,所以,在高并发情况下,被唤醒的 waiter 可能比较悲剧地获取不到锁,这时,它会被插入到队列的前面。


如果 waiter 获取不到锁的时间超过阈值 1 毫秒,那么,这个 Mutex 就进入到了饥饿模式。


在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin,它会乖乖地加入到等待队列的尾部。


如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:


  • 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了

  • 此 waiter 的等待时间小于 1 毫秒。正常模式拥有更好的性能,因为即使有等待抢锁的 waiter,goroutine 也可以连续多次获取到锁。饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。

五、总结


从 go 的源码中,我们理解 Mutex 的关键点:

  • state 就是用来控制锁状态的核心,所谓加锁,就是 把 state 修改为某个值,解锁也是类似

  • sema 是用来处理沉睡、唤醒的信号量,依赖于两 个 runtime 调用:

  • runtime_SemacquireMutex:sema 加 1 并且挂起 goroutine

  • runtime_Semrelease:sema 减 1 并且唤醒 sema 上等待的一个 goroutine

用户头像

Jack

关注

还未添加个人签名 2019-05-12 加入

还未添加个人简介

评论

发布
暂无评论
Sync包Mutex的原理阐述_golang_Jack_InfoQ写作社区