写点什么

golang 中的几种并发模式

作者:六月的
  • 2022-10-19
    上海
  • 本文字数:2940 字

    阅读完需:约 1 分钟

0.1、索引

https://blog.waterflow.link/articles/1663551951058

1、for- select 模式

这种模式通常用在从多个通道读取数据


package main
import ( "fmt" "time")
func main() { ch1, ch2 := make(chan int), make(chan int)
// 每2秒不断往通道1写数据 go func() { i := 0 for { i += 2 ch1 <- i time.Sleep(2 * time.Second) } }()
// 每2秒不断往通道2写数据 go func() { i := 1 for { i += 2 ch2 <- i time.Sleep(2 * time.Second) } }()
// 不断从通道读数据 for { select { case v := <-ch1: fmt.Println("ch1:", v) time.Sleep(time.Second) case v := <-ch2: fmt.Println("ch2:", v) time.Sleep(time.Second) default: fmt.Println("default") time.Sleep(time.Second) } }
}
复制代码


如果 ch1 和 ch2 没数据,会走 default


如果 ch1 和 ch2 都有数据会随机选择一个执行,之所以随机是为了避免只执行第一个 case 导致饥饿

2、done-channel 模式

由于 goroutine 不会被垃圾回收,因此很可能导致内存泄漏。


为了避免内存泄漏,goroutine 应该有被触发取消的机制。父 Goroutine 需要通过一个名为 done 的只读通道向其子 Goroutine 发送取消信号。按照惯例,它被设置为第一个参数。


这种模式在其他模式中也被大量使用。


package main
import ( "fmt")
func main() { jobs := make(chan int, 5) done := make(chan bool)
go doWork(done, jobs)
for j := 1; j <= 3; j++ {
fmt.Println("sent job", j) jobs <- j } close(jobs) fmt.Println("sent all jobs")
// 任务结束 done <- true}
func doWork(done chan bool, jobs chan int) { for { select { case j, more := <-jobs: if more { fmt.Println("received job", j) } else { fmt.Println("received all jobs") } case <-done: // 任务结束,关闭子协程 return default: } }}
复制代码

3、or-done 模式

该模式旨在将多个完成通道组合成一个 agg_done;这意味着如果一个 done 通道发出信号,则整个 agg_done 通道也将关闭。然而,我们不知道在运行时完成通道的数量。


or-done 模式可以通过使用 goroutine 和 递归 来实现。


示例中 使上下递归函数像树一样相互依赖。上部将自己的 orDone 通道注入下部。然后下层也将自己的 orDone 返回给上层。


如果任何 orDone 通道关闭,则通知上层和下层。


这点和上面 done-channel 模式是不同的,上面是所有 goroutine 完成任务,这里是只要有 1 个 goroutine 完成就结束所有 goroutine。


就好比发送一个请求到多个微服务节点,只要有 1 个返回就算完成。


package main
import ( "fmt" "time")
func main() { var or func(channels ...<-chan interface{}) <-chan interface{} // 只要有1个结束阻塞,关闭orDone并返回 or = func(channels ...<-chan interface{}) <-chan interface{} { // 小于2个通道直接返回 switch len(channels) { case 0: return nil case 1: return channels[0] } // 声明一个orDone orDone := make(chan interface{}) go func() { // 完成关闭orDone defer close(orDone) switch len(channels) { case 2: // 如果是2个channel,只需要监听这两个 select { case <-channels[0]: case <-channels[1]: } default: // 二分法递归 m := len(channels) / 2 select { case <-or(channels[:m]...): case <-or(channels[m:]...): } } }() return orDone }
// 传入一个时间模拟请求时长,时间到了就close掉,结束当前channel的阻塞 sig := func(after time.Duration) <-chan interface{} { c := make(chan interface{}) go func() { defer close(c) time.Sleep(after) }() return c }
start := time.Now() // 这里orDone开始是阻塞的,里面开了5个channel <-or( sig(2*time.Hour), sig(5*time.Minute), sig(1*time.Second), ) fmt.Printf("done after %v\n", time.Since(start))}
复制代码

4、fanout-channel 模式

意思是只有 1 个输入 channel,有多个输出 channel,经常用在设计模式中的观察者模式。观察者模式中,当数据发生变动后,多个观察者都会收到这个信号。


package main
import ( "fmt" "time")
func main() { // 输入的channel,相当于被观察者 ch := make(chan interface{}) go func() { for { ch <- time.Now() time.Sleep(3 * time.Second) } }()
// 观察者 out := make([]chan interface{}, 2) for k := range out { out[k] = make(chan interface{}) }
go fanout(ch, out)
// 是否观察到数据变化 for { select { case res := <-out[0]: fmt.Println(res) case res := <-out[1]: fmt.Println(res) } }}
func fanout(ch <-chan interface{}, out []chan interface{}) { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }()
// 订阅被观察者 for v := range ch { v := v for i := 0; i < len(out); i++ { i := i out[i] <- v } }
}
复制代码

5、fan-in-channel 模式

和上面的相反,这个是指多个源 channel 输入,一个目标 channel 输出的情况。


package main
import ( "fmt" "time")
func main() { // 输入的channel in := make([]chan interface{}, 2) in2 := make([]<-chan interface{}, 2)
for k := range in { k := k in[k] = make(chan interface{}) var inin <-chan interface{} = in[k] in2[k] = inin
go func() { for { in[k] <- time.Now() time.Sleep(3 * time.Second) } }()
}
// 打印输出的channel for v := range fanIn(in2...) { fmt.Println(v) }
}
func fanIn(chans ...<-chan interface{}) <-chan interface{} { switch len(chans) { case 0: c := make(chan interface{}) close(c) return c case 1: return chans[0] case 2: return mergeTwo(chans[0], chans[1]) default: // 多个channel二分法 m := len(chans) / 2 return mergeTwo(fanIn(chans[:m]...), fanIn(chans[m:]...)) }
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} { // 针对2个channel输出 c := make(chan interface{}) go func() { defer close(c) for a != nil || b != nil { select { case v, ok := <-a: if !ok { a = nil continue } c <- v case v, ok := <-b: if !ok { b = nil continue } c <- v }
} }() return c}
复制代码


用户头像

六月的

关注

还未添加个人签名 2019-07-23 加入

还未添加个人简介

评论

发布
暂无评论
golang中的几种并发模式_golang_六月的_InfoQ写作社区