[Go 并发编程实战课]02.Mutex 源代码

用户头像
custer
关注
发布于: 2020 年 10 月 14 日
[Go 并发编程实战课]02.Mutex 源代码

CAS 是实现互斥锁和同步原语的基础:

CAS 指令将给定的值和一个内存地址中的值进行比较,如果是同一个值,就使用新值替换内存地址中的值。CAS 是原子性的操作,保证这个指令总是基于最新的值进行计算,如果同时有其他线程已经修改了这个值,那么,CAS会返回失败。



学习 mutex 源代码,鸟窝的:sync.mutex 源代码分析

源代码分析

首先学习下 Mutex 注释,这些注释将极大的帮助我们理解Mutex的实现。

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
翻译:互斥锁有两种状态:正常状态和饥饿状态。
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
翻译:在正常状态下,所有等待锁的 goroutine 按照 FIFO 顺序等待。
唤醒的 goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 竞争锁的拥有。
新请求锁的 goroutine 具有优势:它正在 CPU 上执行,而且可能有好几个,
所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败。
在这种情况下,这个被唤醒的 goroutine 会加入到等待队列的前面。
如果一个等待的 goroutine 超过 1ms 没有获取锁,那么它将会把锁转变为饥饿模式。
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
翻译:在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。
新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态,
也不会去尝试自旋操作,而是放在等待队列的尾部。
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
翻译:如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:
(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
翻译:正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。

sync.Mutex 包含两个字段

type Mutex struct {
state int32
sema uint32 // 等待者队列使用的使用的信号量,用来控制等待goroutine的阻塞休眠和唤醒
}
const (
mutexLocked = 1 << iota // mutex is locked 持有锁的标记
mutexWoken // 唤醒标记
mutexStarving // 标记锁是否处于饥饿状态
mutexWaiterShift = iota // 阻塞等待的 waiter 数量
starvationThresholdNs = 1e6 // 将饥饿模式的最大等待时间阈值设置成了 1 毫秒
)



state 是一个共用字段。

  • 第0个 bit 标记这个 mutex 是否已被某个 goroutine 所拥有,为0表示未被锁。

  • 第1个 bit 标记这个 mutex 是否已唤醒,也就是有某个唤醒的 goroutine 要尝试获取锁。

  • 第2个 bit 标记这个 mutex 状态,值 1 表示此锁已处于饥饿状态。

同时,尝试获取锁的 goroutine 也有状态,有可能它是新来的 goroutine,也有可能是被唤醒的 goroutine,可能是出于正常状态的 goroutine,也有可能是出于饥饿状态的 goroutine。

Lock

func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.幸运之路,一下就获取到了锁
// 如果mutext的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
// 缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 标记本goroutine的等待时间
starving := false // 本goroutine是否已经处于饥饿状态
awoke := false // 本goroutine是否已唤醒
iter := 0 // 自旋次数
old := m.state // 复制锁的当前状态
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
// 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。
// 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
continue
}
// 到了这一步, state的状态可能是:
// 1. 锁还没有被释放,锁处于正常状态
// 2. 锁还没有被释放, 锁处于饥饿状态
// 3. 锁已经被释放, 锁处于正常状态
// 4. 锁已经被释放, 锁处于饥饿状态
//
// 并且本gorutine的awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)
// new 复制 state的当前状态, 用来设置新的状态
// old 是锁当前的状态
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
// 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个.
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿状态,加锁
}
// 将等待队列的等待者的数量加1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // waiter数量加1
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
// 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿状态
}
// 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
// 总之state的新状态不再是woken状态.
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
// 通过CAS设置new state值.
// 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果old state的状态是未被锁状态,并且锁不处于饥饿状态,
// 那么当前goroutine已经获取了锁的拥有权,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
} // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
// 处理饥饿状态
// If we were already waiting before, queue at the front of the queue.
// 如果以前就在队列里面,加入到队列头
// 设置/计算本goroutine的等待时间
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
// 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
// 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞等待
// sleep之后,此goroutine被唤醒,计算当前goroutine是否已经处于饥饿状态.
// 唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 得到当前的锁状态
old = m.state
// 如果当前的state已经是饥饿状态
// 那么锁应该处于Unlock状态,那么应该是锁被直接交给了本goroutine
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
// 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空,
// 那么state是一个非法状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前goroutine用来设置锁,并将等待的goroutine数减1.
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果本goroutine是最后一个等待者,或者它并不处于饥饿状态,
// 那么我们需要把锁的state状态设置为正常模式.
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
// 退出饥饿模式
delta -= mutexStarving
}
// 设置新state, 因为已经获得了锁,退出、返回
atomic.AddInt32(&m.state, delta)
break
}
// 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始
awoke = true
iter = 0
} else { // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

Unlock

package main
import "sync/atomic"
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 将持有锁的标识设置为未加锁的状态,这是通过减 1 而不是将标志位置零的方式实现
new := atomic.AddInt32(&m.state, -mutexLocked) // 去掉锁标志
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 如果state不是处于锁的状态, 那么就是Unlock根本没有加锁的mutex, panic
if (new+mutexLocked)&mutexLocked == 0 { // 本来就没有加锁
throw("sync: unlock of unlocked mutex")
}
// 释放了锁,还得需要通知其它等待者
// 锁如果处于饥饿状态,直接交给等待队列的第一个, 唤醒它,让它去获取锁
// 锁如果处于正常状态,
// new state如果是正常状态
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 将等待的goroutine数减一,并设置woken标识
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
// 新来的goroutine不会把锁抢过去.
runtime_Semrelease(&m.sema, true, 1)
}
}

