写点什么

go channel 原理及使用场景

作者:六月的
  • 2022-10-19
    上海
  • 本文字数:8196 字

    阅读完需:约 1 分钟

转载自:go channel原理及使用场景

源码解析

type hchan struct {  qcount   uint           // Channel 中的元素个数  dataqsiz uint           // Channel 中的循环队列的长度  buf      unsafe.Pointer // Channel 的缓冲区数据指针  elemsize uint16 // 当前 Channel 能够收发的元素大小  closed   uint32  elemtype *_type // 当前 Channel 能够收发的元素类型  sendx    uint   // Channel 的发送操作处理到的位置  recvx    uint   // Channel 的接收操作处理到的位置  recvq    waitq  // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)  sendq    waitq  // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)
// lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex}
复制代码


创建 channel

channel 的初始化有 2 种,一种是没有缓冲区的 channel,一种是有缓冲区的 channel。对应的初始化之后 hchan 也是有区别的。


无缓冲区的 channel,初始化的时候只为 channel 分配内存,缓冲区 dataqsiz 的长度为 0


有缓冲的 channel,初始化时会为 channel 和缓冲区分配内存,dataqsiz 长度大于 0


同时 channel 的元素大小和缓冲区的长度都是有大小限制的


func makechan(t *chantype, size int) *hchan {  elem := t.elem
// compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") }
// 如果内存超了,或者分配的内存大于channel最大分配内存,或者分配的size小于0,直接Panic mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // 如果没有缓冲区,分配一段内存 c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 有缓冲时,如果元素不包含指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 有缓冲区,且元素包含指针类型,channel和buf数组各自分配内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) }
// 元素大小,元素类型,循环数组长度,更新到channel c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan)
if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c}
复制代码

发送数据(ch <- i)

  • 发送数据前会加锁,防止多个线程并发修改数据。如果 channel 已经关闭,直接 Panic


  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
复制代码


  • 当存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者

  • 当 channel 的 recvq 队列不为空,而且 channel 是没有数据数据写入的。这个时候如果有数据写入,会直接把数据拷贝到接收者变量所在的内存地址上。即使这是一个有缓冲的 channel,当有等待的接收者时,也是直接给接收者,不会先保存到循环队列


  // 如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend 会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据  if sg := c.recvq.dequeue(); sg != nil {      send(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true    }
// func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { // 调用 runtime.sendDirect 将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) // 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方; // 需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine goready(gp, skip+1) }
复制代码


  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区


  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    ...    // 如果当前元素数小于循环队列的长度    if c.qcount < c.dataqsiz {      // 使用 runtime.chanbuf 计算出下一个可以存储数据的位置      qp := chanbuf(c, c.sendx)      // 将发送的数据拷贝到缓冲区中      typedmemmove(c.elemtype, qp, ep)      // 发送的位置索引+1      c.sendx++      // 如果循环队列满了就从0开始      // 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置      if c.sendx == c.dataqsiz {        c.sendx = 0      }      // 增加当前元素数      c.qcount++      unlock(&c.lock)      return true    }    ...  }
复制代码


  • 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据

  • 当因为不存在缓冲区或者缓冲区已满无法写入时,会构造 sudog 等待执行的 gorutine 结构,放到 hchan 的等待队列中,直到被唤醒,把数据放到缓冲区或者直接拷贝给接收者


  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    ...    // 使用 select 关键字可以向 Channel 非阻塞地发送消息    if !block {      unlock(&c.lock)      return false    }
// 获取发送数据使用的 Goroutine gp := getg() // 获取 runtime.sudog 结构 mysg := acquireSudog() // 设置待发送数据的内存地址 mysg.elem = ep // 设置发送数据的goroutine mysg.g = gp mysg.isSelect = false // 设置发送的channel mysg.c = c // 设置到goroutine的waiting上 gp.waiting = mysg // 加入到发送等待队列 c.sendq.enqueue(mysg) // 阻塞等待唤醒 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep)
// someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
复制代码

接收数据(<- ch)

  • 从一个空 Channel 接收数据

  • goroutine 会让出使用权,并阻塞等待


    if c == nil {      if !block {        return      }      // 让出使用权      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)      throw("unreachable")    }
// 不获取锁的情况下,检查失败的非阻塞操作 if !block && empty(c) { // 显示未关闭,继续返回false,因为channel不会重新打开 if atomic.Load(&c.closed) == 0 { return }
if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } // Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回 if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
var t0 int64 if blockprofilerate > 0 { t0 = cputicks() }
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) // Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回 if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
复制代码


  • 当存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据

  • 如果是无缓冲的 channel,当有接收者进来时,会直接从阻塞的发送者拷贝数据

  • 如果是有缓冲的 channel,当有接收者进来时,会先从缓冲区拿数据,接着等待的发送者会把数据拷贝到缓冲区

  • 注意这个时候并没有直接去唤醒发送者,而是放到下次 p 的执行队列中中,下次调度时会唤醒发送者,发送者会做一些释放资源的操作


  if sg := c.sendq.dequeue(); sg != nil {      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true, true    }

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // 如果无缓存,直接从发送者拷贝数据 recvDirect(c.elemtype, sg, ep) } } else { // 由于队列已满,接收数据的索引和发送数据的索引一致 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 数据从队列拷贝到目标内存地址 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 数据从发送者拷贝到缓冲区 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。 goready(gp, skip+1) }
复制代码


  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据


  if c.qcount > 0 {      // 直接从队列取数据      qp := chanbuf(c, c.recvx)      if raceenabled {        racenotify(c, c.recvx, nil)      }      // 放到目标内存      if ep != nil {        typedmemmove(c.elemtype, ep, qp)      }      // 清空队列中对应的元素      typedmemclr(c.elemtype, qp)      // 接收索引+1      c.recvx++      if c.recvx == c.dataqsiz {        c.recvx = 0      }      // 队列元素-1      c.qcount--      unlock(&c.lock)      return true, true    }
复制代码


  • 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据


  if !block {      unlock(&c.lock)      return false, false    }
// no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // 阻塞等待,让出使用权 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 唤醒之后清空sudog if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success
复制代码

关闭 channel

  • 当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常


  func closechan(c *hchan) {    if c == nil {      panic(plainError("close of nil channel"))    }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
复制代码


  • recvqsendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素


  c.closed = 1
var glist gList
// release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
// release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
// 为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }
复制代码

使用场景

报错情形

  • 往一个关闭的 channel 发送数据会报错:panic: send on closed channel

  • 关闭一个 nil 的 chan 会报错:panic: close of nil channel

  • 关闭一个已经关闭的 channel 报错:panic: close of closed channel

1、一个经典的算法题

有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出自己的编号,要求写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4...的顺序打印出来


package main
import ( "fmt" "time")
func main() { // 4个channel chs := make([]chan int, 4) for i, _ := range chs { chs[i] = make(chan int) // 开4个协程 go func(i int) { for { // 获取当前channel值并打印 v := <-chs[i] fmt.Println(v + 1) time.Sleep(time.Second) // 把下一个值写入下一个channel,等待下一次消费 chs[(i+1)%4] <- (v + 1) % 4 }
}(i) }
// 往第一个塞入0 chs[0] <- 0 select {}}
复制代码

2、限流器

package main
import ( "fmt" "time")
func main() { // 每次处理3个请求 chLimit := make(chan struct{}, 3) for i := 0; i < 20; i++ { chLimit <- struct{}{} go func(i int) { fmt.Println("下游服务处理逻辑...", i) time.Sleep(time.Second * 3) <-chLimit }(i) } time.Sleep(30 * time.Second)}
复制代码


如果觉得 sleep 太丑太暴力,可以用 waitGroup 控制结束时机


package main
import ( "fmt" "sync" "time")
var wg sync.WaitGroup
func main() { // 每次处理3个请求 chLimit := make(chan struct{}, 3) for i := 0; i < 20; i++ { chLimit <- struct{}{} wg.Add(1) go func(i int) { fmt.Println("下游服务处理逻辑...", i) time.Sleep(time.Second * 3) <-chLimit wg.Done() }(i) } wg.Wait()}
复制代码

3、优雅退出

package main
import ( "fmt" "log" "os" "os/signal" "syscall" "time")
func main() { var closing = make(chan struct{}) var closed = make(chan struct{})
go func() { for { select { case <-closing: return default: fmt.Println("业务逻辑...") time.Sleep(1 * time.Second) } } }()
termChan := make(chan os.Signal) // 监听退出信号 signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan
// 退出中 close(closing)
// 退出之前清理一下 go doCleanup(closed)
select { case <-closed: case <-time.After(time.Second): log.Println("清理超时不等了") }
log.Println("优雅退出")}
func doCleanup(closed chan struct{}) { time.Sleep(time.Minute) // 清理完后退出 close(closed)}
复制代码

4、实现互斥锁

初始化一个缓冲区为 1 的 channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,释放锁的时候再把这个元素放回 channel


package main
import ( "log" "time")
type Mutex struct { ch chan struct{}}
// 初始化锁func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu}
// 加锁,阻塞获取func (m *Mutex) Lock() { <- m.ch}
// 释放锁func (m *Mutex) Unlock() { select { // 成功写入channel代表释放成功 case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") }}
// 尝试获取锁func (m *Mutex) TryLock() bool { select { case <-m.ch: return true default:
} return false}
func (m *Mutex) LockTimeout(timeout time.Duration) bool { timer := time.NewTimer(timeout)
select { case <-m.ch: // 成功获取锁关闭定时器 timer.Stop() return true case <-timer.C:
} // 获取锁超时 return false}
// 是否上锁func (m *Mutex) IsLocked() bool { return len(m.ch) == 0}

func main() { m := NewMutex() ok := m.TryLock() log.Printf("locked v %v\n", ok) ok = m.TryLock() log.Printf("locked v %v\n", ok)
go func() { time.Sleep(5*time.Second) m.Unlock() }()
ok = m.LockTimeout(10*time.Second) log.Printf("LockTimeout v %v\n", ok)}
复制代码


参考:


极刻时间《go 并发编程实战》

用户头像

六月的

关注

还未添加个人签名 2019-07-23 加入

还未添加个人简介

评论

发布
暂无评论
go channel原理及使用场景_Go_六月的_InfoQ写作社区