写点什么

Go sync.Map 源码解读

用户头像
werben
关注
发布于: 2021 年 03 月 24 日

内置的 map


Go 的内置的 map 是不支持并发写操作的,不是并发安全的。


看下面的代码运行会提示


fatal error: concurrent map writes


package main
import "time"
var globleMap map[string]interface{}
func MapEdit(key string) { for i := 0; i < 10000; i++ { globleMap[key] = i }}
func main() { globleMap = make(map[string]interface{}) go MapEdit("1") go MapEdit("2") time.Sleep(time.Second * 30)}
复制代码


因此官方在 go 1.9 版本中给了 sync.Map 来满足并发编程中的应用。


这里实际上是一个 lock-free,关于 lock-free 和 wait-free 可以看看这里:wait-free是指什么?


sync.Map 的实现原理可概括为:


read 和 dirty


我们可以这么理解,现在将 map 里面的数据分成两部分保存,我用两个桶来比喻,


  • 一些是干净(read)的,放在白桶


  • 一些是脏(dirty)的,放在脏桶,我这也叫黑桶


另外我还要记录一下,这个脏的程度,怎么判断脏的程度呢,这里我用一个脏值(misses)来标识脏的程度,每次读数据时发现有一个数据(key)不在白桶而是在黑桶,我就将脏值数量(misses)+1,如果发现脏值(misses)居然比黑桶(read)里的数据总数量还多,那就得清洗一下黑桶(dirty)里面的数据了,将清洗完的脏(dirty)数据全部放到白桶(read)那里去,这时候黑桶所有的数据(dirty)被清空了,脏值(misses)也要重置为 0


白桶里面还贴了一个字条(amended: 英文释义修正的,改正的),这个字条表示黑桶是不是刚刚被清洗收拾,因为黑桶不是一直被收拾的,每次当黑桶被收拾,脏值清空的时候,这个字条就写上“已收拾”(amended=false)。


写数据


数据在白桶里


一个新的数据 key 要保存到 map 里面,首先先查一下这个数据(key)是不是已经在白桶里面,如果数据在白桶里面,直接更新这个数据就行了。


数据在黑桶里


如果这个数据(key)不在白桶里(read),字条还写着“欠收拾”那数据就有可能在黑桶(dirty)里,如果在黑桶(dirty)里找到了,更新这个黑桶(dirty)里面的数据(key),同时更新下脏值(misses)+1,接下来还要判断下脏值的数据量不是已经比白桶(read)里面数据总量还大,如果是,那就得清洗下黑桶(dirty)里面的数据


数据即不在白桶里,也不在黑桶里


数据既不在白桶,也不在黑桶,要分两种情况


  • 白条写着“已收拾”,说明黑桶恰巧刚刚被收拾了,那么现在重新将白桶里面所有的数据又复制一份放到黑桶里面去,白桶里面字条改成“欠收拾”(amended=true),然后直接写入数据到黑桶


  • 白桶里面写着“欠收拾”,说明黑桶里面有脏数据,但是又没到收拾他的时候,直接写入数据到黑桶


读数据


数据在白桶


直接从白桶取数据


数据不在白桶,白条写着“欠收拾”:


说明黑桶有白桶不存在的数据,数据可能在黑桶,从黑桶取数据,如果 key 数据存在,脏值+1,判断是否需要清洗黑桶


数据不在白桶,黑桶也找不到


数据不存在,返回 nil


用法问题


看下面的示例,写入 map1000 个 key,但是读取只读了 900 次 key,


这时候根本白桶的数据始终都会是空的,因为黑桶只有脏值到了 1000 才会将数据清洗到白桶。


也就是这种情况,可能效率还没有自己写的使用内置的 map,加锁的效率高。


所以 sync.map 适合少量写,大量读的场景


