写点什么

实例详解在 Go 中构建流数据 pipeline

  • 2024-02-21
    广东
  • 本文字数:8015 字

    阅读完需:约 26 分钟

实例详解在Go中构建流数据pipeline

本文分享自华为云社区《Go并发范式 流水线和优雅退出 Pipeline 与 Cancellation》,作者:张俭。

介绍


Go 的并发原语可以轻松构建流数据管道,从而高效利用 I/O 和多个 CPU。 本文展示了此类 pipelines 的示例,强调了操作失败时出现的细微之处,并介绍了干净地处理失败的技术。

什么是 pipeline?


pipeline 在 Go 中并没有书面的定义,只是众多并发程序中的一种。非正式地,pipeline 由一系列 stage 组成。每个 stage 是运行着同一个 function 的协程组。在每个 stage,协程们


  • 通过 inbound channel 从上游获取数据

  • 在 data 上进行运算,通常会产生新的值

  • 通过 outbound channel 向下游发送数据


每个 Stage 都有数个 inbound channel 和 outbound channel,除了第一个和最后一个 Stage,分别只有 outbound 和 inbound channel。第一个 Stage 通常叫做SourceProducer。最后一个 Stage 通常叫做SinkConsumer


我们将从一个简单的示例 pipeline 开始来解释这些想法和技术。 稍后,我们将提供一个更实际的例子。

Squaring numbers 平方数


考虑一个有着三个阶段的流水线。


第一阶段,gen,是个将整数列表转换为一个发射列表中整数的 channel 的函数。gen函数启动一个 go routine,用来发送 channel 中的整数,然后当所有的整数都被发出后,将 channel 关闭:


func gen(nums ...int) <-chan int {	out := make(chan int)	go func() {		for _, n := range nums {			out <- n		}		close(out)	}()	return out}
复制代码


第二阶段,sq从上面的 channel 中接收数据,返回一个发射对应整数平方数的 channel。当 inbound channel 关闭后,并且这一阶段将所有的 value 发送到下游后,再将这个 outbound channel 关闭


func sq(in <-chan int) <-chan int {	out := make(chan int)	go func() {		for n := range in {			out <- n * n		}		close(out)	}()	return out}
复制代码


main 函数组织整个 pipeline,并且运行最终的 stage:从第二个 stage 中接收数据然后逐个打印,直到 channel 被关闭


