写点什么

Goroutine 间的“灵魂管道”:Channel 如何实现数据同步与因果传递?

作者:poemyang

Channel 是连接 Goroutine 的“管道”,是 CSP 理念在 Golang 中的具象化实现。它不仅是数据传递的队列,更是 Goroutine 间同步的天然工具,让开发者无需诉诸显式的锁或条件变量。


func main() {  ch := make(chan int, 1) // 创建一个int,缓冲区大小为1的Channel  ch <- 2                 // 将2发送到ch
go func() { // 开启一个异步Goroutine n, ok := <-ch // n接收从ch发出的值,如果没有接收到数据,将会阻塞等待 if ok { fmt.Println(n) // 2 } }()
close(ch) // 关闭Channel}
复制代码


Channel 数据结构

Channel 在运行时使用 src/runtime/chan.go 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的是如下所示的结构。


type hchan struct {  qcount   uint           // 队列中所有数据总数  dataqsiz uint           // 环形队列的 size  buf      unsafe.Pointer // 指向 dataqsiz 长度的数组  elemsize uint16         // 元素大小  closed   uint32  elemtype *_type         // 元素类型  sendx    uint           // 已发送的元素在环形队列中的位置  recvx    uint           // 已接收的元素在环形队列中的位置  recvq    waitq          // 接收者的等待队列  sendq    waitq          // 发送者的等待队列
lock mutex}
复制代码



runtime.hchan 结构体中的五个字段 qcount、dataqsiz、buf、sendx、recv 构建底层的循环队列。除此之外,elemsize 和 elemtype 分别表示当前 Channel 能够收发的元素类型和大小。sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构。


type waitq struct {    first *sudog     last  *sudog}
复制代码


runtime.sudog(Scheduling Unit Descriptor)是用于实现 Goroutine 调度的一种数据结构。它包含了与 Goroutine 相关的信息,如 Goroutine 的状态、等待的条件、等待的时间等。当一个 Goroutine 需要等待某个事件或条件时,它会创建一个 runtime.sudog,并将其加入到等待队列中。当事件或条件满足时,等待队列中的 runtime.sudog 会被唤醒,从而允许对应的 Goroutine 继续执行。Channel 发送数据 1)如果等待接收的队列 recvq 中存在 Goroutine,那么直接把正在发送的值发送给等待接收的 Goroutine。



2)当缓冲区未满时,找到 sendx 所指向的缓冲区数组的位置,将正在发送的值拷贝到该位置,并增加 sendx 索引以及释放锁。



3)如果是阻塞发送,那么就将当前的 Goroutine 打包成一个 sudog 结构体,并加入到 Channel 的发送队列 sendq 里。



之后则调用 goparkunlock 将当前 Goroutine 设置为_Gwaiting 状态并解锁,进入阻塞状态等待被唤醒;如果被调度器唤醒,执行清理工作并最终释放对应的 sudog 结构体。


Channel 接收数据

1)如果等待发送的队列 sendq 里存在挂起的 Goroutine,那么有两种情况:当前 Channel 无缓冲区,或者当前 Channel 已满。从 sendq 中取出最先阻塞的 Goroutine,然后调用 recv 方法,此时需做如下判断:


  1. 如果无缓冲区,那么直接从 sendq 接收数据;

  2. 如果缓冲区已满,从 buf 队列的头部接收数据,并把数据加到 buf 队列的尾部;

  3. 最后调用 goready 函数将等待发送数据的 Goroutine 的状态从_Gwaiting 置为_Grunnable,等待下一次调度。当缓冲区已满时的处理过程。



2)如果缓冲区 buf 中还有元素,那么就走正常的接收,将从 buf 中取出的元素拷贝到当前协程的接收数据目标内存地址中。值得注意的是,即使此时 Channel 已经关闭,仍然可以正常地从缓冲区 buf 中接收数据。3)如果是阻塞模式,且当前没有数据可以接收,那么就需要将当前 Goroutine 打包成一个 sudog 加入到 Channel 的等待接收队列 recvq 中,将当前 Goroutine 的状态置为_Gwaiting,等待唤醒。



Channel 与 happens-before 关系

Channel happens-before 规则有 4 条。1)对一个元素的 send 操作 happens-before 对应的 receive 完成操作。


