写点什么

深入浅出 Go - sync/atomic 源码分析

用户头像
哈希说
关注
发布于: 2020 年 12 月 17 日
深入浅出 Go - sync/atomic 源码分析

对于并发操作,原子操作 (不可异常/中断的一系列操作) 是一个绕不开的话题,典型的就是 i++ 问题,并发场景下,有多个 CPU 并发执行 i++,原本只想执行一次,实际上变成执行多次,这就涉及到我们今天要聊的 sync/atomic 了。常见的原子操作有



  • Test-and-Set-Lock,TSL,对某个存储器位置写值并返回旧值

  • Fetch-and- Add,FAA,对某个存储器位置加值并返回新值

  • Compare-and-Swap,CAS,判断某个存储器位置的值是否与指定值相等,如果相等则替换为新的值

  • Load-Linked/Store-Conditional,LL/SC,Load-Linked 返回某个存储器位置原值,Store-Conditional 写入新值 (如果 Load-Linked 后没有被修改)



原子操作是如何实现的



早期大多数 CPU 原子操作的硬件实现是通过 LOCK 指令对总线 (bus) 进行加锁 (这种锁称为总线锁),阻塞其它 CPU 的内存访问。不过这种实现方式可想而知是比较低效的。现在大多数 CPU 原子操作的硬件实现是通过 LOCK 指令对 Cache Line 进行加锁,阻塞其它 CPU 对该 Cache Line 的内存访问,通过缓存一致性机制来加锁,缓存一致性机制的实现方式有很多种,之后会写一篇文章来讲其中的一种,MESI 缓存协议。当然也不是说总线锁就没用了,如果访问的数据大于 Cache Line,那么 CPU 还是会使用总线锁的



Go 实现的原子操作



// TSL
//
// old = *addr
// *addr = new
// return old
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
// FAA
//
// *addr += delta
// return *addr
func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
// CAS
//
// if *addr == old {
// *addr = new
// return true
// }
// return false
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
// Read
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
// Write
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)



CAS 与 ABA 问题



CAS,Compare-and-Swap,一般我们会用它是实现乐观锁,自旋锁,伪代码如下



while(!swapped) {
swapped = CAS(V, E, N)
sleep(1)
}



所谓自旋锁,Spin Lock,就是循环等待,直到获取到锁。不过乐观锁其实也会带来如下问题



  • 自旋开销大

  • ABA 问题



自旋开销大,这个看伪代码就能明白。那么 ABA 问题是什么呢?假设有线程 X,Y 和变量 N



  1. 当线程 X 用 CAS 将变量 N 的值从 A 改为 B 时

  2. 此时线程 Y 将变量 N 的值改为 A

  3. 这时候线程 X 的 CAS 判断变量 N 的值没有发生过变化,不符合预期



这就是 ABA 问题,解决方法是可以增加 version 或 timestamp,每次变量更新时 version++,这也就将 A->B->A 变成了 1A->2B->3A



LL/SC 相比于 CAS,它没有 ABA 问题,这个看它的同步原语就知道了,不过大多数汇编指令集还未实现



atomic.Value 源码阅读



sync/atomic 的原子读写只提供了 int32int64uint32uint64uintptrunsafe.Pointer 这几个数据类型。如果是其它数据类型的话就需要使用 atomic.Value 来实现原子读写了



func main() {
type pair struct {
x, y int
}
p := pair{1, 2}
var v atomic.Value
v.Store(p)
fmt.Println(v.Load().(pair).x)
fmt.Println(v.Load().(pair).y)
}



atomic.Value 的源码如下



// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
v interface{} // ifaceWords
}
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}



通过注释我们知道 atomic.Value 的零值为 nil,且使用后不允许被拷贝。写入值后 ifaceWordstyp 保存数据类型,data 保存值。atomic.Value 只实现了原子读写,我们来看看 Store 的实现



// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
if x == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}



通过首行的注释我们知道使用 Store 保存值后,数据类型就固定下来了,后续操作时必须使用相同的数据类型,否则会 panic,且不能保存 nil。如果是首次 Store 则会调用 runtime_procPin() 禁止当前 P 被抢占,然后调用 CAS 抢占乐观锁 ,将 typ 修改为中间值 unsafe.Pointer(^uintptr(0)),所以你后面会看到 if uintptr(typ) == ^uintptr(0) 这行代码,如果为 true 则表示还在抢占乐观锁中,如果抢到了乐观锁就会去修改 typdataLoad 的源码实现就相对简单很多



func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}



Load 的源码确实没啥好说的,只要看懂 Store 的源码,Load 的源码自然就懂了





发布于: 2020 年 12 月 17 日阅读数: 23
用户头像

哈希说

关注

还未添加个人签名 2018.03.08 加入

还未添加个人简介

评论

发布
暂无评论
深入浅出 Go - sync/atomic 源码分析