写点什么

万万字图解| 深入揭秘 Golang 锁结构:Mutex(下)

作者:云舒编程
  • 2024-01-25
    广东
  • 本文字数:4941 字

    阅读完需:约 16 分钟

万万字图解| 深入揭秘Golang锁结构:Mutex(下)

大家好,我是「云舒编程」,今天我们来聊聊 Golang 锁结构:Mutex。

文章首发于微信公众号:云舒编程

关注公众号获取:1、大厂项目分享 2、各种技术原理分享 3、部门内推

一、前言

书接上回,在万字图解| 深入揭秘Golang锁结构:Mutex(上)一文中,我们已经研究了 Golang mutex V1 和 V2 版本的实现。接下来我们继续研究 V3 和 V4 版本的实现。

二、面试中遇到 Mutex

为了让剧情顺利发展,我们依旧使用万字图解| 深入揭秘Golang锁结构:Mutex(上)一文中的面试对话模式。


面试官:你现在实现的锁的确给了新来的 Goroutine 直接获取锁的机会,但是还不够优雅。比如说,新 Goroutine 尝试获取锁失败的那一刻,锁就被释放了,但是新 Goroutine 需要等到下一次信号量唤醒加调度才有机会再次获取锁,这样其实浪费了新 Goroutine 的 CPU 时间,你可以再优化下吗?


:考虑到这种情况,可以尝试给新的 Goroutine 多次获取锁的机会,说白了就是允许自旋,但是需要给自旋加一些限制条件,避免最开始提到的性能问题。


面试官:需要加哪些限制条件呢?


:首先需要限制自旋的次数,其次操作系统的处理器个数和 Golang 调度的 P 个数都必须大于 1,否则就会是串行,自旋就没有意义了。


面试官:不错,怎么实现呢?


:我来写下:


const (    mutexLocked      = 1 // mutex is locked    mutexWoken       = 2     mutexWaiterShift = 2)
type Mutex struct { state int32 sema uint32}
func (m *Mutex) Lock() { //给新来的协程直接加锁的机会 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return }
//上面没有加锁成功,尝试在接下来的唤醒中去竞争锁 awoke := false //表示当前协程是不是被唤醒的 iter := 0 //记录当前自旋的次数 for { old := m.state new := old | mutexLocked // 设置锁标志位为1 if old&mutexLocked != 0 { //判断是否满足自旋条件 if runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true }
//内部调用procyield函数,该函数也是汇编语言实现。 //函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。 //指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。 runtime_doSpin() iter++ continue } new = old + 1<<mutexWaiterShift //锁没有释放,当前协程可能会阻塞在信号量上,先将waiter+1 } ··· //剩下的不变 }}
复制代码


//判断是否可以自旋,同时满足以下4个条件才能自旋://1、自旋次数小于4次//2、cpu核数大于1//3、GOMAXPROCS>1//4、running P > 1 并且 P队列为空func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {  return false } if p := getg().m.p; p.runqhead != p.runqtail {  return false } return true}
复制代码


解锁部分没有变化


面试官:通过自旋,对于临界区代码执行非常短的场景来说,这是一个非常好的优化。因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine 不用通过休眠唤醒方式等待调度,直接自旋几次,可能就获得了锁。但是这里也存在一个问题,你知道什么是【饥饿】吗?


:因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,就会导致【饥饿】。


面试官:那怎么解决呢?


:可以考虑给锁加一个标识,比如我们可以检测当一个老 goroutine 超过一定时间都没有获取到锁,那么他就给锁打上一个【饥饿】的标识,新来的 goroutine 发现存在该标识就不再通过自旋抢锁,而是直接进入信号量的等待队列的队尾。同时把老 goroutine 移动到信号量等待队列的队头,这样老 goroutine 下次就可以直接获取到锁了。


面试官:思路不错,写下代码吧。


:我们还是把 state 在拿出来一位表示锁是不是处于饥饿状态。



加锁部分:


type Mutex struct { state int32 sema  uint32}
const ( mutexLocked = 1 // mutex is locked mutexWoken = 2 mutexWaiterShift = 3 mutexStarving = 4 //新增字段,标识)
func (m *Mutex) Lock() { //给新来的协程直接加锁的机会 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return }
m.lockSlow()}
func (m *Mutex) lockSlow() { var waitStartTime int64 //表示等待了多久还没获取到锁 starving := false //表示当前锁是否处于饥饿状态 awoke := false //表示当前协程是不是被唤醒的 iter := 0 //自旋次数 old := m.state
for { //满足以下条件才能进入自旋: //1、锁不是饥饿状态,并且没有获取到锁 //2、满足自旋条件runtime_canSpin if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true }
//内部调用procyield函数,该函数也是汇编语言实现。 //函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。 //指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。 runtime_doSpin() iter++ old = m.state continue }
new := old
//如果当前锁不是饥饿状态,尝试将加锁标志置为1 if old&mutexStarving == 0 { new |= mutexLocked }
//锁没有释放,或者是饥饿模式,当前协程会阻塞在信号量上,将waiter+1 if old&(mutexLocked|mutexStarving) != 0 { new = old + 1<<mutexWaiterShift }
//已经等待超过了1ms,且锁被其他协程占用,则进入饥饿模式 if starving && old&mutexLocked != 0 { new |= mutexStarving }
if awoke { //尝试清除唤醒标志 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
//这里尝试将state从old设置为new。如果代码能够执行到这步,代表了可能发生以下几种情况的一种或者多种 //1、当前协程尝试加锁 //2、waiter+1 //3、清除唤醒标志 //4、想将锁设置为饥饿模式 if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { //不是饥饿状态,并且成功获取到锁了,返回 break }
//是否已经等待过了 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }
// 阻塞等待 // queueLifo 为 true,表示加入到队列头。否则,加入到队列尾。 // (首次加入队列加入到队尾,不是首次加入则加入队头,这样等待最久的 goroutine 优先能够获取到锁。) runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 从等待队列中唤醒,根据等待时间,判断锁是否应该进入饥饿模式。 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
//如果锁已经是饥饿状态了,直接抢锁返回 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") }
//加锁并且等待者数量减一 delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 清除饥饿状态的两种情况: //. 如果不需要进入饥饿模式(当前被唤醒的 goroutine 的等待时间小于 1ms) //. 原来的等待者数量为 1,说明是最后一个被唤醒的 goroutine。 if !starving || old>>mutexWaiterShift == 1 { //清除饥饿状态 delta -= mutexStarving }
//设置状态,加锁 atomic.AddInt32(&m.state, delta) break } // 设置唤醒标记,重新抢占锁(会与那些运行中的 goroutine 一起竞争锁) awoke = true iter = 0 } else { old = m.state } }}
复制代码


解锁部分:


func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) //还有人阻塞在信号量上,需要进行唤醒 if new != 0 {  m.unlockSlow(new) }}
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }
//非饥饿模式 if new&mutexStarving == 0 { old := new for { //以下情况都直接结束,不继续往下: //1、如果没有人阻塞在信号量上了 //2、其他人加锁了 //3、已经有人唤醒协程了 //4、锁又切换成饥饿模式了 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }
//waiter-1 并且将唤醒标志置为1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { //如果cas执行成功就唤醒队头协程 runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { //饥饿模式下直接唤醒队头,这里分为两种情况: //1、如果“mutexLocked”未设置,等待者在唤醒后会获取到锁。 //2、如果只设置了“mutexStarving”,则仍然认为互斥锁已被锁定,因此新到来的goroutine不会获取它,唤醒的协程会获取到锁。 runtime_Semrelease(&m.sema, true, 1) }}
复制代码


关于信号量相关接口:

//lifo:true 加入等待队列的队头,否则加入队尾func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
//handoff=true:将当前G直接放到runq,调度器可以立即该G,并且会继承上一个G的时间片,//这样的目的是为了使当前G可以得到更快的调度。func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
复制代码


面试官:不错,这就是 Golang mutex 目前的设计,兼容了公平与效率的方案。

三、总结

1、互斥锁是一种常见的控制并发读写资源的手段,go 中的互斥锁实现是 sync.Mutex。


2、Mutex 的对外的接口:

  • Lock:同一时刻只能有一个 goroutine 可以获取到锁,其他 goroutine 会先通过自旋抢占锁,抢不到则阻塞等待。

  • Unlock:释放锁,释放锁之后,会唤醒等待队列中的下一个 goroutine。


3、使用 Mutex 需要注意以下几点:

  • Lock 与 Unlock 需要成对出现,在 Unlock 之前,必须已经调用了 Lock,否则会 panic。

  • 不要将 Mutex 作为函数或方法的参数传递。

  • 不要在锁内部执行阻塞或耗时操作。

  • 可以通过 vet 这个命令行来检查上面的锁 copy 的问题。


4、go 的 Mutex 基于以下技术实现:

  • 信号量:操作系统层面的同步机制。

  • 队列:在协程抢锁失败后,会将这些协程放入一个 FIFO 队列中,下次唤醒会唤醒队列头的协程。

  • 原子操作:通过 cas 操作状态字段 state,可以保证数据的完整性。


5、go Mutex 的两种模式:

  • 正常模式:运行中的 goroutine 有一定机会比等待队列中的 goroutine 先获取到锁,这种模式有更好的性能。

  • 饥饿模式:所有后来的 goroutine 都直接进入等待队列,会依次从等待队列头唤醒 goroutine。可以有效避免【饥饿】。等待队列中的 goroutine 超过了 1ms 还没有获取到锁,那么会进入饥饿模式

四、最后

“长城不是一天建成的”,Mutex 的设计也是从简单设计到复杂处理逐渐演变的。最初 Mutex 设计非常简洁,不到 20 行代码。但是,随着使用者越来越多,需要处理的场景也越来越复杂,逐渐暴露出一些缺陷,为了弥补这些缺陷,Mutex 不得不加入越来越多的设计,也逐步变得越来越复杂。

推荐阅读

1、原来阿里字节员工简历长这样

2、一条SQL差点引发离职

3、MySQL并发插入导致死锁


如果你也觉得我的分享有价值,记得点赞或者收藏哦!你的鼓励与支持,会让我更有动力写出更好的文章哦!

更多精彩内容,请关注公众号「云舒编程」


发布于: 1 小时前阅读数: 6
用户头像

云舒编程

关注

公众号 云舒编程,大白话分享技术原理 2020-09-23 加入

字节、阿里资深工程师。 做过营销、支付、百万级Feed流优化、权限系统、网关。 专注于技术原理分享,用最简单的话分享最复杂的技术原理

评论

发布
暂无评论
万万字图解| 深入揭秘Golang锁结构:Mutex(下)_golang_云舒编程_InfoQ写作社区