The Go Blog
Go Concurrency Patterns: Pipelines and cancellation
Sameer Ajmani
13 March 2014
Introduction
Go's concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly.
What is a pipeline?
There's no formal definition of a pipeline in Go; it's just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines
receive values from upstream via inbound channels
perform some function on that data, usually producing new values
send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.
We'll begin with a simple example pipeline to explain the ideas and techniques. Later, we'll present a more realistic example.
Squaring numbers
Consider a pipeline with three stages.
The first stage, gen, is a function that converts a list of integers to a channel that emits the integers in the list. The gen function starts a goroutine that sends the integers on the channel and closes the channel when all the values have been sent.
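A minimal version of gen along these lines might look like:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}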
The second stage, sq, receives integers from a channel and returns a channel that emits the square of each received integer. After the inbound channel is closed and this stage has sent all the values downstream, it closes the outbound channel.
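Following that description, sq might be written as:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}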
The main function sets up the pipeline and runs the final stage: it receives values from the second stage and prints each one, until the channel is closed.
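A sketch of main, using 2 and 3 as illustrative inputs:

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}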
Since sq has the same type for its inbound and outbound channels, we can compose it any number of times. We can also rewrite main as a range loop, like the other stages.
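For example, composing sq twice over the same sample inputs:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}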
Fan-out, fan-in
Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that's closed when all the inputs are closed. This is called fan-in.
We can change our pipeline to run two instances of sq, each reading from the same input channel. We introduce a new function, merge, to fan in the results.
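A sketch of the rearranged main:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}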
The merge function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel. Once all the output goroutines have been started, merge starts one more goroutine to close the outbound channel after all sends on that channel are done.
Sends on a closed channel panic, so it's important to ensure all sends are done before calling close. The sync.WaitGroup type provides a simple way to arrange this synchronization.
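One way to write merge using sync.WaitGroup:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}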
Stopping short
There is a pattern to our pipeline functions:
stages close their outbound channels when all the send operations are done.
stages keep receiving values from inbound channels until those channels are closed.
This pattern allows each receiving stage to be written as a range loop and ensures that all goroutines exit once all values have been successfully sent downstream.
But in real pipelines, stages don't always receive all the inbound values. Sometimes this is by design: the receiver may only need a subset of values to make progress. More often, a stage exits early because an inbound value represents an error in an earlier stage. In either case the receiver should not have to wait for the remaining values to arrive, and we want earlier stages to stop producing values that later stages don't need.
In our example pipeline, if a stage fails to consume all the inbound values, the goroutines attempting to send those values will block indefinitely.
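For example, a main that receives only the first value from merge leaves one output goroutine blocked forever on its send:

func main() {
    in := gen(2, 3)
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}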
This is a resource leak: goroutines consume memory and runtime resources, and heap references in goroutine stacks keep data from being garbage collected. Goroutines are not garbage collected; they must exit on their own.
We need to arrange for the upstream stages of our pipeline to exit even when the downstream stages fail to receive all the inbound values. One way to do this is to change the outbound channels to have a buffer. A buffer can hold a fixed number of values; send operations complete immediately if there's room in the buffer.
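For example, with a buffer of size 2:

c := make(chan int, 2) // buffer size 2
c <- 1                 // succeeds immediately
c <- 2                 // succeeds immediately
c <- 3                 // blocks until another goroutine does <-c and receives 1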
When the number of values to be sent is known at channel creation time, a buffer can simplify the code. For example, we can rewrite gen to copy the list of integers into a buffered channel and avoid creating a new goroutine.
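The buffered gen might look like:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}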
Returning to the blocked goroutines in our pipeline, we might consider adding a buffer to the outbound channel returned by merge.
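Something like the following, changing only the channel allocation; the buffer size of 1 matches the single unread value in this particular program:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...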
While this fixes the blocked goroutine in this program, this is bad code. The choice of buffer size of 1 here depends on knowing the number of values merge will receive and the number of values downstream stages will consume. This is fragile: if we pass an additional value to gen, or if the downstream stage reads any fewer values, we will again have blocked goroutines.
Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input.
Explicit cancellation
When main decides to exit without receiving all the values from out, it must tell the goroutines in the upstream stages to abandon the values they're trying to send. It does so by sending values on a channel called done. It sends two values since there are potentially two blocked senders.
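A sketch of this version of main:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}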
The sending goroutines replace their send operation with a select statement that proceeds either when the send on out happens or when they receive a value from done. The value type of done is the empty struct because the value doesn't matter: it is the receive event that indicates the send on out should be abandoned. The output goroutines continue looping on their inbound channel, c, so the upstream stages are not blocked. (We'll discuss in a moment how to allow this loop to return early.)
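The modified merge might look like this; note the new done parameter, while the closing logic stays the same:

func merge(done chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed or it receives a
    // value from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...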
This approach has a problem: each downstream receiver needs to know the number of potentially blocked upstream senders and arrange to signal those senders on early return. Keeping track of these counts is tedious and error-prone.
We need a way to tell an unknown and unbounded number of goroutines to stop sending their values downstream. In Go, we can do this by closing a channel, because a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.
This means that main can unblock all the senders simply by closing the done channel. This close is effectively a broadcast signal to the senders. We extend each of our pipeline functions to accept done as a parameter and arrange for the close to happen via a defer statement, so that all return paths from main will signal the pipeline stages to exit.
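A sketch of main after this change:

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}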
Each of our pipeline stages is now free to return as soon as done is closed. The output routine in merge can return without draining its inbound channel, since it knows the upstream sender, sq, will stop attempting to send when done is closed. output ensures wg.Done is called on all return paths via a defer statement.
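With the early return and the deferred wg.Done, merge might read:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c or done is closed, then
    // calls wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...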
Similarly, sq can return as soon as done is closed. sq ensures its out channel is closed on all return paths via a defer statement.
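A version of sq along these lines:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}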
Here are the guidelines for pipeline construction:
stages close their outbound channels when all the send operations are done.
stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
Pipelines unblock senders either by ensuring there's enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel.
Digesting a tree
Let's consider a more realistic pipeline.
MD5 is a message-digest algorithm that's useful as a file checksum. The command line utility md5sum prints digest values for a list of files.
Our example program is like md5sum but instead takes a single directory as an argument and prints the digest values for each regular file under that directory, sorted by path name.
The main function of our program invokes a helper function MD5All, which returns a map from path name to digest value, then sorts and prints the results.
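A sketch of main, assuming the directory is passed as the first command-line argument:

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}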
The MD5All function is the focus of our discussion. In serial.go, the implementation uses no concurrency and simply reads and sums each file as it walks the tree.
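A serial version might look like this:

// MD5All reads all the files in the file tree rooted at root and returns
// a map from file path to the MD5 sum of the file's contents. If the
// directory walk fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}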
Parallel digestion
In parallel.go, we split MD5All into a two-stage pipeline. The first stage, sumFiles, walks the tree, digests each file in a new goroutine, and sends the results on a channel with value type result.
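The result type might be:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}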
sumFiles returns two channels: one for the results and another for the error returned by filepath.Walk. The walk function starts a new goroutine to process each regular file, then checks done. If done is closed, the walk stops immediately.
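A sketch of sumFiles, consistent with that description; the "walk canceled" error message is an illustrative choice:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and
    // sends the result on c. Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done. Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}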
MD5All receives the digest values from c. MD5All returns early on error, closing done via a defer.
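The parallel MD5All might then read:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so
    // before receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}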
Bounded parallelism
The MD5All implementation in parallel.go starts a new goroutine for each file. In a directory with many large files, this may allocate more memory than is available on the machine.
We can limit these allocations by bounding the number of files read in parallel. In bounded.go, we do this by creating a fixed number of goroutines for reading files. Our pipeline now has three stages: walk the tree, read and digest the files, and collect the digests.
The first stage, walkFiles, emits the paths of regular files in the tree.
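One way to write walkFiles:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}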
The middle stage starts a fixed number of digester goroutines that receive file names from paths and send results on channel c.
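A sketch of digester:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}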
Unlike our previous examples, digester does not close its output channel, as multiple goroutines are sending on a shared channel. Instead, code in MD5All arranges for the channel to be closed when all the digesters are done.
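A fragment of the bounded MD5All; paths and done are set up as before, and the numDigesters value of 20 is an arbitrary illustrative bound:

    done := make(chan struct{})
    defer close(done)

    paths, errc := walkFiles(done, root)

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()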
We could instead have each digester create and return its own output channel, but then we would need additional goroutines to fan-in the results.
The final stage receives all the results from c, then checks the error from errc. This check cannot happen any earlier, since before this point, walkFiles may block sending values downstream.
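The final stage, as a fragment of the same MD5All:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil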
Conclusion
This article has presented techniques for constructing streaming data pipelines in Go. Dealing with failures in such pipelines is tricky, since each stage in the pipeline may block attempting to send values downstream, and the downstream stages may no longer care about the incoming data. We showed how closing a channel can broadcast a "done" signal to all the goroutines started by a pipeline and defined guidelines for constructing pipelines correctly.
Further reading:
Go Concurrency Patterns (video) presents the basics of Go's concurrency primitives and several ways to apply them.
Advanced Go Concurrency Patterns (video) covers more complex uses of Go's primitives, especially select.
Douglas McIlroy's paper Squinting at Power Series shows how Go-like concurrency provides elegant support for complex calculations.