Go 扇入 / 扇出
发布于: 刚刚
译者:baiyutang
将一些工作分成 N 个部分,并发地执行它们,等待它们全部完成,然后将所有结果合并在一起。
我以前做过很多次了,但是这次我忘记怎么做了。我离开电脑几分钟整理一下思绪,然后又回到电脑前。这样我就不用忘记怎么做了,我想把算法写下来!
我们在这做什么
workToDo := []int{"do", "some", "work"}for idx, work := range workToDo { // make sure you pass the index and work into the // function that runs in the goroutine. // this mechanism makes sure that the goroutine // gets a (stack-allocated) _copy_ of the data. // if you don't do this, idx and work will change out // from under it as the loop progresses. go func(idx int, work string) { fmt.Println(idx, work) }(idx, work)}复制代码
等待工作完成
var wg sync.WaitGroupworkToDo := []int{"do", "some", "work"}for idx, work := range workToDo { // add 1 to the waitgroup _before_ you start the goroutine. // you want to do this in the same goroutine as where // you call wg.Wait() so that you're sure that, even if // none of the goroutines started yet, you have the // right number of pending work. wg.Add(1) // make sure you pass the index and work into the // function that runs in the goroutine. // this mechanism makes sure that the goroutine // gets a (stack-allocated) _copy_ of the data. // if you don't do this, idx and work will change out // from under it as the loop progresses. go func(idx int, work string) { // wg.Done() tells the WaitGroup that we're done in // this goroutine. In other words, it decrements // the internal WaitGroup counter, whereas wg.Add(1) // above increments it. // Most commonly, we just do this in a defer statement. defer wg.Done() // this is the "work". in the next section, we'll be // changing this to return a value, because we'll // need to send that value somewhere fmt.Println(idx, work) }(idx, work)}// wait for all the goroutines to finish. this call// blocks until the WaitGroup's internal count goes // to zerowg.Wait()复制代码
获取结果
/ this is the channel that will hold the results of the workresultCh := make(chan string)var wg sync.WaitGroupworkToDo := []string{"do", "some", "work"}for idx, work := range workToDo { // add 1 to the waitgroup _before_ you start the goroutine. // you want to do this in the same goroutine as where // you call wg.Wait() so that you're sure that, even if // none of the goroutines started yet, you have the // right number of pending work. wg.Add(1) // this is the loop-local channel that our first goroutine // will send its results to. we'll start up a second // goroutine to forward its results to the final channel. ch := make(chan string) // make sure you pass the index and work into the // function that runs in the goroutine. // this mechanism makes sure that the goroutine // gets a (stack-allocated) _copy_ of the data. // if you don't do this, idx and work will change out // from under it as the loop progresses. go func(idx int, work string) { // this is the "work". right now, it just returns an // int. in the next section, it will return both an int // and an error res := doSomeWork(idx, work) ch <- res }(idx, work) // start up another goroutine to forward the results from // ch to resultCh go func() { // we want to indicate that we're done after we forward // the result to the final channel, _not_ just when we're // done with the actual computation. this arrangement // will be useful below, in our final goroutine that // runs after the for loop is done defer wg.Done() res := <-ch resultCh <- res }()}// start up a final goroutine that is going to watch for// the moment when all of the loop goroutines are both//// 1. done with their work// 2. done sending their results to the final channel//// after that, we can close the resultCh. this closure is// important for the following for loop, since ranging over// a channel will only stop after that channel is closedgo func() { wg.Wait() close(resultCh)}()
// now that we have that final goroutine running, we can// be sure that this for loop will end after://// 1. all goroutines are done with their work// 2. all goroutines are done sending their work to resultCh// 3. we have processed each result// (in this case, we just print it out)for result := range resultCh { fmt.Println("result:", result)}复制代码
最后的小困难:error 处理
// this is the channel that will hold the results of the workresultCh := make(chan string)// this channel receives all the errors that occur.// for each work item, either resultCh or errCh will receive// precisely once. both channels will be closed immediately// after all receives happenerrCh := make(chan error)var wg sync.WaitGroupworkToDo := []string{"do", "some", "work"}for idx, work := range workToDo { // add 1 to the waitgroup _before_ you start the goroutine. // you want to do this in the same goroutine as where // you call wg.Wait() so that you're sure that, even if // none of the goroutines started yet, you have the // right number of pending work. wg.Add(1) // this is the loop-local channel that our first goroutine // will send its results to. we'll start up a second // goroutine to forward its results to the final channel. ch := make(chan string) // this is the loop-local channel that our first goroutine // will send errors on. for each loop iteration, exactly // one of ch or errCh will receive eCh := make(chan error) // make sure you pass the index and work into the // function that runs in the goroutine. // this mechanism makes sure that the goroutine // gets a (stack-allocated) _copy_ of the data. // if you don't do this, idx and work will change out // from under it as the loop progresses. go func(idx int, work string) { // this is the "work". right now, it just returns an // int. in the next section, it will return both an int // and an error res, err := doSomeWork(idx, work) if err != nil { eCh <- err } else { ch <- res } }(idx, work) // start up another goroutine to forward the results from: // // - ch to resultCh // - eCh to errCh go func() { // we want to indicate that we're done after we do the // forward operation, similar to the code in the // previous section defer wg.Done() // only one forward operation will happen per loop // iteration, so we use a select to choose exactly // one of the channels - either the success or error // one. select { case res := <-ch: resultCh <- res case err := <-eCh: errCh <- err } }()}// start up a final goroutine that is going to watch for// the moment when all of the loop goroutines are both//// 1. done with their work// 2. done sending their results to the appropriate channel//// after that, we can close both resultCh and errCh.go func() { wg.Wait() close(resultCh) close(errCh)}()
// we're now at a point where we have two "final" channels://// - one for the successful results// - one for the errors//// we have a few choices on how to handle them, and it's// largely up to your use case how you handle errors or success// results. In our case, we'll loop through both channels,// print out the result either way, and then exit when all// receives happen.
// these two booleans are going to keep track of when // each channel is closed and done receivingresultsDone := falseerrsDone := false// we're going to use an infinite loop and break out of it// when both channels are done receivingfor { if resultsDone && errsDone { break } select { case res, valid := <-resultCh: if !valid { resultsDone = true } else { fmt.Println("result:", res) } case err, valid := <-errCh: if !valid { errsDone = true } else { fmt.Println("error:", err) } }}复制代码
划线
评论
复制
发布于: 刚刚阅读数: 4
版权声明: 本文为 InfoQ 作者【baiyutang】的原创文章。
原文链接:【http://xie.infoq.cn/article/3f02e4533b3555b1c4db477a8】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
baiyutang
关注
广州 2017.12.13 加入
Microservices | Golang | Cloud Nitive | “Smart work,Not hard”











评论