一、Mutex 的架构演进
“初版”的 Mutex 使用一个 flag 来表示锁是否被持有,实现比较简单;后来照顾到新来的 goroutine,所以会让新的 goroutine 也尽可能地先获取到锁,这是第二个阶段,我把它叫作“给新人机会”;那么,接下来就是第三阶段“多给些机会”,照顾新来的和被唤醒的 goroutine;但是这样会带来饥饿问题,所以目前又加入了饥饿的解决方案,也就是第四阶段“解决饥饿”。下图展示的是演进路线:
现阶段的 Mutex 结构体包含两个字段:
二、sync 的 Lock 方法实现
type Mutex struct { state int32 sema uint32 } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving // 从state字段中分出一个饥饿标记 mutexWaiterShift = iota starvationThresholdNs = 1e6 ) func (m *Mutex) Lock() { // Fast path: 幸运之路,一下就获取到了锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争 m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false // 此goroutine的饥饿标记 awoke := false // 唤醒标记 iter := 0 // 自旋次数 old := m.state // 当前的锁的状态 for { // 锁是非饥饿状态,锁还没被释放,尝试自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了 continue } new := old if old&mutexStarving == 0 { new |= mutexLocked // 非饥饿状态,加锁 } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift // waiter数量加1 } if starving && old&mutexLocked != 0 { new |= mutexStarving // 设置饥饿状态 } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken // 新状态清除唤醒标记 } // 成功设置新状态 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 处理饥饿状态
// 如果以前就在队列里面,加入到队列头 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 阻塞等待 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 唤醒之后检查锁是否应该处于饥饿状态 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 如果锁已经处于饥饿状态,直接抢到锁,返回 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 有点绕,加锁并且将waiter数减1 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记 } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
复制代码
首先是通过 CAS 检测 state 字段中的标志,如果没有 goroutine 持有锁,也没有等待持有锁的 gorutine,那么,当前的 goroutine 就很幸运,可以直接获得锁,这也是注释中的 Fast path 的意思。如果不够幸运,state 不是零值,那么就通过一个循环进行检查。
我们先前知道,如果想要获取锁的 goroutine 没有机会获取到锁,就会进行休眠,但是在锁释放唤醒之后,它并不能像先前一样直接获取到锁,还是要和正在请求锁的 goroutine 进行竞争。
这会给后来请求锁的 goroutine 一个机会,也让 CPU 中正在执行的 goroutine 有更多的机会获取到锁,在一定程度上提高了程序的性能。for 循环是不断尝试获取锁,如果获取不到,就通过 runtime.Semacquire(&m.sema) 休眠,休眠醒来之后 awoke 置为 true,尝试争抢锁。代码中的第 10 行将当前的 flag 设置为加锁状态,如果能成功地通过 CAS 把这个新值赋予 state,就代表抢夺锁的操作成功了。
三、sync 的 Unlock 方法
func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 1) } }
复制代码
跟之前的实现相比,当前的 Mutex 最重要的变化,就是增加饥饿模式。将饥饿模式的最大等待时间阈值设置成了 1 毫秒,这就意味着,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,
通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的 goroutine 获取锁的公平性,对于我们使用锁的业务代码来说,不会有业务一直等待锁不被处理。
四、Mutex 的正常模式和饥饿模式
Mutex 可能处于两种操作模式下:正常模式和饥饿模式。
请求锁时调用的 Lock 方法中一开始是 fast path,这是一个幸运的场景,当前的 goroutine 幸运地获得了锁,没有竞争,直接返回,否则就进入了 lockSlow 方法。
正常模式下,waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要和新来的 goroutine 进行竞争。新来的 goroutine 有先天的优势,它们正在 CPU 中运行,可能它们的数量还不少,所以,在高并发情况下,被唤醒的 waiter 可能比较悲剧地获取不到锁,这时,它会被插入到队列的前面。
如果 waiter 获取不到锁的时间超过阈值 1 毫秒,那么,这个 Mutex 就进入到了饥饿模式。
在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin,它会乖乖地加入到等待队列的尾部。
如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:
五、总结
从 go 的源码中,我们理解 Mutex 的关键点:
state 就是用来控制锁状态的核心,所谓加锁,就是 把 state 修改为某个值,解锁也是类似
sema 是用来处理沉睡、唤醒的信号量,依赖于两 个 runtime 调用:
runtime_SemacquireMutex:sema 加 1 并且挂起 goroutine
runtime_Semrelease:sema 减 1 并且唤醒 sema 上等待的一个 goroutine
评论