写点什么

Go 并发模式:管道和取消(译)

作者:en
  • 2022 年 2 月 20 日
  • 本文字数:8571 字

    阅读完需:约 28 分钟

Go 并发模式:管道和取消(译)

前言

多看外国好的文章也是对自己知识面的一个扩充,自己尝试翻译了一下 go 语言介绍 pipeline 的本篇文章,看完以后理解了如果通过管道优雅的实现多 goroutine 的关闭,希望我的翻译能让大家也弄明白如何做,有翻译的不好的地方,欢迎指正。

介绍

Go 的并发原语使构建流数据管道变得容易,从而有效地利用 I/O 和多个 CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净地处理故障的技术。

什么是 pipeline(管道)?

Go 语言并没有关于 pipeline 的正式定义;可以将 pipeline 理解为高并发的一种程序设计模式。通俗地说,pipeline 是由 channel 连接的一系列程序段落,每个段落是运行着相同的功能 goroutine。在 goroutine 的每个阶段,函数执行的逻辑如下

  • 通过上游的 channel 接收数据

  • 对数据执行功能,通常会由此产生新的值

  • 把心的数据从下游的 channel 传输出去

每个程序段落都有任意的输入和输出 channel,除了第一阶段(只有输入)和最后阶段(只有输出);第一阶段也称为生产者或者源,最后阶段称为接收器或者消费者。

接下来我们将从一个简单的 pipeline 例子来解释这些想法和技术,稍后我们还会展示一个更为生产化的例子。

平方数

考虑一个具有三个阶段的 pipeline。

1.gen,gen 函数启动一个 goroutine,在 channel 上发送整数,并在发送完所有值后关闭 channel

func gen(nums ...int) <-chan int {    out := make(chan int)    go func() {        for _, n := range nums {            out <- n        }        close(out)    }()    return out}
复制代码

2.sq,sq 函数从 channel 接收证书,然后返回每一个收到整数的平方数,当上游 channel(in)关闭,且 sq 已经讲接收到的所有整数的平方根发送到下游 channel 后他会关闭下游的 channel(out)。

func sq(in <-chan int) <-chan int {    out := make(chan int)    go func() {        for n := range in {            out <- n * n        }        close(out)    }()    return out}
复制代码

3.main 函数设置 pipeline 并运行最后阶段--〉从 sq 接收值并打印,直到管道关闭