饥饿模式和正常模式

请求锁时调用的 Lock 方法中一开始是 fast path,这是一个幸运的场景,当前的 goroutine 幸运的获得了锁,没有竞争,直接返回,否则就进入了 lockSlow 方法。这样的设计,方便编译器对 Lock 方法进行内联,可以在程序开发中应用这个技巧。



正常模式下,waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要和新来的 goroutine 进行竞争。新来的 goroutine 有先天的优势,它们正在 CPU 中运行,可能它们的数量还不少,所以在高并发的情况下,被唤醒的 waiter 可能比较悲惨的获取不到锁,这时,它会被插入到队列的前面。

如果 waiter 获取不到锁的时间超过阈值 1 毫秒,那么这个 Mutex 就进入饥饿模式。



在饥饿模式下,Mutex 的拥有着将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会抢,也不会 spin,它会乖乖地加入到等待队列的尾部。



如果拥有 Mutex 的 waiter 发现下面两种情况之一,它就会把这个 Mutex 转换成正常模式:

  • 此 waiter 已经是队列中的最后一个 waiter 了,没有其他等待锁的 goroutine。

  • 此 waiter 的等待时间小于 1 毫秒。

正常模式拥有更好的性能,因为即使有等待抢锁的 waiter,goroutine 也可以连续多次获取到锁。

饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。

逐步分析 Mutex 的关键代码

从请求锁(lockSlow)的逻辑看起。

  1. 首先要搞清楚 state 的意义,state 包含四个部分,mutexLocked 持有锁标记、 mutexWoken 唤醒标记、mutexStarving 饥饿标记、 mutexWaiterShift 阻塞等待的 waiter 数量。

  2. 第 16 行 var waitStartTime int64 记录 goroutine 请求锁的初始时间,第 17 行 starving := false 标记是否处于饥饿状态,第 18 行 awoke := false 标记是否唤醒,第 19 行 iter := 0 记录 spin 的次数。

  3. 第 22 行到第 39 行,它会对正常状态抢夺锁的 goroutine 尝试 spin,就是在临界区耗时很短的情况下提高性能。

  4. 第 41 行到第 56 行,非饥饿状态下抢锁,就是要把 state 的锁的那一位,置为加锁状态,后续 CAS 如果成功就可能获取到了锁。

  5. 第 57 行到第 60 行,如果锁已经被持有或者锁处于饥饿状态,等待,所以 waiter 的数量加 1。

  6. 第 61 行到第 69 行,如果此 goroutine 已经处在饥饿状态,并且锁还被持有,那么,我们需要把此 Mutex 设置为饥饿状态。

  7. 第 70 行到第 79 行,是清除 mutexWoken 标记,因为不管是获得了锁还是进入休眠,我们都需要清除 mutexWoken 标记。

  8. 第 82 行就是尝试使用 CAS 设置 state。如果成功,第 85 行到第 87 行是检查原来的锁的状态是未加锁状态,并且也不是饥饿状态的话就成功获取了锁,返回。

  9. 第 92 行判断是否第一次加入到 waiter 队列。到这里,你应该就能明白第 16 行为什么不对 waitStartTime 进行初始化了,我们需要利用它在这里进行条件判断。

  10. 第 99 行将此 waiter 加入到队列,如果是首次,加入到队尾,先进先出。如果不是首次,那么加入到队首,这样等待最久的 goroutine 优先能够获取到锁。此 goroutine 会进行休眠。

  11. 第 102 行判断此 goroutine 是否处于饥饿状态。注意,执行这一句的时候,它已经被唤醒了。

  12. 第 107 行到第 133 行是对锁处于饥饿状态下的一些处理。

  13. 第 118 行设置一个标志,这个标志稍后会用来加锁,而且还会将 waiter 数减 1。

  14. 第 128 行,设置标志,在没有其它的 waiter 或者此 goroutine 等待还没超过 1 毫秒,则会将 Mutex 转为正常状态。

  15. 第 131 行则是将这个标识应用到 state 字段上。



继续看下释放锁(Unlock)调用的 unlockSlow 方法:

  1. 如果 Mutex 处于正常状态,第 63 行直接唤醒等待队列中的 waiter。

  2. 如果 Mutex 处于饥饿状态,如果没有 waiter,或者已经有在处理的情况了,那么释放就好,不做额外的处理(第 40 行到第 42 行)。

  3. 否则,waiter 数减 1,mutexWoken 标志设置上,通过 CAS 更新 state 的值(第 45 行到第 50 行)。

思考

  1. 目前 Mutex 的 state 字段有几个意义,这几个意义分别是由哪些字段表示的?

  2. 等待一个 Mutex 的 goroutine 数最大是多少?是否能满足现实的需求?



用户头像

custer

关注

让正确的事情连续发生 2017.11.20 加入

还未添加个人简介

评论

发布
暂无评论
[Go 并发编程实战课]02.Mutex 源代码