var c = make(chan int, 10) // buffered或者unbufferedvar a string
func f() { // a 的初始化 happens-before 往ch中发送数据 a = "hello, world" c <- 0}
func main() { go f() // 往ch发送数据 happens-before 从ch中读取出数据 <-c // 打印a的值 happens-after 第12行 // 打印a的结果值“hello world” print(a)}
复制代码


2)对 Channel 的 close 操作 happens-before receive 端的收到关闭通知操作。


var c = make(chan int, 10) // buffered或者unbufferedvar a string
func f() { // a 的初始化 happens-before close ch a = "hello, world" close(c)}
func main() { go f() // close ch happens-before 从ch中读取出数据 <-c // 打印a的值 happens-after 第12行 // 打印a的结果值“hello world” print(a)}
复制代码


3)对于 Unbuffered Channel,对一个元素的 receive 操作 happens-before 对应的 send 完成操作。


var c = make(chan int) // unbufferedvar a string
func f() { // a 的初始化 happens-before 从ch中读取出数据 a = "hello, world" <-c}
func main() { go f() // 从ch中读取出数据 happens-before 往ch发送数据 c <- 0 // 打印a的值 happens after 第12行 // 打印a的结果值“hello world” print(a)}
复制代码


4)如果 Channel 的容量是 c(c>0),那么,第 n 个 receive 操作 happens-before 第 n+c 个 send 的完成操作。规则 3 是规则 4 c=0 时的特例。


Channel 使用场景

1)并发控制:通过控制带缓冲的 Channel 的队列大小来限制并发的数量。


func worker(id int, sem chan struct{}) {  // 获取许可  sem <- struct{}{}  time.Sleep(time.Second) // 模拟耗时操作  // 释放许可  <-sem}
func main() { // 创建一个缓冲区为2的Channel sem := make(chan struct{}, 2)
for i := 0; i < 5; i++ { go worker(i, sem) }}
复制代码


2)信号通知:使用一个无缓冲的 Channel 来通知一个 Goroutine 任务已经完成。


func main() {  done := make(chan bool)
go func() { time.Sleep(2 * time.Second) // 模拟耗时操作 // 发送信号表示工作已完成 done <- true }()
<-done // 等待信号}
复制代码


3)异步操作结果获取:在一个 Goroutine 中执行异步操作,然后通过 Channel 将结果发送到另一个 Goroutine。


func asyncTask() <-chan int {  ch := make(chan int)  go func() {    // 模拟异步操作    time.Sleep(2 * time.Second)    ch <- 1 // 发送结果    close(ch)  }()  return ch}
func main() { ch := asyncTask() time.Sleep(1 * time.Second) // 模拟其他操作 result := <-ch // 获取异步操作的结果}
复制代码


总结:控制与编排,殊途同归

Java 与 Golang 在并发模型上的差异,深刻地体现了两种构建程序确定性的不同哲学:1)Java (共享内存):采用显式同步的路径。它为开发者提供了强大的底层控制能力(锁、内存屏障),但要求开发者必须承担起预见并管理资源竞态的心智负担。确定性来自于对临界区和内存可见性的严格手工控制。2)Golang (消息传递):采用隐式因果的路径。它通过 Channel 将数据的所有权在 Goroutine 间传递,将并发问题从“共享数据访问”转化为“数据流设计”。确定性来自于消息传递建立的自然因果顺序,从而在结构上规避了竞态。Java 的路径是“先有并发,后加约束”,而 Golang 的路径是“通过约束,实现并发”。两者并非优劣之分,而是针对不同问题域和开发哲学的选择。Java 的完备工具集赋予了处理极端复杂场景的灵活性,而 Golang 的简约设计则为构建清晰、可靠、易于推理的并发系统提供了优雅的范式。最终,无论是显式的同步约束,还是隐式的因果传递,它们都通向并发编程的圣杯——在多核时代,构建出可预测、可维护且高性能的软件系统。这两种思想的碰撞与融合,正持续推动着现代并发编程的演进。


很高兴与你相遇!如果你喜欢本文内容,记得关注哦

发布于: 刚刚阅读数: 5
用户头像

poemyang

关注

让世界知道我的存在 2018-03-05 加入

技术/人文, 互联网

评论

发布
暂无评论
Goroutine间的“灵魂管道”:Channel如何实现数据同步与因果传递?_golang_poemyang_InfoQ写作社区