原文来自微信公众号:小梁编程汇
导语 | 在召回排序业务中,由于上游请求量较大,对下游存储服务造成较大压力,业务场景要求高性能和非强一致性,所以我采用 golang 并发安全 k-v 缓存开源库进行性能优化,以下是对其调研、对比分析。如有错误,请多指正。
golang map
1. 并发读写测试
在 golang 中原生 map 在并发场景下,同时读写是线程不安全的,无论 key 是否一样。以下是测试代码
package main
import "time"
func main() {
testMapReadWriteDiffKey()
}
func testMapReadWriteDiffKey() {
m := make(map[int]int)
go func() {
for {
m[100] = 100
}
}()
go func() {
for {
_ = m[12]
}
}()
select {}
}
复制代码
如上图的 demo,并发读写 map 的不同 key,运行结果如下
map 读的时候会检查 hashWriting 标志, 如果有这个标志,就会报并发错误。写的时候会设置这个标志:**h.flags |= hashWriting.**设置完之后会取消这个标记。map 的并发问题不是那么容易被发现, 可以利用-race 参数来检查。map 并发读写冲突检测机制不是本文的重点,不过感兴趣的同学可以通过以下链接深入了解下。这是源码,文章分析看这里。编译时的选项-race,为何能分析出并发问题,详见:go官方博客,文章分析,视频讲解
2. map+读写锁
在官方库 sync.map 没出来前,Go maps in action 推荐的做法是使用 map+RWLock,比如定义一个匿名 struct 变量,其包含 map、RWLock,如下所示
var counter = struct{
sync.RWMutex
m map[string]int
}{m: make(map[string]int)}
复制代码
可以这样从 counter 中读数据
counter.RLock()
n := counter.m["some_key"]
counter.RUnlock()
fmt.Println("some_key:", n)
复制代码
可以这样往 counter 中写数据
counter.Lock()
counter.m["some_key"]++
counter.Unlock()
复制代码
那 Go 1.9 版本实现的 sync.map 和上面的这种实现方式有什么不同?它适用于哪些场景呢?它在哪些方面做了性能优化呢?
sync.map
sync.map 是用读写分离实现的,其思想是空间换时间。和 map+RWLock 的实现方式相比,它做了一些优化:可以无锁访问 read map,而且会优先操作 read map,倘若只操作 read map 就可以满足要求(增删改查遍历),那就不用去操作 write map(它的读写都要加锁),所以在某些特定场景中它发生锁竞争的频率会远远小于 map+RWLock 的实现方式。
接下来着重介绍下sync.map的源码,以了解其运作原理
1. 变量介绍
1.1 结构体 Map
type Map struct {
// 互斥锁mu,操作dirty需先获取mu
mu Mutex
// read是只读的数据结构,访问它无须加锁,sync.map的所有操作都优先读read
// read中存储结构体readOnly,readOnly中存着真实数据---entry(详见1.3),read是dirty的子集
// read中可能会存在脏数据:即entry被标记为已删除(详见1.3)
read atomic.Value // readOnly
// dirty是可以同时读写的数据结构,访问它要加锁,新添加的key都会先放到dirty中
// dirty == nil的情况:1.被初始化 2.提升为read后,但它不能一直为nil,否则read和dirty会数据不一致。
// 当有新key来时,会用read中的数据 (不是read中的全部数据,而是未被标记为已删除的数据,详见3.2)填充dirty
// dirty != nil时它存着sync.map的全部数据(包括read中未被标记为已删除的数据和新来的数据)
dirty map[interface{}]*entry
// 统计访问read没有未命中然后穿透访问dirty的次数
// 若miss等于dirty的长度,dirty会提升成read,提升后可以增加read的命中率,减少加锁访问dirty的次数
misses int
}
复制代码
1.2 结构体 readOnly
type readOnly struct {
m map[interface{}]*entry
amended bool
}
复制代码
1.1 的结构 read 存的就是 readOnly,m 是一个 map,key 是 interface,value 是指针 entry,其指向真实数据的地址,amended 等于 true 代表 dirty 中有 readOnly.m 中不存在的 entry
1.3 结构体 entry
type entry struct {
// p == nil:entry已从readOnly中删除但存在于dirty中
// p == expunged:entry已从Map中删除且不在dirty中
// p == 其他值:entry为正常值
p unsafe.Pointer // *interface{}
}
复制代码
entry 中的指针 p 指向真正的 value 所在的地址,dirty 和 readOnly.m 存的值类型就是*entry。这里的 nil 和 expunged 有什么作用呢?只要 nil 不可以吗?对于这些问题后面会一一解读。
2. 函数介绍
下面介绍下 sync.Map 的四个方法:Store、Load、Delete、Range
2.1 Load 方法
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 从m.read中换出readOnly,然后从里面找key,这个过程不加锁
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// readOnly中不存在此key但Map.dirty可能存在
if !ok && read.amended {
// 加锁访问Map.dirty
m.mu.Lock()
// 双重检测:若加锁前Map.dirty被替换为readonly,则前面m.read.Load().(readOnly)无效,需
// 要再次检查
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// read.m没有此key && dirty里有可能有(dirty中有read.m没有的数据)
if !ok && read.amended {
// 从dirty中获取key对应的entry
e, ok = m.dirty[key]
// 无论Map.dirty中是否有这个key,miss都加一,若miss大小等于dirty的长度,dirty中的元素会被
// 加到Map.read中
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// 若entry.p被删除(等于nil或expunged)返回nil和不存在(false),否则返回对应的值和存在(true)
return e.load()
}
复制代码
**Map.dirty 是如何提升为 Map.read 的呢?让我们来看下 missLocked 方法
**
func (m *Map) missLocked() {
// 访问一次Map.dirty,misses就要加一
m.misses++
if m.misses < len(m.dirty) {
return
}
// 当misses等于dirty的长度,m.dirty提升为readOnly,amended被默认赋值成false
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
复制代码
小结:
Load 方法会优先无锁访问 readOnly,未命中后如果 Map.dirty 中可能存在这个数据就会加锁访问 Map.dirty
Load 方法如果访问 readOnly 中不存在但 dirty 中存在的 key,就要加锁访问 Map.dirty 从而带来额外开销。
2.2 Store 方法
func (m *Map) Store(key, value interface{}) {
// 把m.read转成结构体readOnly
read, _ := m.read.Load().(readOnly)
// 若key在readOnly.m中且entry.p不为expunged(没有标记成已删除)即key同时存在于readOnly.m和dirty
// ,用CAS技术更新value 【注】:e.tryStore在entry.p == expunged时会立刻返回false,否则用CAS
// 尝试更新对应的value, 更新成功会返回true
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// key不存在于readOnly.m或者entry.p==expunged(entry被标记为已删除),加锁访问dirty
m.mu.Lock()
// 双重检测:若加锁前Map.dirty被提升为readOnly,则前面的read.m[key]可能无效,所以需要再次检测key是
// 否存在于readOnly中
read, _ = m.read.Load().(readOnly)
// 若key在于readOnly.m中
if e, ok := read.m[key]; ok {
// entry.p之前的状态是expunged,把它置为nil
if e.unexpungeLocked() {
// 之前dirty中没有此key,所以往dirty中添加此key
m.dirty[key] = e
}
// 更新(把value的地址原子赋值给指针entry.p)
e.storeLocked(&value)
// 若key在dirty中
} else if e, ok := m.dirty[key]; ok {
// 更新(把value的地址原子赋值给指针entry.p)
e.storeLocked(&value)
// 来了个新key
} else {
// dirty中没有新数据,往dirty中添加第一个新key
if !read.amended {
// 把readOnly中未标记为删除的数据拷贝到dirty中
m.dirtyLocked()
// amended:true,因为现在dirty有readOnly中没有的key
m.read.Store(readOnly{m: read.m, amended: true})
}
// 把这个新的entry加到dirty中
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
复制代码
tryStore 函数如下:
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
复制代码
unexpungeLocked 函数如下:
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
复制代码
dirtyLocked 函数如下:
func (m *Map) dirtyLocked() {
// 只要调用dirtyLocked,此时dirty肯定等于nil
if m.dirty != nil {
return
}
// dirty为nil时,把readOnly中没被标记成删除的entry添加到dirty
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// tryExpungeLocked函数在entry未被删除时【e.p!=expunged&&e.p!=nil】返回false,在
// e.p==nil时会将其置为expunged并返回true
if !e.tryExpungeLocked() {
m.dirty[k] = e // entry没被删除,把它添加到dirty中
}
}
}
复制代码
小结:
Store 方法优先无锁访问 readOnly,未命中会加锁访问 dirty
Store 方法中的双重检测机制在下面的 Load、Delete、Range 方法中都会用到,原因是:加锁前 Map.dirty 可能已被提升为 Map.read,所以加锁后还要再次检查 key 是否存在于 Map.read 中
dirtyLocked 方法在 dirty 为 nil(刚被提升成 readOnly 或者 Map 初始化时)会从 readOnly 中拷贝数据,如果 readOnly 中数据量很大,可能偶尔会出现性能抖动。
sync.map 不适合用于频繁插入新 key-value 的场景,因为此操作会频繁加锁访问 dirty 会导致性能下降。更新操作在 key 存在于 readOnly 中且值没有被标记为删除(expunged)的场景下会用无锁操作 CAS 进行性能优化,否则也会加锁访问 dirty
2.3 Delete 方法
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
// 从m.read中换出readOnly,然后从里面找key,此过程不加锁
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// readOnly不存在此key,但dirty中可能存在
if !ok && read.amended {
// 加锁访问dirty
m.mu.Lock()
// 双重检测:若加锁前Map.dirty被替换为readonly,则前面m.read.Load().(readOnly)无
// 效,需要再次检查
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// readOnly不存在此key,但是dirty中可能存在
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
// 如果entry.p不为nil或者expunged,则把entry.p软删除(标记为nil)
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
// e.p是真实值,把它置为nil
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
小结:
复制代码
删除 readOnly 中存在的 key,可以不用加锁
如果删除 readOnly 中不存在的或者 Map 中不存在的 key,都需要加锁。
2.4 Range 方法
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
// dirty存在readOnly中不存在的元素
if read.amended {
// 加锁访问dirty
m.mu.Lock()
// 再次检测read.amended,因为加锁前它可能已由true变成false
read, _ = m.read.Load().(readOnly)
if read.amended {
// readOnly.amended被默认赋值成false
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
// 遍历readOnly.m
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
复制代码
小结:
Range 方法 Map 的全部 key 都存在于 readOnly 中时,是无锁遍历的,性能最高
Range 方法在 readOnly 只存在 Map 中的部分 key 时,会一次性加锁拷贝 dirty 的元素到 readOnly,减少多次加锁访问 dirty 中的数据
3. sync.map 总结
3.1 使用场景
sync.Map 更适合读多更新多而插入新值少的场景(appendOnly 模式,尤其是 key 存一次,多次读而且不删除的情况),因为在 key 存在的情况下读写删操作可以不用加锁直接访问 readOnly 不适合反复插入与读取新值的场景,因为这种场景会频繁操作 dirty,需要频繁加锁和更新 read【此场景 github 开源库orcaman/concurrent-map更合适】
3.2 设计点:expunged
entry.p 取值有 3 种,nil、expunged 和指向真实值。那 expunged 出现在什么时候呢?为什么要有 expunged 的设计呢?它有什么作用呢?
什么时候 expunged 会出现呢?
当用 Store 方法插入新 key 时,会加锁访问 dirty,并把 readOnly 中的未被标记为删除的所有 entry 指针复制到 dirty,此时之前被 Delete 方法标记为软删除的 entry(entry.p 被置为 nil)都变为 expunged,那这些被标记为 expunged 的 entry 将不会出现在 dirty 中。
反向思维,如果没有 expunged,只有 nil 会出现什么结果呢?
直接删掉 entry==nil 的元素,而不是置为 expunged:在用 Store 方法插入新 key 时,readOnly 数据拷贝到 dirty 时直接把为 nil 的 entry 删掉。但这要对 readOnly 加锁,sync.map 设计理念是读写分离,所以访问 readOnly 不能加锁。
不删除 entry==nil 的元素,全部拷贝:在用 Store 方法插入新 key 时,readOnly 中 entry.p 为 nil 的数据全部拷贝到 dirty 中。那么在 dirty 提升为 readOnly 后这些已被删除的脏数据仍会保留,也就是说它们会永远得不到清除,占用的内存会越来越大。
不拷贝 entry.p==nil 的元素:在用 Store 方法插入新 key 时,不把 readOnly 中 entry.p 为 nil 的数据拷贝到 dirty 中,那在用 Store 更新值时,就会出现 readOnly 和 dirty 不同步的状态,即 readOnly 中存在 dirty 中不存在的 key,那 dirty 提升为 readOnly 时会出现数据丢失的问题。
4. sync.map 的其他问题
为什么 sync.map 不实现 len 方法?
个人觉得还是成本和收益的权衡。
实现 len 方法要统计 readOnly 和 dirty 的数据量,势必会引入锁竞争,导致性能下降,还会额外增加代码实现复杂度
对 sync.map 的并发操作导致其数据量可能变化很快,len 方法的统计结果参考价值不大。
orcanman/concurrent-map
orcaman/concurrent-map的适用场景是:**反复插入与读取新值,**其实现思路是:对 go 原生 map 进行分片加锁,降低锁粒度,从而达到最少的锁等待时间(锁冲突)
它的实现比较简单,截取部分源码如下
1. 数据结构
// SHARD_COUNT 分片大小
var SHARD_COUNT = 32
type ConcurrentMap []*ConcurrentMapShared
// ConcurrentMapShared 分片的并发map
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // 访问内部map都需要先获取读写锁
}
// New 创建一个concurrent map.
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}
复制代码
2. 函数介绍
2.1 GET 方法
// 先hash拿到key对应的分区号,然后加锁,读取值,最后释放锁和返回
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
// Get item from shard.
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
复制代码
2.2 SET 方法
// 先hash拿到key对应的分区号,然后加锁,设置新值,最后释放锁
func (m ConcurrentMap) Set(key string, value interface{}) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
复制代码
2.3 Remove 方法
// 先hash拿到key对应的分区号,然后加锁,删除key,最后释放锁
func (m ConcurrentMap) Remove(key string) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
delete(shard.items, key)
shard.Unlock()
}
复制代码
2.4 Count 方法
// 分别拿到每个分片map中的元素数量,然后汇总后返回
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}
复制代码
2.5 Upsert 方法
// 先hash拿到key对应的分区号,然后加锁,如果key存在就更新其value,否则插入新的k-v,释放锁并返回
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
res = cb(ok, v, value)
shard.items[key] = res
shard.Unlock()
return res
}
复制代码
后续
当然在其他业务场景中,我们可能更需要的是本地 kv 缓存组件库并要求它们支持键过期时间设置、淘汰策略、存储优化、gc 优化等。 这时候可能我们就需要去了解 freecache、gocache、fastcache、bigcache、groupcache 等组件库了。
参考链接
https://stackoverflow.com/questions/45585589/golang-fatal-error-concurrent-map-read-and-map-write/45585833
https://github.com/golang/go/issues/20680
https://github.com/golang/go/blob/master/src/sync/map.go
https://github.com/orcaman/concurrent-map
感兴趣的同学可以关注我的微信公号:小梁编程汇,后续更多干货会在 infoQ 和公众号持续更新。
评论