写点什么

Go 语言 sync.Mutex 源码分析

用户头像
Dnnn
关注
发布于: 2020 年 09 月 10 日

go 语言以并发作为其特性之一,并发必然会带来对于资源的竞争,这时候我们就需要使用 go 提供的 sync.Mutex 这把互斥锁来保证临界资源的访问互斥。



既然经常会用这把锁,那么了解一下其内部实现,就能了解这把锁适用什么场景,特性如何了。



打开源码,我们先看一下mutex.go文件的描述。



// Mutex fairness.

//

// Mutex can be in 2 modes of operations: normal and starvation.

// In normal mode waiters are queued in FIFO order, but a woken up waiter

// does not own the mutex and competes with new arriving goroutines over

// the ownership. New arriving goroutines have an advantage -- they are

// already running on CPU and there can be lots of them, so a woken up

// waiter has good chances of losing. In such case it is queued at front

// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,

// it switches mutex to the starvation mode.

//

// In starvation mode ownership of the mutex is directly handed off from

// the unlocking goroutine to the waiter at the front of the queue.

// New arriving goroutines don't try to acquire the mutex even if it appears

// to be unlocked, and don't try to spin. Instead they queue themselves at

// the tail of the wait queue.

//

// If a waiter receives ownership of the mutex and sees that either

// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,

// it switches mutex back to normal operation mode.

//

// Normal mode has considerably better performance as a goroutine can acquire

// a mutex several times in a row even if there are blocked waiters.

// Starvation mode is important to prevent pathological cases of tail latency.



//------------------------------------------------------------------------------------------------------------------------------------

// 互斥公平锁 .

// 锁有两种模式,【正常模式】和【饥饿模式】

// 在正常模式下锁有等待锁的goroutine都会进入一个先进先出的队列(轮流被唤醒),但是被

//唤醒的goroutine不会直接获得锁,而是要跟新到来的gorotine竞争。

//新来的goroutine有个一个优势 -- 他们已近CPU上运行,并且数量众多,

//所以刚被唤醒的goroutine大概率获取不到锁.在这样的情况下,被唤醒的goroutine会被

//队列头部。如果一个goroutine等待超过1ms(写死的)没有获取到锁,互斥锁将进入饥饿模式。

//

//在饥饿模式中,解锁的goroutine会将锁直接交付给等待队里最前面的goroutine.

//新来的goroutine 不会尝试获取锁(即使锁在空闲状态),也不会进行自旋,

//他们只是加入到等待队列尾部.

//

//如果一个goroutine 获取到锁,他会判断

//1 . 他是否是位于等待队列末尾

//2 . 他等待是否超过1ms

// 以上只有有一个成立,将把互斥锁切换至正常模式

//

// 正常模式 :具有较好的性能,即使存在许多阻塞者,goroutine也也会尝试几次获取锁。

// 饥饿模式 :对于防止尾部延迟是非常重要的。



`` sync.Mutex``

// A Mutex 是一个互斥锁
// 0 值代码表未加锁转态
//
//互斥锁在第一次被使用后不能被复制.
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked state第1位
mutexWoken //state第2位
mutexStarving //state第3位
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)

stage

这个字段会同时被多个goroutine公用(使用atomic来保证原子性),第1个bit 表示已加锁。第2个bit 表示某个goroutine被唤醒,尝试获取锁,第3个bit表示这把锁是否是饥饿状态。



``[1][1][1]`` : 第一个[1] 表示锁状态,第二个[1]表示是否有唤醒,第三个[1]表示是否是饥饿模式

·``001`普通模式 ,无唤醒, 锁 ,`010` 普通模式, 有唤醒 ,无锁状态,,`101`` 饥饿模式 ,无唤醒 ,锁



sema

用来唤醒 goroutine 所用的信号量。



LOCK

`` 在看代码之前,我们需要有一个概念:每个 goroutine 也有自己的状态,存在局部变量里面(也就是函数栈里面),goroutine 有可能是新到的、被唤醒的、正常的、饥饿的。``



