一次 goroutine 泄漏排查案例
背景
这是一个比较经典的 golang 协程泄漏案例。
背景是这样,今天看到监控大盘数据发现协程的数量监控很奇怪。呈现上升趋势,然后骤降。虽然对协程数量做了报警机制,但是协程数量还是没有达到报警阈值,所以没有报警产生。
不过有经验的开发应该应该能一眼看出,这个肯定是协程泄漏了,因为协程数量一直在上涨,没有下降趋势,,中间下降的曲线其实是服务器重启造成的。
pprof 分析
为了直接确认是哪里导致的协程泄漏,用 golang 的 pprof 工具去对协程数量比较多的堆栈进行排查,关于 golang pprof 的使用以及统计原理可以看我的这个系列golang pprof 的使用。
以下是采样到的 goroutine 的 profile 文件。
可以发现主要是 transport.go 这个文件里产生的协程没有被释放,transport.go 这个文件是 golang 里用于发起 http 请求的文件,并且定位到了具体的协程泄漏代码位置 是 writeloop 和 readloop 函数。
熟悉 golang 的同学应该能立马想到,协程没有释放的原因极大可能是请求的响应体没有关闭。这也算是 golang 里面的一个坑了。
在分析之前,还是先说下结论,resp.Body 在被完整读取时,即使不显示的进行关闭也不会造成协程泄漏,只有读取部分 resp.Body 时,不显示关闭才会引发协程泄漏问题。
现在我们还是 具体分析下为啥 resp body 不关闭,会造成协程泄漏。
请求发送与接收流程
我们先来看看 golang 里面是如何发送以及接收 http 请求的。下面这张图完整的展示了一个请求被发送以及其响应被接收的过程,我们基于它然后结合代码分析下。
如图所示,在我们用 http.Get 方法发送请求时,底层追踪下去,会调用到 roundtrip 函数进行请求的发送与响应的接收。roundtrip 位置在源码的位置如下,代码基于 golang1.17 版本进行分析,
// src/net/http/transport.go:2528
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error)
复制代码
在代码里,用 persistConn 这个结构体代表了一个 http 连接,这个连接可以从连接池中获取,也可以被新建。
// src/net/http/transport.go:1869 reqch 和writech 都是连接的属性
type persistConn struct {
.....
reqch chan requestAndChan // written by roundTrip; read by readLoop
writech chan writeRequest // written by roundTrip; read by writeLoop
...
}
复制代码
在 roundtrip 函数中,会往 persistConn 的 writech 和 reqch 两个 chan 通道内发送数据。代码如下:
// src/net/http/transport.go:2528
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
....
// src/net/http/transport.go:2594
pc.writech <- writeRequest{req, writeErrCh, continueCh}
...
// src/net/http/transport.go:2598
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
}
复制代码
请求发送过程
writech 通道和请求的发送有关,通道里的请求真正发送到网卡则是由 persistConn 的 writeloop 方法完成的。
persistConn 的 writeloop 方法是连接被 dialConn 方法创建的时候,就会用一个协程去调度执行的方法。代码如下:
// src/net/http/transport.go:1560
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
.... 省略了部分代码
// src/net/http/transport.go:1747
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
复制代码
在 pconn.writeLoop 里,会不断的轮询 persistConn 的 writech 通道里的消息,然后通过 wr.req.Request.write 发送到互联网中。
// src/net/http/transport.go:2383
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
.... 省略部分代码
}
复制代码
知道请求时如何发送出去的了,那么连接 persistConn 是如何接收请求的响应呢?
响应接收的流程
我们再回到 roundtrip 函数逻辑里,除了赋值 persistConn 的 writech 属性值,roundtrip 函数还会为 persistConn 的 reqch 属性赋值,persistConn 在被创建时,同样会启动一个协程去调度执行一个叫做 readloop 的方法。代码其实已经在上面展示过了,不过为了方便看,我在此处再列举一次,
// src/net/http/transport.go:2528
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
.... 省略部分代码
// src/net/http/transport.go:2598
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
.... 省略部分代码
}
// src/net/http/transport.go:1560
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
.... 省略了部分代码
// src/net/http/transport.go:1747
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
复制代码
readloop 方法会 读取 persistConn 读缓冲区中的数据,读到后就将响应信息放到 reqch 通道里,最终 reqch 通道里的响应信息就能被 roundtrip 函数获取到然后返回给应用层代码了。
readloop 读取缓冲区数据大致流程如下:
// src/net/http/transport.go:2052
func (pc *persistConn) readLoop() {
.... 省略部分代码
for alive {
... 省略部分代码
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
if err == nil {
// 读取响应
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
......
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
resp.Body = body
.......
select {
// rc 是pc.reqch的引用,这里将响应结果传递给了这个通道
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// 阻塞等待响应信息被读取完毕或者应用层关闭resp.Body
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
}
}
复制代码
readloop 通过 pc.readResponse 读取一次 http 响应后,会将响应体发送到 pc.reqch ,roundtrip 函数阻塞等待 pc.reqch 里有数据到达后,则将 pc.reqch 里的响应体取出来返回给应用层代码。
注意 readloop 函数在读取一次响应后,会阻塞等待响应体被读取完毕,或者响应体被 Close 掉后,才会将 persistConn 重新放回连接池,然后等待读下一个 http 的响应体。 应用层会调用 resp.Body 的 Close 方法,从 readloop 源码可以看出,resp.body 实际是个 bodyEOFSignal 类型,bodyEOFSignal 的 Close 方法如下:
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
....省略部分代码
n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
}
func (es *bodyEOFSignal) Close() error {
es.mu.Lock()
defer es.mu.Unlock()
if es.closed {
return nil
}
es.closed = true
if es.earlyCloseFn != nil && es.rerr != io.EOF {
return es.earlyCloseFn()
}
err := es.body.Close()
return es.condfn(err)
}
// caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
if es.fn == nil {
return err
}
err = es.fn(err)
es.fn = nil
return err
}
复制代码
调用 bodyEOFSignal.Close 方法最终会调到 bodyEOFSignal 的 fn 方法或者 earlyCloseFn 方法,earlyCloseFn 在 Close 响应体的时候,发现响应体还没有被完全读取时会被调用。
调用 bodyEOFSignal.Read 方法时,当 read 读取完毕后 err 将会是 io.EOF,此时 err 不为空将会调用 condfn 方法对 fn 方法进行调用。
fn,earlyCloseFn 函数是在哪里声明的呢?还记得 readloop 源码里 bodyEOFSignal 的声明吗,我这里再展示一下上述的源码部分:
// src/net/http/transport.go:2166
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
复制代码
声明响应体 body 的时候就定义好了者两个函数,这两个函数都是往 waitForBodyRead 通道发送消息,readloop 会阻塞等待 waitForBodyRead 的消息到达。消息到达后说明 resp.Body 被读取完毕或者主动关闭了,然后调用 tryPutIdleConn 将连接重新放回连接池中 完整的代码还是在上述 readloop 的源码片段里,我这里只展示下 readloop 部分代码。
// src/net/http/transport.go:2207
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
// tryPutIdeConn 将连接重新放入连接池
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
复制代码
现在再来看我们 go 协程泄漏的代码在那里,是在 readloop 和 writelooop 函数中,泄漏的原因就在于读取响应体后没有对响应体将进行显示的关闭或者没有把响应体的内容读取完毕,导致没有向 waitForBodyRead 通道发送消息,而执行的 readloop 函数的协程一直阻塞等待 waitForBodyRead 消息的到达,后续的请求又新建了连接,从而新起了 readloop 协程,writeloop 协程,同样由于响应体未关闭也阻塞在这里,导致协程数量越来越多,从而有协程泄漏的现象。
一般情况下,我们都会完整的读取完 resp.Body,所以即使不显示的关闭 body,也不会有泄漏问题产生,但我们的程序刚好有段逻辑需要只需要读取 body 的前 10 字节,代码如下:
_, err = ioutil.ReadAll(io.LimitReader(resp.Body, 10))
if err != nil && err != io.EOF {
t.Fatal(err)
}
复制代码
读取完后也没有关闭 resp.Body 并且类似的请求越来越多,导致我们的协程数量越来越多了。
修复这个 bug 也很简单,即对 resp body 关闭即可。
反思
golang resp body 还是一定要记得关闭,不然就会引发协程泄漏问题,这次由于同事对此类问题没有过多重视,导致了这个问题,好在有监控大盘,及时发现,不然后果不堪设想。
评论