func main() {	// Set up the pipeline	c := gen(2, 3)	out := sq(c)
// Consume the output // 4 fmt.Println(<-out) // 9 fmt.Println(<-out)}
复制代码


既然 sq 的 inbound channel 和 outbound channel 类型相同,我们可以将其进行任意数量的组合。我们还可以将 main 函数重写为循环,就像在其他 Stage 中做的那样一样。


func main() {    // Set up the pipeline and consume the output.    for n := range sq(sq(gen(2, 3))) {        fmt.Println(n) // 16 then 81    }}
复制代码

扇入和扇出


许多函数可以从一个 channel 中获取数据直到 channel 被关闭,这被叫做扇出。这提供了一种在 worker 之间分配工作以并行化 CPU 使用和 I/O 的方法。


一个函数可以通过将多个 input channel 多路复用到同一个 channel,当所有的 channel 关闭时,该多路复用 channel 才关闭。从而达到从多个 input 获取数据并处理,直到所有 input channel 都关闭才停止的效果。这叫做扇入。


我们可以将我们的流水线改为运行两个sq,每个都从相同的 channel 读取数据。我们引入一个新的函数merge,来做扇入的工作


func main() {	in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in)
// Consume the merged output from c1 and c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 }}
复制代码

merge函数通过对每个 channel 开启一个协程,把数据拷贝到另一个 out channel 中,实现将 channel 列表转换为一个 channel 的效果。当所有 send 操作完成后,再将 out channel 关闭。


向一个已经关闭上的 channel 发送数据会导致 panic,所以保证发送完所有再关闭 channel 至关重要。sync.WaitGroup 提供了一个简单地方式来编排这个同步


func merge(cs ...<-chan int) <-chan int {	var wg sync.WaitGroup	out := make(chan int)
// Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out}
复制代码

短暂的停顿


我们的 pipeline 函数有这样的模式:


  • 当发送任务结束后,关闭发送 output channel

  • 直到 input channel 关闭前,一直从 input channel 中接收消息


这个模式下,每个阶段都可以用协程+for 循环的模式来书写,保证每个数据发送到下游后再关闭所有协程。


但是在实际的 pipeline 中,阶段并不总是接收所有来自 inbound channel 的数据。通常,如果 inbound 的值出现了错误,pipeline 会提前退出。 在任何一种情况下,接收者都不必等待剩余值到达,并且我们希望 fast fail(较早阶段的 Stage 尽早停止后期 Stage 不需要的值)。


在我们的示例 pipeline 中,如果一个 Stage 未能消费所有 inbound 值,则尝试计算后并发送这些值的 goroutine 将无限期阻塞:


    // Consume the first value from the output.    out := merge(c1, c2)    fmt.Println(<-out) // 4 or 9    return    // Since we didn't receive the second value from out,    // one of the output goroutines is hung attempting to send it.}
复制代码


这就导致了资源泄漏:协程消耗内存、运行资源,并且在协程栈内的 golang 堆引用导致垃圾无法回收。协程只能自己退出,不能由垃圾回收机制回收。


即使下游的 Stage 无法接收所有 inbound value,我们也需要把上游的协程退出。如果把上游的协程改为有 buffer 的,可以解决上面的问题。如果 Buffer 中还有空间,则发送操作可以立刻完成


c := make(chan int, 2) // buffer size 2c <- 1  // succeeds immediatelyc <- 2  // succeeds immediatelyc <- 3  // blocks until another goroutine does <-c and receives 1
复制代码


当要发送的数目可以在 channel 创建时知道时,buffer 可以简化代码。举个例子,让我们来使用 buffer channel,不开辟新的协程来重写gen方法:


func gen(nums ...int) <-chan int {    out := make(chan int, len(nums))    for _, n := range nums {        out <- n    }    close(out)    return out}
复制代码


在我们的 pipeline 中,我们就需要在merge方法中使用的channel添加 buffer:


func merge(cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int, 1) // enough space for the unread inputs    // ... 其余的没有变更 ...
复制代码


尽管上面这个方案修复了阻塞的问题,但它是很差的方案。这里有一个对 1 的硬编码,这太脆弱了?你真的能预料到有多少个值不能被正常发送吗?一旦两个值不能正常发送,你的协程又阻塞了。


作为替代,我们需要给下游阶段提供一个机制,知会下游阶段,发送者已经停止发送了。

Explicity cancellation 显示取消


main函数决定不从 out 处接收所有数据,而是退出时,它必须知会上游阶段的协程放弃接下来的发送。它通过向一个名叫done的 channel 发送数据来完成这个动作。因为发送方有两个,所以 向done发送两次数据。


func main() {    in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in)
// Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{}}
复制代码


发送到 out channel 的发送者把原来的逻辑替换成一个 select 操作,select 或者发送一个数据,抑或从done处接收到数据。因为done中数据值的类型根本不重要,主要是接收到值这个事件本身很重要,所以done channel 的类型时struct {}output循环继续在inbound channel 上执行,所以上游的阶段并没有被阻塞。(我们稍后会讨论如何让循环迅速返回。)


func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)
// Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
复制代码


这个方法有一个问题:每一个下游接收者都需要知道可能阻塞的上游发送者总数。维护它们的数目,是一个琐碎又容易出错的事情。


我们需要一个机制来让不可知的、无界的发送协程来停止发送到下游的值。在 Go,我们可以通过关闭 channel 来完成这件事,因为在已经关闭的 channel 上执行 receive 操作,会立刻返回该元素的零值。


这说明main函数可以简单地通过关闭done channel 来让所有的发送者不阻塞。关闭操作是一个高效的广播。我们把 pipeline 中的每个函数都接受done作为参数,并把done在 defer 语句中关闭, 这样,如果在main函数中返回,都会通知 pipeline 中的阶段退出。


func main() {    // Set up a done channel that's shared by the whole pipeline,    // and close that channel when this pipeline exits, as a signal    // for all the goroutines we started to exit.    done := make(chan struct{})    defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in)
// Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.}
复制代码


现在当donechannel 关闭后,接收到 close 信息的阶段,都可以直接退出了。merge函数中的outout协程可以不从inbound channel 中取数据直接退出,因为它知道,上游的发送 sq,接收到 close 信息,也会直接退出。output通过 defer 语句来保证wg.Done()一定被调用。(译者注:来关闭 out channel)


func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)
// Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
复制代码


相似的,当接收到 close 信号时,sq函数也可以立刻返回。sq通过defer语句来保证outchannel 一定被关闭。


这是给构建 pipeline 的一些指导:


  • 当所有的发送操作完成后,关闭 outbound channel

  • 如果发送发不阻塞,或是 channel 没有关闭,接收者会一直从 channel 中接收数据


Pipeline 通过如下两个方式来解除发送者的阻塞


  • 确保 channel 的 buffer 足够大

  • 显示知会发送者,接收者已经放弃接收

Digesting a tree 对树进行摘要


让我们来考虑一个更实际的 pipeline


MD5 是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum 打印文件列表的摘要值。


% md5sum *.god47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
复制代码


我们的示例程序类似于 md5sum,但将单个目录作为参数并打印该目录下每个常规文件的摘要值,按路径名排序。


% go run serial.go .d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
复制代码


我们的主函数调MD5All这个辅助函数,返回路径名和摘要值的 map,main函数再将它们排序打印


func main() {	// Calculate the MD5 sum of all files under the specified directory,	// then print the results sorted by path name.	m, err := MD5All(os.Args[1])	if err != nil {		fmt.Println(err)		return	}	var paths []string	for path := range m {		paths = append(paths, path)	}	sort.Strings(paths)	for _, path := range paths {		fmt.Printf("%x  %s\n", m[path], path)	}}
复制代码


MD5All函数是我们讨论的重点。在如下串行化的实现中,没有使用并发技术,只是简单对文件进行了遍历


// MD5All reads all the files in the file tree rooted at root and returns a map// from file path to the MD5 sum of the file's contents.  If the directory walk// fails or any read operation fails, MD5All returns an error.func MD5All(root string) (map[string][md5.Size]byte, error) {    m := make(map[string][md5.Size]byte)    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {        if err != nil {            return err        }        if !info.Mode().IsRegular() {            return nil        }        data, err := ioutil.ReadFile(path)        if err != nil {            return err        }        m[path] = md5.Sum(data)        return nil    })    if err != nil {        return nil, err    }    return m, nil}
复制代码

并行计算摘要


在并行的解法中,我们将MD5All分割为两个阶段的 pipeline。第一个阶段,sumFiles,遍历文件树,针对每个文件,在新的协程中计算摘要,然后把结果发送到 channel 中,这是 result 的类型


type result struct {    path string    sum  [md5.Size]byte    err  error}
复制代码


sumFiles返回两个 channel:一个是 result channel,另一个是filepath.Walk中产生的错误。walk函数针对每个文件启动一个新的协程来处理,然后检查donechannel。如果done已经被关闭,walk函数会立刻停止:


func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {	// For each regular file, start a goroutine that sums the file and	// sends the result on c.	// Send the result of the walk on errc.	c := make(chan result)	errc := make(chan error, 1)	go func() {		var wg sync.WaitGroup		// If any error occurred, walk method will return		err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {			if err != nil {				return err			}			if !info.Mode().IsRegular() {				return nil			}			wg.Add(1)			go func() {				data, err := ioutil.ReadFile(path)				select {				case c <- result{					path: path,					sum:  md5.Sum(data),					err:  err,				}:				case <-done:				}				wg.Done()			}()			// Abort the walk if done is closed.			select {			case <-done:				return errors.New("walk canceled")			default:				return nil			}		})		// Walk has returned, so all calls to wg.Add are done.		// Start a goroutine to close c once all the sends are done.		// No select needed here, since errc is buffered.		errc <- err	}()	return c, errc}
复制代码


MD5Allc中接收到摘要数据。当发生错误时,MD5All会迅速返回,通过defer语句来关闭done channel


func MD5All(root string) (map[string][md5.Size]byte, error) {    // MD5All closes the done channel when it returns; it may do so before    // receiving all the values from c and errc.    done := make(chan struct{})    defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil}
复制代码

有界的并行


parallel.go 中的 MD5All 实现为每个文件启动一个新的 goroutine。 在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。


我们可以通过限制并行读取的文件数量来限制这些分配。 在新的解决方式中,我们通过创建固定数量的 goroutine 来读取文件来做到这一点。 我们的 pipeline 现在分为三个阶段:遍历树、读取并计算文件摘要以及收集摘要。


第一阶段 walkFiles 发射出文件树中常规文件的路径:


func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {	paths := make(chan string)	errc := make(chan error, 1)	go func() {		// Close the paths channel after Walk returns.		defer close(paths)		// No select needed for this send, since errc is buffered.		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {			if err != nil {				return err			}			if !info.Mode().IsRegular() {				return nil			}			select {			case paths <- path:			case <-done:				return errors.New("walk canceled")			}			return nil		})	}()	return paths, errc}
复制代码


第二阶段启动固定数量的协程来计算文件摘要,然后发送到 c channel 中


func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {    for path := range paths {        data, err := ioutil.ReadFile(path)        select {        case c <- result{path, md5.Sum(data), err}:        case <-done:            return        }    }}
复制代码


和之前的示例不同,因为多个协程都在共享 channel 上发送数据,digester函数并没有关闭 output channel。作为替代,当所有的 digesters 跑完之后,MD5All会关闭 channel


    // Start a fixed number of goroutines to read and digest files.    c := make(chan result)    var wg sync.WaitGroup    const numDigesters = 20    wg.Add(numDigesters)    for i := 0; i < numDigesters; i++ {        go func() {            digester(done, paths, c)            wg.Done()        }()    }    go func() {        wg.Wait()        close(c)    }()
复制代码


这里也可以针对每个 digester 开启独立的 channel,不过到时候就要对 channel 进行扇入处理。


最终阶段从c中取得所有结果,并且检查 errc 中的错误。此检查不能更早发生,因为在此之前,walkFiles 可能会阻塞:


(译者注:要保证检查 errc 的错误,发生在 filePath.Walk 启动后,done不会再次发送了,协程就不会退出)


   m := make(map[string][md5.Size]byte)    for r := range c {        if r.err != nil {            return nil, r.err        }        m[r.path] = r.sum    }    // Check whether the Walk failed.    if err := <-errc; err != nil {        return nil, err    }    return m, nil}
复制代码

总结


本文介绍了在 Go 中构建流数据 pipeline 的技术。 处理此类 pipeline 中的故障很棘手,因为 pipeline 中的每个阶段可能会阻止尝试向下游发送值,并且下游阶段可能不再关心传入的数据。 我们展示了关闭通道如何向管道启动的所有 goroutine 广播“done”信号,并定义了正确构建管道的指南。


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 4
用户头像

提供全面深入的云计算技术干货 2020-07-14 加入

生于云,长于云,让开发者成为决定性力量

评论

发布
暂无评论
实例详解在Go中构建流数据pipeline_开发_华为云开发者联盟_InfoQ写作社区