func (m *Mutex) Lock() {
// 如果锁是空闲状态,直接获取锁 通过 atomic.CompareAndSwapInt32 保证原子性
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
//用来保存 goroutine 等待时间
var waitStartTime int64
// 用来存当前goroutine是否饥饿
starving := false
// 用来存当前goroutine是否已唤醒
awoke := false
// 用来存当前goroutine的循环次数
iter := 0
// 复制一下当前锁的状态
old := m.state
//自旋起来
for {
// Don't spin in starvation mode, ownership is handed off to waiter
//[翻译] 在饥饿模式下就不要自旋了,因为锁会直接被交付
// so we won't be able to acquire the mutex anyway.
//[翻译] 所以自旋也获取不到锁
// 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
// 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋,
// 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
// [伪代码]:if isLocked() and isNotStarving() and canSpin()
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
//[翻译] 主动自旋是有意义的
// Try to set mutexWoken flag to inform Unlock
//[翻译] 尝试修改唤醒标志
// to not wake other blocked goroutines.
//[翻译] 这样就可以不唤醒其他goroutines
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 这段代码来修改 mutex stage 第2位 设置有唤醒标识。这样就不会去唤醒其他的goroutine了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
//进行自旋
runtime_doSpin()
//循环迭代计数
iter++
//更新锁的状态 ,因为在这段时间里锁的状态可能被其他goroutine修改了
old = m.state
continue
}
//到了这一步,state的状态可能是
//1. 未加锁 ,普通模式
//2. 未加锁,饥饿模式
//3. 已加锁,饥饿模式
//4. 已加锁,普通模式 (可能做上面执行时,锁被其他goroutine 获取了)
//获取锁的最新状态,这个字段用来存储希望设置锁的状态
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
//[翻译]不要饥饿模式下获取锁,新来的去排队
// [伪代码]:if isNotStarving() ,也就是说是饥饿状态时不获取锁
if old&mutexStarving == 0 {
//new 设置为获取锁状态
new |= mutexLocked
}
//如果是锁状态,或者是饥饿状态,就设置等待队列+1 ,(此时就是等位 +1)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
//[翻译] 当前goroutine 将所切换至饥饿模式
// But if the mutex is currently unlocked, don't do the switch.
//[翻译]但是如果锁的状态是unlocked 就不要切换。
// Unlock expects that starving mutex has waiters, which will not be true in this case.
//[翻译] unlock 期望一个饥饿模式的gorutine时,这个例子就不成立了(也就说如果有其他的goroutine将锁切换成饥饿模式)
//[伪代码] isNotStarving and isLock
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//如果当前goroutine 是唤醒状态,那么我要resest这个状态
//因为goroutine要么是拿到锁了,要么是进入sleep了
if awoke {
// The goroutine has been woken from sleep,
//[翻译]goroutine 已近被唤醒了。
// so we need to reset the flag in either case.
//[翻译]所以我们要切换状态了
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//设置成非唤醒状态
new &^= mutexWoken
}
// 通过CAS来尝试设置锁的状态
// 这里可能是设置锁,也有可能是只设置为饥饿状态和等待数量
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果old state的状态是未被锁状态,并且锁不处于饥饿状态,
// 那么当前goroutine已经获取了锁的拥有权,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
//[翻译] 如果我们已经在排队了,就排在队伍的最前面。
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
//计算等待时间
waitStartTime = runtime_nanotime()
}
// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
// 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
// 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
runtime_SemacquireMutex(&m.sema, queueLifo)
//如果当前是饥饿状态,并且等待超过1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 得到当前的锁状态
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
//[翻译] 如果这个goroutine唤醒,并且锁是饥饿模式
// ownership was handed off to us but mutex is in somewhat
//[翻译] 锁会直接传递给我们
// inconsistent state: mutexLocked is not set and we are still
//[翻译] 如果锁处于不一致状态,那么会出现问题
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前的goroutine获得了锁,那么就把等待队列-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果是最后一个等待者,就退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
//[翻译] 退出饥饿模式
// Critical to do it here and consider wait time.
//[翻译]重要的是要在这里做,并考虑等待时间。
// Starvation mode is so inefficient, that two goroutines can go lock-step infinitely
//[翻译]饥饿模式非常低效,两个goroutine一旦切换到饥饿模式,就会无限地执行锁步。
delta -= mutexStarving
}
//加锁
atomic.AddInt32(&m.state, delta)
break
}
// 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒
// 并且重置iter(重置spin)
awoke = true
iter = 0
} else {
// 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放
// 那么就更新状态,重新开始循环尝试拿锁
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}



UNLOCK

>接下来我们来看看 Unlock 的实现,对于 Unlock 来说,有两个比较关键的特性:

如果说锁不是处于 locked 状态,那么对锁执行 Unlock 会导致 panic;

锁和 goroutine 没有对应关系,所以我们完全可以在 goroutine 1 中获取到锁,然后在 goroutine 2 中调用 Unlock 来释放锁(这是什么骚操作!)



// Unlock unlocks m.
// [翻译] 解锁
// It is a run-time error if m is not locked on entry to Unlock.
//[翻译] 如果没有locked 执行 unlock 会有一个run-time error
// A locked Mutex is not associated with a particular goroutine.
//[翻译] 一个被锁的互斥对象与一个特定的goroutine没有关联。
// It is allowed for one goroutine to lock a Mutex and then
//[翻译] 允许其他的goroutine进行解锁
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
//解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
//解锁成功,并且不是解锁状态
if new&mutexStarving == 0 {
//复制锁状态
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// [翻译] 如果没有没有等待着,或者没有唤醒的goroutine,不用唤醒任何人。
//如果没有其他的goroutine 加锁。
//在饥饿模式下锁会被直接传递,但是我们这里不关注饥饿模式下的设置,
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待拿锁
// 那么我们就要把锁的状态设置为被唤醒,等待队列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 如果状态设置成功了,我们就通过信号量去唤醒goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
// 循环结束的时候,更新一下状态,因为有可能在执行的过程中,状态被修改了(比如被Lock改为了饥饿状态)
old = m.state
}
} else {
// 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
// 新来的goroutine不会把锁抢过去.
runtime_Semrelease(&m.sema, true)
}
}



结语

锁和解锁的代码只有这么简单的几行,但是其中的原来和设计的巧妙点缺非常多,从这个里我们可以看出,系统设计的好坏跟代码多少无关,系统内涵的设计跟代码设计也无关,真的大师一定是大道至简。



发布于: 2020 年 09 月 10 日阅读数: 52
用户头像

Dnnn

关注

还未添加个人签名 2019.07.09 加入

还未添加个人简介

评论

发布
暂无评论
Go语言  sync.Mutex 源码分析