写点什么

深入浅出 Go - sync.Map 源码分析

用户头像
哈希说
关注
发布于: 2020 年 11 月 20 日
深入浅出 Go - sync.Map 源码分析

Go 的 map 在并发场景下,只读是线程安全的,读写则是线程不安全的。Go1.9 提供了并发安全的 sync.Map,通过阅读源码我们知道 snyc.Map 通过读写分离的机制,降低了锁的粒度,以此来提高并发性能



并发不安全的 map



func main() {
m := make(map[int]int)
go func() {
for {
m[1] = 1
}
}()
go func() {
for {
_ = m[1]
}
}()
time.Sleep(time.Second * 10)
}



执行程序会报如下错误



$ go run main.go
fatal error: concurrent map read and map write



并发安全的 sync.Map



对于并发不安全的 map,一般这种情况我们可以通过加锁的方式来实现并发安全的 hashmap,但是锁本身也会带来额外的性能开销,所以 Go1.9 开始标准库提供了并发安全的 sync.Map,使用起来也很简单,如下



func main() {
m := sync.Map{}
// 添加元素
m.Store("key", "value")
// 获取元素
if value, ok := m.Load("key"); ok {
fmt.Println(value)
}
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})
// 删除元素
m.Delete("key")
// 并发读写
go func() {
for {
m.Store(1, 1)
}
}()
go func() {
for {
_, _ = m.Load(1)
}
}()
time.Sleep(time.Second * 10)
}



sync.Map



直接进入主题,看源码



type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
type readOnly struct {
m map[interface{}]*entry
amended bool
}



  • read 只读数据 readOnly

  • dirty 读写数据,操作 dirty 需要用 mu 进行加锁来保证并发安全

  • misses 用于统计有多少次读取 read 没有命中

  • amended 用于标记 readdirty 的数据是否一致



Load



func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}



Load 的源码可以看出读取数据时,首先是从 read 读,没有命中的话会到 dirty 读取数据,同时调用 missLocked() 增加 misses



func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}



可以看到当 misses 大于 len(dirty) 时则表示 readdirty 的数据相差太大,sync.Map 会将 dirty 的数据赋值给 read,而 dirty 会被置空



Store



func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}



Store 首先会直接到 read 修改数据,修改成功则直接返回,如果 key 不存在那么就表示要到 dirty 找数据,如果 dirty 存在 key 则修改,如果不存在则新增,同时还要将 read 中的 amended 标记为 true,表示 readdirty 的数据已经不一致了



Range



func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}



Range 的源码没太多可以说的,有两点需要关注,一个是 Range 会保证 readdirty 是数据同步的,另一个是回调函数返回 false 会导致迭代中断



Delete



func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}



Delete 采用的是延迟删除的机制,首先会到 read 查找是否存在 key,如果存在则执行 entry.delete 进行软删除,通过 CAS 将指针 entry.p (存放数据的指针) 置为 nil,减少锁开销提高并发性能。只有当 read 找不到 keyamended 为 true 才会通过 delete 进行硬删除,记住这个阶段是会加锁的





发布于: 2020 年 11 月 20 日阅读数: 503
用户头像

哈希说

关注

还未添加个人签名 2018.03.08 加入

还未添加个人简介

评论

发布
暂无评论
深入浅出 Go - sync.Map 源码分析