Go 扇入 / 扇出
发布于: 刚刚
译者:baiyutang
将一些工作分成 N 个部分,并发地执行它们,等待它们全部完成,然后将所有结果合并在一起。
我以前做过很多次了,但是这次我忘记怎么做了。我离开电脑几分钟整理一下思绪,然后又回到电脑前。这样我就不用忘记怎么做了,我想把算法写下来!
我们在这做什么
workToDo := []int{"do", "some", "work"}
for idx, work := range workToDo {
// make sure you pass the index and work into the
// function that runs in the goroutine.
// this mechanism makes sure that the goroutine
// gets a (stack-allocated) _copy_ of the data.
// if you don't do this, idx and work will change out
// from under it as the loop progresses.
go func(idx int, work string) {
fmt.Println(idx, work)
}(idx, work)
}
复制代码
等待工作完成
var wg sync.WaitGroup
workToDo := []int{"do", "some", "work"}
for idx, work := range workToDo {
// add 1 to the waitgroup _before_ you start the goroutine.
// you want to do this in the same goroutine as where
// you call wg.Wait() so that you're sure that, even if
// none of the goroutines started yet, you have the
// right number of pending work.
wg.Add(1)
// make sure you pass the index and work into the
// function that runs in the goroutine.
// this mechanism makes sure that the goroutine
// gets a (stack-allocated) _copy_ of the data.
// if you don't do this, idx and work will change out
// from under it as the loop progresses.
go func(idx int, work string) {
// wg.Done() tells the WaitGroup that we're done in
// this goroutine. In other words, it decrements
// the internal WaitGroup counter, whereas wg.Add(1)
// above increments it.
// Most commonly, we just do this in a defer statement.
defer wg.Done()
// this is the "work". in the next section, we'll be
// changing this to return a value, because we'll
// need to send that value somewhere
fmt.Println(idx, work)
}(idx, work)
}
// wait for all the goroutines to finish. this call
// blocks until the WaitGroup's internal count goes
// to zero
wg.Wait()
复制代码
获取结果
/ this is the channel that will hold the results of the work
resultCh := make(chan string)
var wg sync.WaitGroup
workToDo := []string{"do", "some", "work"}
for idx, work := range workToDo {
// add 1 to the waitgroup _before_ you start the goroutine.
// you want to do this in the same goroutine as where
// you call wg.Wait() so that you're sure that, even if
// none of the goroutines started yet, you have the
// right number of pending work.
wg.Add(1)
// this is the loop-local channel that our first goroutine
// will send its results to. we'll start up a second
// goroutine to forward its results to the final channel.
ch := make(chan string)
// make sure you pass the index and work into the
// function that runs in the goroutine.
// this mechanism makes sure that the goroutine
// gets a (stack-allocated) _copy_ of the data.
// if you don't do this, idx and work will change out
// from under it as the loop progresses.
go func(idx int, work string) {
// this is the "work". right now, it just returns an
// int. in the next section, it will return both an int
// and an error
res := doSomeWork(idx, work)
ch <- res
}(idx, work)
// start up another goroutine to forward the results from
// ch to resultCh
go func() {
// we want to indicate that we're done after we forward
// the result to the final channel, _not_ just when we're
// done with the actual computation. this arrangement
// will be useful below, in our final goroutine that
// runs after the for loop is done
defer wg.Done()
res := <-ch
resultCh <- res
}()
}
// start up a final goroutine that is going to watch for
// the moment when all of the loop goroutines are both
//
// 1. done with their work
// 2. done sending their results to the final channel
//
// after that, we can close the resultCh. this closure is
// important for the following for loop, since ranging over
// a channel will only stop after that channel is closed
go func() {
wg.Wait()
close(resultCh)
}()
// now that we have that final goroutine running, we can
// be sure that this for loop will end after:
//
// 1. all goroutines are done with their work
// 2. all goroutines are done sending their work to resultCh
// 3. we have processed each result
// (in this case, we just print it out)
for result := range resultCh {
fmt.Println("result:", result)
}
复制代码
最后的小困难:error 处理
// this is the channel that will hold the results of the work
resultCh := make(chan string)
// this channel receives all the errors that occur.
// for each work item, either resultCh or errCh will receive
// precisely once. both channels will be closed immediately
// after all receives happen
errCh := make(chan error)
var wg sync.WaitGroup
workToDo := []string{"do", "some", "work"}
for idx, work := range workToDo {
// add 1 to the waitgroup _before_ you start the goroutine.
// you want to do this in the same goroutine as where
// you call wg.Wait() so that you're sure that, even if
// none of the goroutines started yet, you have the
// right number of pending work.
wg.Add(1)
// this is the loop-local channel that our first goroutine
// will send its results to. we'll start up a second
// goroutine to forward its results to the final channel.
ch := make(chan string)
// this is the loop-local channel that our first goroutine
// will send errors on. for each loop iteration, exactly
// one of ch or errCh will receive
eCh := make(chan error)
// make sure you pass the index and work into the
// function that runs in the goroutine.
// this mechanism makes sure that the goroutine
// gets a (stack-allocated) _copy_ of the data.
// if you don't do this, idx and work will change out
// from under it as the loop progresses.
go func(idx int, work string) {
// this is the "work". right now, it just returns an
// int. in the next section, it will return both an int
// and an error
res, err := doSomeWork(idx, work)
if err != nil {
eCh <- err
} else {
ch <- res
}
}(idx, work)
// start up another goroutine to forward the results from:
//
// - ch to resultCh
// - eCh to errCh
go func() {
// we want to indicate that we're done after we do the
// forward operation, similar to the code in the
// previous section
defer wg.Done()
// only one forward operation will happen per loop
// iteration, so we use a select to choose exactly
// one of the channels - either the success or error
// one.
select {
case res := <-ch:
resultCh <- res
case err := <-eCh:
errCh <- err
}
}()
}
// start up a final goroutine that is going to watch for
// the moment when all of the loop goroutines are both
//
// 1. done with their work
// 2. done sending their results to the appropriate channel
//
// after that, we can close both resultCh and errCh.
go func() {
wg.Wait()
close(resultCh)
close(errCh)
}()
// we're now at a point where we have two "final" channels:
//
// - one for the successful results
// - one for the errors
//
// we have a few choices on how to handle them, and it's
// largely up to your use case how you handle errors or success
// results. In our case, we'll loop through both channels,
// print out the result either way, and then exit when all
// receives happen.
// these two booleans are going to keep track of when
// each channel is closed and done receiving
resultsDone := false
errsDone := false
// we're going to use an infinite loop and break out of it
// when both channels are done receiving
for {
if resultsDone && errsDone {
break
}
select {
case res, valid := <-resultCh:
if !valid {
resultsDone = true
} else {
fmt.Println("result:", res)
}
case err, valid := <-errCh:
if !valid {
errsDone = true
} else {
fmt.Println("error:", err)
}
}
}
复制代码
划线
评论
复制
发布于: 刚刚阅读数: 4
版权声明: 本文为 InfoQ 作者【baiyutang】的原创文章。
原文链接:【http://xie.infoq.cn/article/3f02e4533b3555b1c4db477a8】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
baiyutang
关注
广州 2017.12.13 加入
Microservices | Golang | Cloud Nitive | “Smart work,Not hard”
评论