Go 语言提供了多种开箱即用的并发原语,并在标准包的基础上形成了并发设计模式。本文将总结实践中常用的写法和技巧。旨在引导使用正确的并发编程范式。减少手撸代码带来的复杂度和漏洞。
并发与并行
Go 并发编程原语
1. Goroutine
Goroutine 是 Go 中的轻量级线程,用于并发执行代码。通过 go
关键字启动一个 Goroutine。
go func() {
// 并发执行的代码
}()
复制代码
2. Channel
Channels 是 Go 的一种类型,用于 Goroutine 之间的通信和同步。它们可以发送和接收特定类型的值。
无缓冲 Channel
ch := make(chan int)
go func() {
ch <- 1 // 发送数据
}()
value := <-ch // 接收数据
复制代码
缓冲 Channel
ch := make(chan int, 10)
ch <- 1 // 发送数据,不阻塞
value := <-ch // 接收数据
复制代码
3. Select
select
语句用于等待多个 Channel 操作。它可以选择可以操作的 Channel,或者阻塞等待其中一个变得可用。
select {
case msg := <-ch1:
fmt.Println("Received", msg)
case ch2 <- 1:
fmt.Println("Sent")
default:
fmt.Println("No communication")
}
复制代码
4. Mutex 和 RWMutex
互斥锁 (sync.Mutex
) 和读写锁 (sync.RWMutex
) 用于保护共享资源,防止数据竞争。
Mutex
var mu sync.Mutex
mu.Lock()
// 访问共享资源
mu.Unlock()
复制代码
RWMutex
var rwMu sync.RWMutex
rwMu.RLock()
// 读取共享资源
rwMu.RUnlock()
rwMu.Lock()
// 写入共享资源
rwMu.Unlock()
复制代码
5. WaitGroup
sync.WaitGroup
用于等待一组 Goroutine 完成。
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// 并发执行的任务
}()
wg.Wait() // 等待所有 Goroutine 完成
复制代码
6. Once
sync.Once
用于确保某些操作只执行一次,例如单例模式的初始化。
var once sync.Once
once.Do(func() {
// 只执行一次的初始化代码
})
复制代码
7. Context
context
包提供了 Context
类型,用于控制 Goroutine 的生命周期和取消操作。
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
// 处理取消操作
}
}()
cancel() // 取消操作
复制代码
8. Ticker 和 Timer
time.Ticker
和 time.Timer
用于定时触发事件。
Ticker
ticker := time.NewTicker(1 * time.Second)
for t := range ticker.C {
fmt.Println("Tick at", t)
}
复制代码
Timer
timer := time.NewTimer(2 * time.Second)
<-timer.C
fmt.Println("Timer expired")
复制代码
9. ErrGroup
errgroup
包提供了一种机制来同时等待一组 Goroutine,并收集它们返回的错误。
import "golang.org/x/sync/errgroup"
var g errgroup.Group
g.Go(func() error {
return errors.New("something went wrong")
})
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
复制代码
并发设计模式
Go 语言中的扇入扇出(Fan-in/Fan-out)是并发编程中的一种常见模式,特别适用于需要将任务分发给多个工作 Goroutine,并将结果收集到一个地方进行处理的场景。以下是对这种并发设计范式的总结:
扇出(Fan-out)
扇出模式指的是将一个任务分解成多个子任务,并将这些子任务分发给多个 Goroutine 并行执行。
示例
func fanOut(jobs []Job) []Result {
var wg sync.WaitGroup
results := make([]Result, len(jobs))
jobsChan := make(chan int, len(jobs))
for i := range jobs {
jobsChan <- i
}
close(jobsChan)
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := range jobsChan {
results[i] = processJob(jobs[i])
}
}()
}
wg.Wait()
return results
}
复制代码
在这个例子中,fanOut
函数将任务分发给多个 Goroutine 进行并行处理。每个 Goroutine 从 jobsChan
通道中读取任务索引并处理任务。
扇入(Fan-in)
扇入模式指的是将多个 Goroutine 的输出合并到一个通道中,以便集中处理结果。
示例
func fanIn(resultsChan chan Result, done chan bool) []Result {
var results []Result
for {
select {
case result := <-resultsChan:
results = append(results, result)
case <-done:
return results
}
}
}
复制代码
在这个例子中,fanIn
函数从多个 Goroutine 的输出通道 resultsChan
中读取结果,并将结果集中存储到 results
切片中。当接收到 done
通道的信号时,函数返回所有收集到的结果。
扇入扇出(Fan-in/Fan-out)组合
结合扇入和扇出模式,可以构建高效的并发任务处理系统。例如,下述示例展示了如何同时使用扇入和扇出模式来并行处理任务并收集结果。
示例
func fanOutFanIn(jobs []Job) []Result {
var wg sync.WaitGroup
resultsChan := make(chan Result, len(jobs))
jobsChan := make(chan int, len(jobs))
done := make(chan bool)
for i := range jobs {
jobsChan <- i
}
close(jobsChan)
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := range jobsChan {
result := processJob(jobs[i])
resultsChan <- result
}
}()
}
go func() {
wg.Wait()
close(resultsChan)
done <- true
}()
return fanIn(resultsChan, done)
}
复制代码
在这个例子中,fanOutFanIn
函数首先将任务分发给多个 Goroutine 进行并行处理(扇出),然后将处理结果通过 resultsChan
通道传递给 fanIn
函数进行集中处理(扇入)。
生产者-消费者(Producer-Consumer)
生产者生成数据,消费者处理数据,两者通过通道(channel)进行通信。
常用于需要将生产和消费分离的场景,例如日志记录、任务队列。
示例:
jobs := make(chan Job)
results := make(chan Result)
// Producer
go func() {
defer close(jobs)
for i := 0; i < numJobs; i++ {
jobs <- Job{ID: i}
}
}()
// Consumer
go func() {
for job := range jobs {
results <- process(job)
}
close(results)
}()
复制代码
工作池(Worker Pool)
将任务分配给固定数量的工作 Goroutine 处理,每个工作 Goroutine 负责执行任务并将结果返回。
限制同时执行的 Goroutine 数量,防止资源耗尽。
jobs := make(chan Job, 100)
results := make(chan Result, 100)
for w := 0; w < numWorkers; w++ {
go func() {
for job := range jobs {
results <- process(job)
}
}()
}
for _, job := range jobList {
jobs <- job
}
close(jobs)
for i := 0; i < len(jobList); i++ {
result := <-results
fmt.Println(result)
}
复制代码
超时控制(Timeout/Deadline)
在一定时间内完成任务,如果超过时间限制则取消任务。
防止长时间阻塞或等待的情况。
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Timeout exceeded:", ctx.Err())
}
复制代码
4. 信号量(Semaphore)
sem := semaphore.NewWeighted(10) // 最大并发数为10
for i := 0; i < 100; i++ {
if err := sem.Acquire(context.Background(), 1); err != nil {
log.Fatal(err)
}
go func() {
defer sem.Release(1)
// 处理任务
}()
}
复制代码
通道扇出(Channel Fan-out)
将一个通道中的数据分发到多个通道中,每个通道都可以独立处理数据。
实现任务分发和负载均衡。
input := make(chan int)
outputs := []chan int{make(chan int), make(chan int)}
go func() {
for x := range input {
for _, out := range outputs {
out <- x
}
}
for _, out := range outputs {
close(out)
}
}()
// 发送数据到 input 通道
go func() {
for i := 0; i < 10; i++ {
input <- i
}
close(input)
}()
复制代码
这些并发设计范式在实际应用中非常常见,通过结合使用它们,可以有效地解决并发问题,提高程序的性能和可靠性。
除了前面提到的并发设计模式外,还有其他一些常见的并发设计模式,它们在不同场景中具有独特的用途:
批处理(Batch Processing)
将多个任务或数据聚合在一起,批量处理以提高效率。
用于减少频繁的 IO 操作或网络请求,通过批量处理来提高吞吐量。
var batch []Item
var mu sync.Mutex
go func() {
for {
time.Sleep(time.Second)
mu.Lock()
if len(batch) > 0 {
processBatch(batch)
batch = nil
}
mu.Unlock()
}
}()
// 添加任务到批处理
func addItem(item Item) {
mu.Lock()
batch = append(batch, item)
mu.Unlock()
}
复制代码
事件驱动(Event-Driven)
基于事件的发生来触发相应的处理逻辑。
常用于 GUI 编程或事件流处理系统,如点击事件处理、消息队列系统。
events := make(chan Event)
// 事件监听器
go func() {
for event := range events {
handleEvent(event)
}
}()
// 触发事件
events <- Event{Type: "click", Payload: "button1"}
复制代码
消息传递(Message Passing)
通过消息传递在不同的 Goroutine 之间进行通信,而不是共享内存。
适用于需要在多个 Goroutine 之间传递数据或信号的场景,如任务分发。
messages := make(chan string)
// 发送者
go func() {
messages <- "hello"
}()
// 接收者
msg := <-messages
fmt.Println(msg)
复制代码
有限队列(Bounded Queue)
通过限制队列的长度来控制并发量,防止过多任务导致系统负载过高。
防止任务生产者产生过多任务而导致系统资源枯竭。
jobs := make(chan Job, 100) // 队列长度为100
for w := 0; w < numWorkers; w++ {
go func() {
for job := range jobs {
process(job)
}
}()
}
复制代码
上下文控制(Context Control)
使用context.Context
来控制多个 Goroutine 的生命周期,尤其是处理取消信号和超时。
用于需要在一定时间内完成任务或在特定条件下取消任务的场景。
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctx.Done():
fmt.Println("Task cancelled")
}
}()
// 取消任务
cancel()
复制代码
这些模式为不同的并发场景提供了解决方案,可以根据具体需求选择合适的模式进行应用。这些模式的组合使用也常见于复杂系统中,以满足多样化的并发需求。
评论