本文档主要分析 go channel 源码,go 版本为 1.15.4,源码路径 GOROOT/src/runtime/chan.go
针对读写,select,for range 进行源码分析,更倾向于看一下对应底层调用了什么方法。channel 原理,go 的切换与 GMP 的关联,放在另一篇文章再叙述。
Channel 结构体
type hchan struct {
qcount uint // 有效用户元素,在出队入队时改变
dataqsiz uint // buffer长度,初始化时赋值,不会改变
buf unsafe.Pointer // buffer数组的地址
elemsize uint16 // 元素大小,结合dataqsiz可以计算出buf内存大小
closed uint32
elemtype *_type // 元素类型
sendx uint //
recvx uint // receive index
recvq waitq // 等待recv响应的对象列表
sendq waitq // 等待send响应的对象列表
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
//双向队列
type waitq struct {
first *sudog
last *sudog
}
复制代码
c := make(chan int)
初始化 channel 对应的底层源码就是 makechan,输入是一个 chantype(元素类型)和 size(槽位大小
参数校验完后,会对 hchan 初始化。switch 三种 case 表示三种初始化的类型:
zero buf
no buffer 可以看做 chan
non pointer element
channel 元素不含指针的场景,会分配一个较大的内存空间。由于一起分配(L101,L102),所以内存是连续分配的。
pointer element
默认场景,channel 元素带指针,单独分配内存
初始化 elemsize/elemtype/dataqsiz
blocking Write & non blocking Write
//blocking,对应chansend的block参数为true
c <- v
//non blocking,对应chansend的block参数为false。
//没有写default,也没有receiver的话也会deadlock
select {
case c<-v :{
//...
}
default:
//...
}
复制代码
c <- v
c 为 channel,v 为写入的值
对一个 channel 的写操作,底层对应 chansend 方法,一般有 2 种情况
对一个 non blocking,且没有缓冲区的 channel 写入,在没有 receiver 的情况下,会 return false,但不会阻塞。由于,没有 receiver,所以不需要 lock,可以直接判断。
//仅non blocking会在这里判断
func nonBlockingChannel() {
c := make(chan int)
select {
case c <- 1 :{
fmt.Println("1")
}
default:
//打印这里
fmt.Println("2")
}
}
复制代码
如果有 receiver,就不会有问题,见下面的第 4 点。除非是带有 buffer 的 channel,见第 5 点。
channel 操作,都在互斥锁下完成,其中一个主要目的就是要锁 receq 和 sendq
禁止往关闭的 channel 写数据,不然会 panic
func writeClosedChannel() {
c := make(chan int)
close(c)
c <- 1
}
复制代码
有 channel 在等着收数据,就取出来往这个 channel 写
相比较第 1 点没有 receiver,如果有 receiver,full()就是 false。取得一个 receiver 后,发送。
//non blocking channel
//先准备一个receiver,等到select写入的时候就不会return false
func nonBlockingChannelWithoutBufferWrite() {
c := make(chan int)
go func() {
for {
select {
case <-c:
{
fmt.Println(time.Now().Unix(), "receive")
}
}
}
}()
select {
case c <- 1:
{
fmt.Println(time.Now().Unix(), "send")
}
}
}
//blocking
func blockingChannelWithoutBufferWrite() {
c := make(chan int)
go func() {
select {
case <-c:
{
fmt.Println("no deadlock")
}
}
}()
c <- 1
}
复制代码
buffer 还有空间,就元素往里面写,递增索引
在有 buffer 的时候,写入数据返回 true,不会导致 deadlock。
//non blocking channel
func nonBlockingChannelWithBuffer() {
c := make(chan int,1)
select {
case c <- 1 :{
//打印这里
fmt.Println("1")
}
default:
fmt.Println("2")
}
}
//blocking channel
func blockingChannelWithBuffer() {
c := make(chan int, 2)
c <- 1
fmt.Println("its available")
}
复制代码
非 pointer value 的 channel,会分配一个连续的内存空间,所以在写入数据的时候,可以通过指针位移的方式去写入。
非阻塞式超过 buffer 大小直接返回,select 的时候 block 为 true。如果是 blocking channel 会直接到第 7 步,切走 goruntine。
当写入数据长度超过 buffer 大小时,即 dataqsiz,如果不是 select 的情况,会一直阻塞 deadlock。
//non blocking channel写入超过buffer长度的数据时,会执行default里面的内容
func channelWithBufferWriteOver() {
c := make(chan int, 2)
for i := 0; ; i++ {
select {
case c <- i:
{
fmt.Println("send success: ", i)
}
default:
fmt.Println("buffer over")
return
}
}
}
复制代码
goruntine 切走,L252 将 goruntine 入队,等待条件唤醒。L258,gopark 将切走 goruntine,让出 CPU。代码块里面末尾'...',就是等待唤醒的代码
//blocking channel在写入超过buffer长度后会deadlock
func channelWithBufferWriteOver() {
c := make(chan int, 2)
c <- 1
fmt.Println("1")
c <- 2
fmt.Println("2")
c <- 3
fmt.Println("3")
}
复制代码
blocking Read & non blocking Read
//non blocking read
select {
case v := <-c:
... foo
default:
... bar
}
select {
case v,ok := <-c:
... foo
default:
... bar
}
复制代码
//blocking read
<- c
或
v,ok := <-c
复制代码
<- c 和 v,ok:=<-c
读 channel 里面数据分为两种情况,就是是否有判断 channel 关闭
<-c 对应的是 chanrec1
v,ok :=<-c 对应的是 chanrecv2
两个调用的底层方法都是 chanrecv,总共有三种返回结果
如果是非阻塞模式(block 为 false),并且没有任何可用元素,返回 selected=false,received=false,这样就不会进入 select 的 case 分支
如果是阻塞式(block 为 true),在 channel 已经 close 的情况下,会返回 selected=true,received=false,因为 channel 已经关闭,读不到数据了
如果是阻塞模式,且 channel 还未关闭,返回 selected=true,received=true,表示有读到数据
和写最大的区别就在于读 closed 的 channel 是不会 panic,以及对 recv 队列入队,sendq 队列出队
non blocking channel 先判断是否为空,直接返回 false。和 chansend 不同的是,这里使用原子操作来保证。
To prevent reordering, we use atomic loads for both checks, and rely on emptying and closing to happen in separate critical sections under the same lock. This assumption fails when closing an unbuffered channel with a blocked send, but that is an error condition anyway.
为了防止重新排序,我们对这两种检查都使用原子加载,并依赖于在同一锁下的不同临界区中发生清空和关闭。当关闭一个发送阻塞的非缓冲通道时,这种假设就失败了,但无论如何,这是一个错误条件。
//non blocking read with empty data in channel
func nonBlockingChannelRead() {
c := make(chan int)
select {
case <-c:
fmt.Println("receive")
default:
fmt.Println("no receive")
}
}
复制代码
从一个 closed 的 channel 读,是不会 panic 的,这个和 write 不同,切记。最后返回的 received 为 false,因此在读 channel 的时候,最后判断一下 channel 是否关闭。
func readClosedChannel() {
c := make(chan int)
close(c)
<-c
fmt.Println("channel close")
_, ok := <-c
if !ok {
fmt.Println("channel no ok")
return
}
fmt.Println("success")
}
复制代码
有等待的 sender,就从 sendq 中 dequeue 出来,读数据。有 write 但是没有 read,会进到 sendq 里。
//non blocking channel with buffer read
func nonBlockingChannelWithBufferRead() {
c := make(chan int, 1)
c <- 1
select {
case v,_ := <-c:
{
fmt.Println(v)
}
default:
fmt.Println("no one send")
}
}
//blocking channel with buffer read
func blockingChannelWithBufferRead() {
c := make(chan int)
go func() {
c <- 1
}()
fmt.Println("start hold")
time.Sleep(time.Second)
v := <-c
fmt.Println("read:", v)
}
复制代码
non blocking 或者带有缓冲的 channel send 完,然后有人来读
func nonBlockingChannelReadDirect() {
c := make(chan int, 2)
c <- 1
c <- 2
for i := 0; ; i++ {
select {
case v := <-c:
{
fmt.Println("read success: ", v)
}
default:
fmt.Println("buffer empty")
return
}
}
}
复制代码
func blockingChannelWithBufferReadDirect() {
c := make(chan int, 2)
c <- 1
c <- 2
v := <-c
fmt.Println("read 1 : ", v)
v = <-c
fmt.Println("read 2 : ", v)
}
复制代码
没想到什么情况下会走过来,这里的情况,猜想是 lock 之后,某些情况导致,non blocking channel 在 read 的时候 qcount 被别的 channel 读完,导致了第 4 点没执行到。
goruntine 切走,L571 将 goruntine 入队,等待条件唤醒。L577,gopark 将切走 goruntine,让出 CPU。代码块里面末尾'...',就是等待唤醒的代码
//blocking channel把buffer读完后,再读会deadlock
func channelWithBufferReadOver() {
c := make(chan int, 2)
c <- 1
c <- 2
v := <-c
fmt.Println("read 1 : ", v)
v = <-c
fmt.Println("read 2 : ", v)
v = <-c
fmt.Println("read 3: ", v)
}
复制代码
select
结合 c <- v
c 为 channel,v 为写入的值
select {
case c <- v :
//...
default:
//...
}
复制代码
转换为
if selectnbsend(c,v) {
//...
} else {
//...
}
复制代码
对应底层源码 selectnbsend
范例代码见上面 c<-v 第 5 点
结合 v: =<-c
select {
case v := <- c :
//...
default:
//...
}
复制代码
转换为
if selectnbrecv(&v,c) {
//...
} else {
//...
}
复制代码
对应底层源码 selectnbrecv
范例代码见上面<-c 和 v,ok:=<-c 第 3,4 点
结合 v,ok:=<-c
select {
case v,ok := <- c :
//...
default:
//...
}
复制代码
转换为
if selectnbrecv2(&v,&ok,c) {
//...
} else {
//...
}
复制代码
对应底层源码 selectnbrecv2
范例代码见上面<-c 和 v,ok:=<-c 第 3,4 点
channel range
if selectnbrecv2(&v,&ok,c) {
//...
} else {
//...
}
复制代码
转换为
for(;ok = chanrecv2(c,ep);) {
//...
}
复制代码
对应底层源码 chanrecv2
//for range也是一个blocking channel,没有write,就会deadlock
func forRangeChannel() {
c := make(chan int, 2)
c <- 1
c <- 2
for v := range c {
fmt.Println("read :", v)
}
}
复制代码
评论