写点什么

Go 扇入 / 扇出

用户头像
baiyutang
关注
发布于: 刚刚

译者:baiyutang

原文:https://dev.to/arschles/fan-in-fan-out-with-go-19ah


将一些工作分成 N 个部分,并发地执行它们,等待它们全部完成,然后将所有结果合并在一起。


我以前做过很多次了,但是这次我忘记怎么做了。我离开电脑几分钟整理一下思绪,然后又回到电脑前。这样我就不用忘记怎么做了,我想把算法写下来!


我们在这做什么


workToDo := []int{"do", "some", "work"}for idx, work := range workToDo {    // make sure you pass the index and work into the     // function that runs in the goroutine.    // this mechanism makes sure that the goroutine    // gets a (stack-allocated) _copy_ of the data.    // if you don't do this, idx and work will change out    // from under it as the loop progresses.    go func(idx int, work string) {        fmt.Println(idx, work)    }(idx, work)}
复制代码


等待工作完成


var wg sync.WaitGroupworkToDo := []int{"do", "some", "work"}for idx, work := range workToDo {    // add 1 to the waitgroup _before_ you start the goroutine.    // you want to do this in the same goroutine as where    // you call wg.Wait() so that you're sure that, even if    // none of the goroutines started yet, you have the    // right number of pending work.    wg.Add(1)    // make sure you pass the index and work into the     // function that runs in the goroutine.    // this mechanism makes sure that the goroutine    // gets a (stack-allocated) _copy_ of the data.    // if you don't do this, idx and work will change out    // from under it as the loop progresses.    go func(idx int, work string) {        // wg.Done() tells the WaitGroup that we're done in        // this goroutine. In other words, it decrements        // the internal WaitGroup counter, whereas wg.Add(1)        // above increments it.        // Most commonly, we just do this in a defer statement.        defer wg.Done()        // this is the "work". in the next section, we'll be        // changing this to return a value, because we'll        // need to send that value somewhere        fmt.Println(idx, work)    }(idx, work)}// wait for all the goroutines to finish. this call// blocks until the WaitGroup's internal count goes // to zerowg.Wait()
复制代码


获取结果


/ this is the channel that will hold the results of the workresultCh := make(chan string)var wg sync.WaitGroupworkToDo := []string{"do", "some", "work"}for idx, work := range workToDo {    // add 1 to the waitgroup _before_ you start the goroutine.    // you want to do this in the same goroutine as where    // you call wg.Wait() so that you're sure that, even if    // none of the goroutines started yet, you have the    // right number of pending work.    wg.Add(1)    // this is the loop-local channel that our first goroutine    // will send its results to. we'll start up a second    // goroutine to forward its results to the final channel.    ch := make(chan string)    // make sure you pass the index and work into the    // function that runs in the goroutine.    // this mechanism makes sure that the goroutine    // gets a (stack-allocated) _copy_ of the data.    // if you don't do this, idx and work will change out    // from under it as the loop progresses.    go func(idx int, work string) {        // this is the "work". right now, it just returns an        // int. in the next section, it will return both an int        // and an error        res := doSomeWork(idx, work)        ch <- res    }(idx, work)    // start up another goroutine to forward the results from    // ch to resultCh    go func() {        // we want to indicate that we're done after we forward        // the result to the final channel, _not_ just when we're        // done with the actual computation. this arrangement        // will be useful below, in our final goroutine that        // runs after the for loop is done        defer wg.Done()        res := <-ch        resultCh <- res    }()}// start up a final goroutine that is going to watch for// the moment when all of the loop goroutines are both//// 1. done with their work// 2. done sending their results to the final channel//// after that, we can close the resultCh. this closure is// important for the following for loop, since ranging over// a channel will only stop after that channel is closedgo func() {    wg.Wait()    close(resultCh)}()
// now that we have that final goroutine running, we can// be sure that this for loop will end after://// 1. all goroutines are done with their work// 2. all goroutines are done sending their work to resultCh// 3. we have processed each result// (in this case, we just print it out)for result := range resultCh { fmt.Println("result:", result)}
复制代码


最后的小困难:error 处理


// this is the channel that will hold the results of the workresultCh := make(chan string)// this channel receives all the errors that occur.// for each work item, either resultCh or errCh will receive// precisely once. both channels will be closed immediately// after all receives happenerrCh := make(chan error)var wg sync.WaitGroupworkToDo := []string{"do", "some", "work"}for idx, work := range workToDo {    // add 1 to the waitgroup _before_ you start the goroutine.    // you want to do this in the same goroutine as where    // you call wg.Wait() so that you're sure that, even if    // none of the goroutines started yet, you have the    // right number of pending work.    wg.Add(1)    // this is the loop-local channel that our first goroutine    // will send its results to. we'll start up a second    // goroutine to forward its results to the final channel.    ch := make(chan string)    // this is the loop-local channel that our first goroutine    // will send errors on. for each loop iteration, exactly    // one of ch or errCh will receive    eCh := make(chan error)    // make sure you pass the index and work into the    // function that runs in the goroutine.    // this mechanism makes sure that the goroutine    // gets a (stack-allocated) _copy_ of the data.    // if you don't do this, idx and work will change out    // from under it as the loop progresses.    go func(idx int, work string) {        // this is the "work". right now, it just returns an        // int. in the next section, it will return both an int        // and an error        res, err := doSomeWork(idx, work)        if err != nil {            eCh <- err        } else {                     ch <- res        }    }(idx, work)    // start up another goroutine to forward the results from:    //    // - ch to resultCh    // - eCh to errCh    go func() {        // we want to indicate that we're done after we do the        // forward operation, similar to the code in the        // previous section        defer wg.Done()        // only one forward operation will happen per loop        // iteration, so we use a select to choose exactly        // one of the channels - either the success or error        // one.        select {            case res := <-ch:                resultCh <- res            case err := <-eCh:                errCh <- err        }    }()}// start up a final goroutine that is going to watch for// the moment when all of the loop goroutines are both//// 1. done with their work// 2. done sending their results to the appropriate channel//// after that, we can close both resultCh and errCh.go func() {    wg.Wait()    close(resultCh)    close(errCh)}()
// we're now at a point where we have two "final" channels://// - one for the successful results// - one for the errors//// we have a few choices on how to handle them, and it's// largely up to your use case how you handle errors or success// results. In our case, we'll loop through both channels,// print out the result either way, and then exit when all// receives happen.
// these two booleans are going to keep track of when // each channel is closed and done receivingresultsDone := falseerrsDone := false// we're going to use an infinite loop and break out of it// when both channels are done receivingfor { if resultsDone && errsDone { break } select { case res, valid := <-resultCh: if !valid { resultsDone = true } else { fmt.Println("result:", res) } case err, valid := <-errCh: if !valid { errsDone = true } else { fmt.Println("error:", err) } }}
复制代码


发布于: 刚刚阅读数: 4
用户头像

baiyutang

关注

广州 2017.12.13 加入

Microservices | Golang | Cloud Nitive | “Smart work,Not hard”

评论

发布
暂无评论
Go 扇入 / 扇出