本文首发于公众号:Hunter 后端
原文链接:Golang基础笔记十五之sync
这一篇笔记介绍 Golang 中的 sync 模块。
sync 包主要提供了基础的同步原语,比如互斥锁,读写锁,等待组等,用于解决并发编程中的线程安全问题,以下是本篇笔记目录:
WaitGroup-等待组
sync.Mutex-互斥锁
sync.RWMutex-读写锁
sync.Once-一次性执行
sync.Pool-对象池
sync.Cond-条件变量
sync.Map
1、WaitGroup-等待组
前面在第十篇我们介绍 goroutine 和 channel 的时候,在使用 goroutine 的时候介绍有一段代码如下:
package main
import ( "fmt" "time")
func PrintGoroutineInfo() { fmt.Println("msg from goroutine")}
func main() { go PrintGoroutineInfo() time.Sleep(1 * time.Millisecond) fmt.Println("msg from main")}
复制代码
在这里,我们开启了一个协程调用 PrintGoroutineInfo() 函数,然后使用 time.Sleep() 来等待它调用结束。
然而在开发中,我们不能确定这个函数多久才能调用完毕,也无法使用准确的 sleep 时间来等待,那么这里就可以使用到 sync 模块的 WaitGroup 函数来等待一个或多个 goroutine 执行完毕。
下面是使用示例:
package main
import ( "fmt" "math/rand" "sync" "time")
func SleepRandSeconds(wg *sync.WaitGroup) { defer wg.Done() sleepSeconds := rand.Intn(3) fmt.Printf("sleep %d seconds\n", sleepSeconds) time.Sleep(time.Duration(sleepSeconds) * time.Second)}
func main() { var wg sync.WaitGroup
wg.Add(2) go SleepRandSeconds(&wg) go SleepRandSeconds(&wg)
wg.Wait() fmt.Println("函数执行完毕")}
复制代码
在这里,我们通过 var wg sync.WaitGroup 定义了一个等待组,并通过 wg.Add(2) 表示添加了需要等待的并发数,在并发中我们将 &wg 传入并通过 wg.Done() 减少需要等待的并发数。
在 wg.Done() 函数内部,使用 wg.Add(-1) 减少需要等待的并发数,在 main 函数中,使用 wg.Wait() 进入阻塞状态,当等待的并发都完成后,此函数就会返回,完成等待并接着往后执行。
2、sync.Mutex-互斥锁
1. 数据竞态与互斥锁
当多个 goroutine 并发访问同一个共享资源,且至少有一个访问是写操作时,就会发生数据竞态,造成的结果就是程序每次运行的结果表现可能会不一致。
比如下面的示例:
var balance int
func AddFunc() { balance += 1}func main() { for range 100 { go AddFunc() } time.Sleep(5 * time.Second) fmt.Println("balance is: ", balance)}
复制代码
多次执行上面的代码,最终输出的 balance 的值可能都不一致。
如果一个变量在多个 goroutine 同时访问时,不会出现比如数据不一致或程序崩溃的情况,那么我们就称其是并发安全的。
我们可以使用 go run -race main.go 的方式来检测数据竞态,执行检测后,会输出数据竞态的一些信息,比如发生在代码的多少行,一共发生了多少次数据竞态:
==================WARNING: DATA RACERead at 0x000003910df8 by goroutine 7: main.AddFunc() /../main.go:13 +0x24
Previous write at 0x000003910df8 by goroutine 6: main.AddFunc() /../main.go:13 +0x3c
Goroutine 7 (running) created at: main.main() /../main.go:18 +0x32
Goroutine 6 (finished) created at: main.main() /../main.go:18 +0x32 ==================balance is: 98Found 3 data race(s)exit status 66
复制代码
而要避免这种数据竞态的发生,我们可以限制在同一时间只能有一个 goroutine 访问同一个变量,这种方法称为互斥机制。
我们可以通过缓冲通道和 sync.Mutex 来实现这种互斥锁的操作。
2. 缓冲通道实现互斥锁
我们可以通过容量为 1 的缓冲通道来实现互斥锁的操作,保证同一时间只有一个 goroutine 访问同一个变量,下面是修改后的代码:
var sema = make(chan struct{}, 1)var balance int
func AddFunc() { sema <- struct{}{} balance += 1 <-sema}func main() { for range 100 { go AddFunc() }
time.Sleep(3 * time.Second) fmt.Println("balance is: ", balance)}
复制代码
在上面这段代码里,我们定义了 sema 这个容量为 1 的通道,在每个 AddFunc() 并发中,对变量 balance 执行读写操作前,我们先往通道里写入了一条数据,这样其他并发在执行该函数时,由于也会先往通道里写入数据,而这个时候通道已经满了,所以会处于堵塞状态,这就相当于获取锁。
直到 balance 写操作完成,从通道里读取数据,通道为空,相当于释放锁,这个时候其他并发才可以往通道里写入数据重新拿到锁。
这样我们通过往通道里写入和读取数据保证了同一时间只有一个 goroutine 在对 balance 进行写操作,从而实现互斥锁的操作。
3. sync.Mutex
在 sync 包中,sync.Mutex 直接为我们实现了互斥锁的操作,它的操作如下:
var mutex sync.Mutex // 互斥锁的定义mutex.Lock() // 获取锁mutex.Unlock() // 释放锁
复制代码
那么使用 sync.Mutex 实现上面的逻辑,代码如下:
var mutex sync.Mutexvar balance int
func AddFunc() { mutex.Lock() defer mutex.Unlock() balance += 1}func main() { for range 100 { go AddFunc() }
time.Sleep(3 * time.Second) fmt.Println("balance is: ", balance)}
复制代码
3、sync.RWMutex-读写锁
在上面介绍的 sync.Mutex 互斥锁中,限制了同一时间只能有一个 goroutine 访问某个变量,包括读和写,但这种情况并非是最理想的,比如在读多写少的场景下。
那么 Golang 里的 sync.RWMutex 为我们提供了读写锁的操作,它会允许多个读操作的并发,而写操作会阻塞所有读和写。
读写锁的基本规则如下:
当一个 goroutine 获取了读锁后,其他 goroutine 仍然可以获取读锁,但不能获取写锁。
当一个 goroutine 获取了写锁后,其他 goroutine 无论是读锁还是写锁都不能获取,必须等待该写锁释放。
当有写操作在等待时,避免写操作长期饥饿,会优先处理写锁请求。
下面是读写锁的用法:
var rwMu sync.RWMutex
rwMu.RLock() // 获取读锁rwMu.RUnlock() // 释放读锁
rwMu.Lock() // 获取写锁rwMu.Unlock() // 释放写锁
复制代码
下面是读写锁在函数中的用法示例:
func ReadBalance() { rwMu.RLock() fmt.Println("get read lock") defer rwMu.RUnlock() fmt.Println("balance: ", balance)}
func WriteBalance() { rwMu.Lock() fmt.Println("get write lock") defer rwMu.Unlock() balance += 1}
复制代码
4、sync.Once-一次性执行
sync.Once 可用于确保函数只被执行一次,常用于初始化操作,且可以用于延迟加载。
提供的方法是 Do(f func()),参数内容是一个需要被执行的函数 f,这个方法实现的功能是只有在第一次被调用的时候会执行 f 函数进行初始化。
下面是该方法的使用示例:
import ( "fmt" "sync")
type Config struct { // 配置信息}
var ( instance *Config once sync.Once)
func LoadConfig() { fmt.Println("初始化配置...") instance = &Config{} // 加载配置的逻辑}
func GetConfig() *Config { once.Do(LoadConfig) return instance}
func main() { c1 := GetConfig() c2 := GetConfig() fmt.Println(c1 == c2) // 输出: true(同一个实例)}
复制代码
在这里,虽然 GetConfig() 函数执行了两遍,但是其内部的调用的 LoadConfig 函数却只执行了一次,因为 sync.Once 会在内部记录该函数是否已经初始化。
sync.Once 是个结构体,其结构如下:
type Once struct { done atomic.Uint32 m Mutex}
复制代码
其中,done 字段用于记录需要执行的函数 f 是否已经被执行,其对应的 Do() 方法内部会先根据 done 字段判断,如果已经被执行过则直接返回,而如果没有则会先执行一次。
而 m 字段表示的互斥锁则用于在 Do() 方法内部调用的 doSlow() 中使用,用于确保并发情况下目标函数只被执行一次,在 f 函数执行结束后,done 参数会被置为 1,表示该函数已经被执行,这样再次调用 Do() 方法时,判断 done 字段的值为 1 则不会再执行此函数。
5、sync.Pool-对象池
sync.Pool,对象池,我们可以将一些生命周期短且创建成本高的对象存在其中,从而避免频繁的创建和销毁对象,以减少内存分配和垃圾回收压力。
简单地说就是复用对象。
1. 基础用法
下面以复用一个字节缓冲区为例介绍一下对象池的基础用法。
1) 创建对象池
创建对象池的操作如下:
var bufferPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{} },}
复制代码
可以看到,这里对 sync.Pool 的 New 字段赋值了一个函数,返回的是一个字节缓冲区。
2) 从池中获取对象
从对象池中获取该对象的操作使用 Get() 操作:
buf := bufferPool.Get().(*bytes.Buffer)
复制代码
3) 将对象放回池中
对该字节缓冲区使用完毕后可以将该对象再放回池中:
2. 使用示例
import ( "bytes" "fmt" "sync")
var bufferPool = sync.Pool{ New: func() interface{} { fmt.Println("create bytes buffer") return &bytes.Buffer{} },}
func LogMessage(msg string) { buf := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buf)
buf.Reset() buf.WriteString(msg) fmt.Println(buf.String())}
func main() { LogMessage("hello world") LogMessage("hello world") LogMessage("hello world")}
复制代码
在上面的代码中,我们先定义了 bufferPool,然后在 LogMessage() 函数中,先使用 Get() 获取该字节缓冲对象,因为这里返回的数据是接口类型,所以这里将其转为了对应的类型,然后使用 buf.Reset() 重置了之前的记录后写入新的数据,最后使用的 defer 操作将此对象又放回了对象池。
6、sync.Cond-条件变量
sync.Cond 用于等待特定条件发生后再继续执行,可用于生产者-消费者的模式。
创建一个条件变量,参数只有一个,那就是锁,下面代码里用的是互斥锁:
cond = sync.NewCond(&sync.Mutex{})
复制代码
返回的 cond 对外暴露的字段 L 就是我们输入的锁。
下面用一个生产者的代码示例来介绍 cond 的几个相关函数。
在这里定义了 queue 作为队列,其中拥有需要处理的数据,queueMaxSize 字段为限制的最大队列长度。
var ( queue []int queueMaxSize int = 5 cond = sync.NewCond(&sync.Mutex{}))
func Producer() { for { cond.L.Lock() for len(queue) == queueMaxSize { fmt.Println("produce queue max size, wait") cond.Wait() } queue = append(queue, 1) fmt.Println("produce queue") cond.Signal() // 通知消费者 cond.L.Unlock() time.Sleep(100 * time.Millisecond) }}
复制代码
在定义的 Producer() 函数中,有一个死循环,内部先使用 cond.L.Lock() 获取锁,然后判断生产的数据是否有消费者消费,如果队列满了的话,则进入等待。
1. cond.Wait()
在上面的代码中,我们使用 cond.Wait() 进入了等待状态。
在 Wait() 函数内部,先对前面的锁进行释放操作,然后进入阻塞状态,直到其他 gouroutine 通过 Signal() 函数唤醒后重新获取锁。
2. cond.Signal()
前面往队列里添加数据后,通过 cond.Signal() 函数通知消费者,消费者在另一个函数中就可以被唤醒,然后进行处理,同时这个函数后面将锁释放 cond.L.Unlock()。
3. cond.Broadcast()
前面的 Signal() 函数是唤醒一个等待的 goroutine,cond.Broadcast() 函数则可以唤醒所有等待的 goroutine。
下面提供一下生产者-消费者的全部处理代码:
package main
import ( "fmt" "sync" "time")
var ( queue []int queueMaxSize int = 5 cond = sync.NewCond(&sync.Mutex{}))
func Producer() { for { cond.L.Lock() for len(queue) == queueMaxSize { fmt.Println("produce queue max size, wait") cond.Wait() } queue = append(queue, 1) fmt.Println("produce queue") cond.Signal() // 通知消费者 cond.L.Unlock() time.Sleep(100 * time.Millisecond) }}
func Consumer() { for { cond.L.Lock() for len(queue) == 0 { fmt.Println("wait for produce") cond.Wait() // 等待并释放锁 } fmt.Println("consume queue") item := queue[0] queue = queue[1:] cond.Signal() // 通知生产者 cond.L.Unlock() ProcessItem(item) }}
func ProcessItem(i int) { fmt.Println("process i: ", i)}
func main() { go Producer() go Consumer() time.Sleep(1 * time.Second)}
复制代码
7、sync.Map
sync 模块提供了 sync.Map 用来存储键值对,但是和之前介绍的 map 不一样的是,sync.Map 是并发安全的,而且无需初始化,并且在操作方法上与原来的 map 不一样。
1. 并发安全
原生的 map 是非并发安全的,如果多个 goroutine 对其进行同时读写会触发错误,比如下面的操作:
import ( "fmt" "time")
var originMap = make(map[string]int)
func UpdateMapKey() { originMap["a"] += 1}
func GetMapKey() { a := originMap["a"] fmt.Println(a)}
func main() { originMap["a"] = 0 for range 100 { go UpdateMapKey() go GetMapKey() } time.Sleep(1 * time.Second) fmt.Println("originMap: ", originMap)}
复制代码
但是 sync.Map 是并发安全的,内部会通过互斥锁的操作允许多个 goroutine 安全地读写,下面是使用 sync.Map 对上面逻辑的改写,后面我们会具体介绍其操作方法:
import ( "fmt" "sync" "time")
var originMap sync.Map
func UpdateMapKey() { for { oldValue, loaded := originMap.Load("a") if !loaded { if _, ok := originMap.LoadOrStore("a", 1); ok { return } } else { newValue := oldValue.(int) + 1 if originMap.CompareAndSwap("a", oldValue, newValue) { return } } }}
func GetMapKey() { a, _ := originMap.Load("a") fmt.Println(a)}
func main() { originMap.Store("a", 0) for range 100 { go UpdateMapKey() go GetMapKey() } time.Sleep(1 * time.Second) a, _ := originMap.Load("a") fmt.Println("originMap: ", a)}
复制代码
2. 初始化
原生的 map 进行初始化,有下面两种操作方法:
var originMap = make(map[string]int)var originMap = map[string]int{}
复制代码
sync.Map 可以直接声明使用:
var m sync.Mapm.Store("a", 0)
复制代码
3. 操作方法
这里先介绍 sync.Map 增删改查的基础操作:
1) 增
增加一个 key 的操作如下:
2) 删
删除一个 key 的操作如下:
3) 改
修改操作还是可以用 Store() 方法,而且可以修改为不同数据类型:
4) 查
查询操作可以使用 Load() 方法,返回对应的 value 值以及是否存在:
m.Store("a", 1)v, ok := m.Load("a")if ok { fmt.Printf("exist value:%v\n", v.(int))} else { fmt.Printf("key not exists")}
复制代码
5) 遍历
遍历操作如下:
m.Store("a", 1)m.Range(func(key, value any) bool { fmt.Println(key, value) return true})
复制代码
4. 原子性条件操作
上面的这些方法可以实现基础的增删改查操作,但是如果我们有一个需求,比如前面的获取一个 key 的 value,然后在原值的基础上 +1 再存入,大概逻辑如下:
v, ok := m.Load(key)v = v.(int)v +=1m.Store(key, v)
复制代码
但是在这个操作中,如果有其他 goroutine 已经修改了 v 的值,那么我们这里的操作就相当于污染了源数据,而为了避免这个可能,我们可以使用一些原子性条件操作,以实现并发操作。
1) CompareAndSwap()
CompareAndSwap() 是一个更新操作,传入 3 个参数,key,oldValue 和 newValue,仅当 key 的结果为 oldValue 的时候,将结果更新为 newValue,使用示例如下:
key := "a"m.Store(key, 1)
swapped := m.CompareAndSwap(key, 1, 2)fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 2, 是否更新结果 %v\n", swapped)swapped = m.CompareAndSwap(key, 1, 3)fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 3, 是否更新结果 %v\n", swapped)
复制代码
所以在上面我们要对结果进行 +1 的代码操作为:
newValue := oldValue.(int) + 1if originMap.CompareAndSwap("a", oldValue, newValue) { return}
复制代码
2) CompareAndDelete()
CompareAndDelete 是一个原子性的删除操作,接受两个参数,key 和 oldValue,仅当 key 的值为 oldValue 时删除该 key,返回结果为是否删除:
key := "a"m.Store(key, 1)deleted := m.CompareAndDelete(key, 1)if deleted { fmt.Printf("当 key 的 value 为 %v 时,删除\n", 1)} else { fmt.Printf(" key 的 value 不为 %v 时,不执行删除\n", 1)}
复制代码
3) LoadAndDelete()
LoadAndDelete 表示是否加载某个 key 的值并删除该 key,无论该 key 是否存在,参数为 key,返回值为 value 和是否存在该 key:
key := "a"m.Store(key, 1)
value, loaded := m.LoadAndDelete(key)fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value)value, loaded = m.LoadAndDelete(key)fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value)
复制代码
4) LoadOrStore()
LoadOrStore 方法为不存在 key 则存入,存在的话则返回该值:
key := "a"
value, loaded := m.LoadOrStore(key, 1)fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)value, loaded = m.LoadOrStore(key, 1)fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
复制代码
评论