package main
import ( "math/rand" "testgorm/mymap" "time")
func main() { var m mymap.Map //写入1000个key,这时黑桶数据数量是1000 for i := 0; i < 1000; i++ { rand.Seed(time.Now().UnixNano()) data := rand.Intn(1000) m.Store(i, data) } //读取900次key,这时候脏值为900 < 黑桶数据数量 //白桶的数据一直都是空的,因为黑桶一直没到清洗的条件 for i := 0; i < 900; i++ { if _, ok := m.Load(i); ok { } }}
复制代码


源码阅读


package mymap
import ( "fmt" "sync" "sync/atomic" "unsafe")
type Map struct { //用来锁dirty mu sync.Mutex //白桶 read atomic.Value //黑桶 dirty map[interface{}]*entry //脏值 misses int}
type readOnly struct { //封装的map m map[interface{}]*entry //白条 amended bool}
//expunged是一个指针指向一个空对象,也就是指向interface{}var expunged = unsafe.Pointer(new(interface{}))
//entry就是一个指针type entry struct { p unsafe.Pointer}
func newEntry(i interface{}) *entry { return &entry{p: unsafe.Pointer(&i)}}
//从sync.map读数据func (m *Map) Load(key interface{}) (value interface{}, ok bool) { //先看白桶里面有没有key read, _ := m.read.Load().(readOnly) //有则直接返回 e, ok := read.m[key] //如果白桶里没有这个key,而且白条写着“欠收拾”,说明黑桶里面可能有key if !ok && read.amended { m.mu.Lock() //上面白桶获取时没有加锁,上锁后再检查一次 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] //再检查一次也没有,则直接从黑桶里面读取数据 if !ok && read.amended { e, ok = m.dirty[key] //脏值+1, 如果脏值比黑桶数据数量还大,则收拾黑桶 m.missLocked() } m.mu.Unlock() } //如果白桶黑桶里都没有数据,返回nil if !ok { return nil, false } //返回entry指针对应的值 return e.load()}
//获取entry指针对应的值func (e *entry) load() (value interface{}, ok bool) { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil, false } return *(*interface{})(p), true}
//往sync.map写数据func (m *Map) Store(key, value interface{}) { //先看白桶里面有没有key read, _ := m.read.Load().(readOnly) //白桶存在key直接更新白桶里面的数据 if e, ok := read.m[key]; ok && e.tryStore(&value) { return }
m.mu.Lock() //上面白桶获取没有加锁,上锁后再写入一次白桶 read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { //白桶存在key直接更新白桶里面的数据 m.dirty[key] = e } //将黑桶里面的key也指向value,相当于黑桶和白桶都指向同一份数据 e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { //如果白桶里面没有这个key,但是黑桶里面有,直接更新黑桶里面的数据 e.storeLocked(&value) } else { //白桶和黑桶都没有这个key if !read.amended { //如果黑桶刚刚被收拾过,白条“已收拾”,则需要将白桶里的数据,复制一份到黑桶 m.dirtyLocked() //设置白条为“欠收拾” m.read.Store(readOnly{m: read.m, amended: true}) } //将新数据丢到黑桶 m.dirty[key] = newEntry(value) } m.mu.Unlock()}
//tryStore就是一个基于CAS的原子写操作func (e *entry) tryStore(i *interface{}) bool { for { p := atomic.LoadPointer(&e.p) //如果原先key对应的值是一个interface{}空对象,返回失败 if p == expunged { return false } //CAS原子写入entity if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } }}
//判断entry是否=expunged(interface{}),如果是将entry设置为nilfunc (e *entry) unexpungeLocked() (wasExpunged bool) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil)}
//原子写入entryfunc (e *entry) storeLocked(i *interface{}) { atomic.StorePointer(&e.p, unsafe.Pointer(i))}
//参照Storefunc (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok { actual, loaded, ok := e.tryLoadOrStore(value) if ok { return actual, loaded } }
m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } actual, loaded, _ = e.tryLoadOrStore(value) } else if e, ok := m.dirty[key]; ok { actual, loaded, _ = e.tryLoadOrStore(value) m.missLocked() } else { if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) actual, loaded = value, false } m.mu.Unlock()
return actual, loaded}
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { p := atomic.LoadPointer(&e.p) if p == expunged { return nil, false, false } if p != nil { return *(*interface{})(p), true, true }
ic := i for { if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { return i, false, true } p = atomic.LoadPointer(&e.p) if p == expunged { return nil, false, false } if p != nil { return *(*interface{})(p), true, true } }}
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) //数据在白桶,直接删除白桶里面的数据 e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] //数据不在白桶,黑桶“欠收拾” if !ok && read.amended { //直接黑桶清除数据 delete(m.dirty, key) } m.mu.Unlock() } if ok { e.delete() }}
func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } }}
func (m *Map) Range(f func(key, value interface{}) bool) { read, _ := m.read.Load().(readOnly) if read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended { read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() }
for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } }}
//脏值+1,func (m *Map) missLocked() { //脏值+1, m.misses++ //如果脏值比黑桶数据数量还大,则收拾黑桶 if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) //将黑桶map清空 m.dirty = nil //将脏值清零 m.misses = 0}
func (m *Map) dirtyLocked() { if m.dirty != nil { return }
//黑桶已经被清洗过了,这里要重新初始化黑桶 read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) //将白桶里面的复制一份到黑桶 for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } }}
//这里就是判断entry是不是指向空对象或者是nil//如果entry=是nil,赋值为空对象,也返回truefunc (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { //如果entry指向nil,给entry赋值空对象interface{},返回true if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged}

复制代码


发布于: 2021 年 03 月 24 日阅读数: 18
用户头像

werben

关注

还未添加个人签名 2018.01.08 加入

还未添加个人简介

评论

发布
暂无评论
Go sync.Map 源码解读