func main() {    // Set up the pipeline.    c := gen(2, 3)    out := sq(c)
// Consume the output. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9}
复制代码

由于 sq 有相同的上游 channel 和下游 channel,我们还可以进行一系列的组合

func main() {    // Set up the pipeline and consume the output.    for n := range sq(sq(gen(2, 3))) {        fmt.Println(n) // 16 then 81    }}
复制代码

Fan-out, fan-in

Fan-out:多个函数可以从同一个 channel 读取数据,直到 channel 关闭;这提供了一种在一组工作人员之间分配工作以并行化 cpu 使用和 I/O 的方法。

Fan-in:一个函数可以通过多路复用的方式从多个输入的 channel 读取信息并继续执行,直到所有的输入 channel 关闭。

了解了 fan 的概念后,我们可以更改 pipeline 来运行 sq 的两个实例,每个实例从同一个输入读取(fan-out),我们引入一个新功能合并,以扇入(fan-in)结果。


func main() {    in := gen(2, 3)    // Distribute the sq work across two goroutines that both read from in.    c1 := sq(in)    c2 := sq(in)    // Consume the merged output from c1 and c2.    for n := range merge(c1, c2) {        fmt.Println(n) // 4 then 9, or 9 then 4    }}
复制代码

merge 函数为每个输入 channel 启动一个 goroutine,将值复制到唯一的输出通道,将通道列表转换为单个通道。一旦所有的输出 goroutine 都启动了,merge 会再启动一个 goroutine 用于在所有输入 channel 关闭的时候去关闭输出 channel。由于向关闭的 channel 发送值会导致 panic,所以确认所有的发送操作在 channel 关闭前都结束了十分重要。sync.WaitGroup 类型提供了一种安排此同步的简单方法:

func merge(cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)
// Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) }
// Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out}
复制代码

Stopping short(止损)

我们的 pipeleine 函数模式如下:

  • 当所有的发送操作完成后,程序阶段会关闭他们的输出 channel。

  • 程序阶段会不断的从输入 channel 接收数据,直到所有的本阶段 channel 都关闭。

这种模式允许每个接收阶段编写为 range 循环毛事,并缺确保一旦所有的值都下发到下游以后所有的的 goroutine 都会退出。

但是在实际的 pipeline 里面,阶段并不总是要去接收所有的上游值。有时候这是设计使然;接收者可能只需要一系列值的子集来取得进展;更经常地,一个阶段提前退出了,因为进入此阶段的值表示更早一阶段的错误。在这些例子中接受者都不比等待剩余值的到来,并且我们希望早期阶段停止生成后期阶段不需要的值。

在我们上面例子的 pipeline 中,如果一个阶段没有能消耗所有的入站值,尝试发送这些值的 goroutine 将无限制的阻塞。

    // Consume the first value from the output.    out := merge(c1, c2)    fmt.Println(<-out) // 4 or 9    return    // Since we didn't receive the second value from out,    // one of the output goroutines is hung attempting to send it.}
复制代码

这种情况下将导致资源泄漏;goroutine 消耗内存和 runtime 资源,goroutine 堆栈中的堆引用防止数据被垃圾收集,goroutine 不会被 gc,所以他们必须自己退出。

由此,哪怕下游阶段无法接收所有的入站值,我们也需要安排 channel 的上游阶段退出。一种方法是将出站 channel 更改为具有缓冲区的管道,一个缓冲区可以保存固定数量的值;如果缓冲区有空间则发送操作立即完成

c := make(chan int, 2) // buffer size 2c <- 1  // succeeds immediatelyc <- 2  // succeeds immediatelyc <- 3  // blocks until another goroutine does <-c and receives 1
复制代码

当创建 channel 的时候能够获取要发送值的数目时,使用缓冲区 channel 可以帮助我们简化代码。例如,我们可以重写 gen 以将整数列表复制到缓冲管道中,以避免创建新的 goroutine:

func gen(nums ...int) <-chan int {    out := make(chan int, len(nums))    for _, n := range nums {        out <- n    }    close(out)    return out}
复制代码

回到我们 pipeline 中被阻塞的 goroutine,我们可以考虑在 merge 返回的输出 channel 中添加一个缓冲区:

func merge(cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int, 1) // enough space for the unread inputs    // ... the rest is unchanged ...
复制代码

虽然这修复了例子中被阻塞的 goroutine,但这是错误的代码。这里选择缓冲 1 的原因是我们知道例子中 merge 接收的数量和下游将消耗的数量,若是我们再向 gen 多传递一个值,依旧会再次阻塞 goroutine。

因此,我们需要为下游阶段提供一种方法来向发送者表明下游将停止接受输入。

Explicit cancellation(显式取消)

当 main 决定在没有接收到所有值的情况下退出时,它必须告诉上游阶段的 goroutine 放弃他们试图发送的值。它通过一个名为 done 的通道上发送值来实现,它发送两个值,因为可能有两个被阻止的发件人:

func main() {    in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in)
// Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{}}
复制代码

发送 goroutine 用一个 select 替换它们的发送操作,该语句在发送输出发生时或当它们从 done 接收到值的时候继续。done 的值类型是空接口,因为值无关紧要,它的主要目的是指示应当放弃发送的接收事件。输出 goroutine 继续在其入站通道 c 上循环,因此上游阶段不会被阻塞。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)
// Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
复制代码

这个方法有一个问题:下游每个接收者需要知道可能被阻塞的上游发送者数目,并安排在提前返回的时候向发送者发出信号,跟踪这些计数既繁琐又容易出错。

我们需要一种方法来告诉未知且无限数量的 goroutine 停止向下游发送它们的值。 在 Go 中,我们可以通过关闭 channel 来做到这一点,因为关闭 channel 上的接收操作总是可以立即进行,从而产生元素类型的零值。

这意味着 main 可以通过关闭 done channel 来解除所有发送者的阻塞。 这种关闭实际上是对发送者的广播信号。 我们扩展了我们的每个 channel 函数以接受 done 作为参数,并通过 defer 语句安排关闭发生,因此来自 main 的所有返回路径都会发出 channel 阶段退出的信号。

func main() {    // Set up a done channel that's shared by the whole pipeline,    // and close that channel when this pipeline exits, as a signal    // for all the goroutines we started to exit.    done := make(chan struct{})    defer close(done)          
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in)
// Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call. }
复制代码

现在,一旦完成关闭,我们的每个 pipeline 阶段都可以自由返回。 合并中的输出例程可以在不耗尽其入站 channel 的情况下返回,因为它知道上游发送方 sq 将在 done 关闭时停止尝试发送。 output 确保通过 defer 语句在所有返回路径上调用 wg.Done:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)
// Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
复制代码

同样, sq 可以在 done 关闭后立即返回。 sq 通过 defer 语句确保其输出 channel 在所有返回路径上都关闭:

func sq(done <-chan struct{}, in <-chan int) <-chan int {    out := make(chan int)    go func() {        defer close(out)        for n := range in {            select {            case out <- n * n:            case <-done:                return            }        }    }()    return out}
复制代码

以下是 pipeline 建设的指导方针:

  • 当所有的发送操作结束,pipeline 的阶段将关闭他们的输出 channel。

  • 阶段会不断的从输入 channel 接收值,直到这些管道关闭或发送者被接触阻塞。

  • pipeline 通过确保所有发送的值有足够的缓冲区或在接收者可能放弃 channel 时显式地向发送者发出信号来解除对发送者的阻塞。

Digesting a tree

让我们将 pipeline 和实际场景结合。

MD5 是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum 打印文件列表的摘要值。

% md5sum *.god47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
复制代码

我们的示例程序与 md5sum 类似,但将单个目录作为参数,并打印该目录下每个常规文件的摘要值,按路径名排序。

% go run serial.go .d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
复制代码

我们程序的主函数调用一个辅助函数 MD5All,它返回一个从路径名到摘要值的映射,然后对结果进行排序并打印:

func main() {    // Calculate the MD5 sum of all files under the specified directory,    // then print the results sorted by path name.    m, err := MD5All(os.Args[1])    if err != nil {        fmt.Println(err)        return    }    var paths []string    for path := range m {        paths = append(paths, path)    }    sort.Strings(paths)    for _, path := range paths {        fmt.Printf("%x  %s\n", m[path], path)    }}
复制代码

MD5All 函数是我们讨论的重点。 在 serial.go 中,实现不使用并发,并且在遍历树时简单地读取和汇总每个文件。

// MD5All reads all the files in the file tree rooted at root and returns a map// from file path to the MD5 sum of the file's contents.  If the directory walk// fails or any read operation fails, MD5All returns an error.func MD5All(root string) (map[string][md5.Size]byte, error) {    m := make(map[string][md5.Size]byte)    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {        if err != nil {            return err        }        if !info.Mode().IsRegular() {            return nil        }        data, err := ioutil.ReadFile(path)        if err != nil {            return err        }        m[path] = md5.Sum(data)        return nil    })    if err != nil {        return nil, err    }    return m, nil}
复制代码

Parallel digestion(并行改造)

在 parallel.go 中,我们将 MD5All 拆分为两阶段 pipeline。 第一阶段 sumFiles 遍历树,在一个新的 goroutine 中消化每个文件,并将结果发送到值类型为 result 的 channel 上:

type result struct {    path string    sum  [md5.Size]byte    err  error}
复制代码

sumFiles 返回两个 channel:一个用于结果,另一个用于 filepath.Walk 返回的错误。 walk 函数启动一个新的 goroutine 来处理每个常规文件,然后检查是否完成。 如果 done 已关闭,则文件 walk 立即停止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {    // For each regular file, start a goroutine that sums the file and sends    // the result on c.  Send the result of the walk on errc.    c := make(chan result)    errc := make(chan error, 1)    go func() {        var wg sync.WaitGroup        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {            if err != nil {                return err            }            if !info.Mode().IsRegular() {                return nil            }            wg.Add(1)            go func() {                data, err := ioutil.ReadFile(path)                select {                case c <- result{path, md5.Sum(data), err}:                case <-done:                }                wg.Done()            }()            // Abort the walk if done is closed.            select {            case <-done:                return errors.New("walk canceled")            default:                return nil            }        })        // Walk has returned, so all calls to wg.Add are done.  Start a        // goroutine to close c once all the sends are done.        go func() {            wg.Wait()            close(c)        }()        // No select needed here, since errc is buffered.        errc <- err    }()    return c, errc}
复制代码

MD5All 从 c 接收摘要值。 MD5All 在出错时提前返回,通过 defer 关闭 done:

func MD5All(root string) (map[string][md5.Size]byte, error) {    // MD5All closes the done channel when it returns; it may do so before    // receiving all the values from c and errc.    done := make(chan struct{})    defer close(done)          
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil}
复制代码

Bounded parallelism(有界并行)

parallel.go 中的 MD5All 实现为每个文件启动一个新的 goroutine。 在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。

我们可以通过限制并行读取的文件数量来限制这些分配。 在 bounded.go 中,我们通过创建固定数量的 goroutine 来读取文件来做到这一点。 我们的 pipeline 现在具有三个阶段:

  1. 遍历树

  2. 读取和消化文件

  3. 收集摘要

第一阶段 walkFiles 发出树中常规文件的路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {    paths := make(chan string)    errc := make(chan error, 1)    go func() {        // Close the paths channel after Walk returns.        defer close(paths)        // No select needed for this send, since errc is buffered.        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {            if err != nil {                return err            }            if !info.Mode().IsRegular() {                return nil            }            select {            case paths <- path:            case <-done:                return errors.New("walk canceled")            }            return nil        })    }()    return paths, errc}
复制代码

中间阶段启动固定数量的消化器 goroutine,它们从路径接收文件名并在 channel c 上发送结果:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {    for path := range paths {        data, err := ioutil.ReadFile(path)        select {        case c <- result{path, md5.Sum(data), err}:        case <-done:            return        }    }}
复制代码

与我们之前的示例不同,digester 不会关闭其输出 channel,因为多个 goroutine 在共享 channel 上发送。 相反,MD5All 中的代码安排在所有 digester 完成后关闭 channel:

  // Start a fixed number of goroutines to read and digest files.    c := make(chan result)    var wg sync.WaitGroup    const numDigesters = 20    wg.Add(numDigesters)    for i := 0; i < numDigesters; i++ {        go func() {            digester(done, paths, c)            wg.Done()        }()    }    go func() {        wg.Wait()        close(c)    }()
复制代码

我们可以让每个 digester 创建并返回它自己的输出 channel,但是我们需要额外的 goroutine 来 fan-in 结果。

最后阶段接收来自 c 的所有结果,然后检查来自 errc 的错误。 此检查不能更早发生,因为在此之前,walkFiles 可能会阻止向下游发送值:

  m := make(map[string][md5.Size]byte)    for r := range c {        if r.err != nil {            return nil, r.err        }        m[r.path] = r.sum    }    // Check whether the Walk failed.    if err := <-errc; err != nil {        return nil, err    }    return m, ni
复制代码

总结

本文介绍了在 Go 中构建流数据 pipeline 的技术。 处理此类管道中的故障很棘手,因为 pipeline 中的每个阶段都可能阻止尝试向下游发送值,并且下游阶段可能不再关心传入的数据。 我们展示了关闭 channel 如何向 pipeline 启动的所有 goroutine 广播“done”信号,并定义了正确构建 pipeline 的准则。

翻译源文章

https://go.dev/blog/pipelines

发布于: 刚刚阅读数: 2
用户头像

en

关注

努力分享对他人有价值的知识 2018.06.14 加入

热爱技术,欢迎探讨

评论

发布
暂无评论
Go 并发模式:管道和取消(译)