netpoll 是字节跳动开发的基于 epoll 的自研网络库。关于 netpoll 的原理与设计请参考字节跳动官方公众号发布的技术文章https://mp.weixin.qq.com/s/wSaJYg-HqnYY4SdLA2Zzaw。 本文主要结合代码和这片文章深入探讨一下 netpoll 的底层细节。
首先我们先来研究一下几个 netpoll 依赖的底层组件。
1. gopool
顾名思义,就是 go 的 goroutine 池,这是一个常见组件,不论使用 golang 标准库里面的 blocking io net,还是使用其他类 netpoll 的库(如 gnet),使用 goroutine 都能够减少 goroutine 的数量,使得在高并发的场景下性能得到提升。在 netpoll 里面,只有在我们通过 netpoll 底层拿到 connection 的数据,开始调用 handler 回调函数的时候,才使用了一下。gopool.CtxGo(c.ctx, task)
这行代码里面的 task 就是对 handler 的封装。从而可知, 运行我们的上层业务代码的 goroutine 是从 gopool 拿的 goroutine,而不是使用 go 关键字创建一个新的 goroutine。
connection_onevent.go:84
// onRequest is also responsible for executing the callbacks after the connection has been closed.
func (c *connection) onRequest() (err error) {
var process = c.process.Load()
if process == nil {
return nil
}
// Buffer has been fully processed, or task already exists
if !c.lock(processing) {
return nil
}
// add new task
var task = func() {
if c.ctx == nil {
c.ctx = context.Background()
}
var handler = process.(OnRequest)
START:
// NOTE: loop processing, which is useful for streaming.
for c.Reader().Len() > 0 && c.IsActive() {
// Single request processing, blocking allowed.
handler(c.ctx, c)
}
// Handling callback if connection has been closed.
if !c.IsActive() {
c.closeCallback(false)
return
}
// Double check when exiting.
c.unlock(processing)
if c.Reader().Len() > 0 {
if !c.lock(processing) {
return
}
goto START
}
}
gopool.CtxGo(c.ctx, task)
return nil
}
复制代码
回到 gopool 的内部设计,主要有两部分组成。
task 就是上面代码的 task,主要定义要执行的任务是什么。
worker 就是真正执行 task 的 goroutine。
task 和 worker 都使用了 sync.Pool 来复用对象从而减少内存的 malloc 操作,真可谓优化到极致。所有的 task 存储在一个单向链表中。并且 taskLock 来进行锁操作,这也是唯一一个锁。其他诸如 taskCount 等都是使用原子操作来代替锁的。worker 从链表中拿取 task 进行执行。总体还算常规操作。
type pool struct {
// The name of the pool
name string
// capacity of the pool, the maximum number of goroutines that are actually working
cap int32
// Configuration information
config *Config
// linked list of tasks
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// Record the number of running workers
workerCount int32
// This method will be called when the worker panic
panicHandler func(context.Context, interface{})
}
func (p *pool) CtxGo(ctx context.Context, f func()) {
t := taskPool.Get().(*task)
t.ctx = ctx
t.f = f
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
atomic.AddInt32(&p.taskCount, 1)
// The following two conditions are met:
// 1. the number of tasks is greater than the threshold.
// 2. The current number of workers is less than the upper limit p.cap.
// or there are currently no workers.
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
p.incWorkerCount()
w := workerPool.Get().(*worker)
w.pool = p
w.run()
}
}
复制代码
2. mcache
mcache 在 nocopy buffer 中使用。当我们需要使用大小介于 1k 到 8MB 之间的时候,我们才会使用 mcache 来进行内存分配。否则仍然使用 golang 的 make 来分配。之所以要介于 1k 到 8MB,是因为 machche 同样使用的是 sync.Pool 技术,这个段的大小是常见的大小,在这个大小内 mcache 可以极大的提高对象复用的可能性。
// mallocMax is 8MB
const mallocMax = block8k * block1k
// Next implements Reader.
func (b *LinkBuffer) Next(n int) (p []byte, err error) {
...
if block1k < n && n <= mallocMax {
p = malloc(n, n)
b.caches = append(b.caches, p)
} else {
p = make([]byte, n)
}
...
}
// malloc limits the cap of the buffer from mcache.
func malloc(size, capacity int) []byte {
if capacity > mallocMax {
return make([]byte, size, capacity)
}
return mcache.Malloc(size, capacity)
}
复制代码
内部实现来说,mcache 并不复杂,就是新建了各个大小段的 sync.Pool。
比如我们要创建 24byte 的数组,那么我们就从存储着 32 byte 的 slice 的 sync.Pool 中拿出来一个来复用就好啦。省的我们创建新的 slice 了。当然如果 sync.Pool 中没有可复用的对象,仍然会创建一个新的对象来使用。
3. nocopy linkbuffer
这个是专门给 netpoll 开发的,所以代码就在 netpoll 里面。linkbuffer 在官方文章中讲解的比较详细,在此就不再系统讲其设计了。主要结合代码,回答读写无锁是如何做到的呢?我们知道 tcp 读写的都是字节流, 那么我们写了多少数据,目前读了多少数据。两者的差值就是待读取的数据的长度。在 netpoll linkbuffer 中,我们存储了待读取的数据的长度,并且通过原子操作进行操作。那么如果我们想读的数据的长度超出了这个长度的话,肯定是要报错的。也就是读指针永远也不会追上写指针,所以可以同时进行读写。
4. netpoll 整体设计
在官方文章中, netpoll 的设计讲解的比较学术化,和代码联系并不是很紧密。在此,我们从代码角度来看看 netpoll 的设计。
整体来看, eventLoop 会初始化 server,并且把 server.ln.(FD)通过 epoll_ctl 函数注册如 poll 的监听列表中,poll 是通过 pollmanager 来管理的,具体注册到哪个 poll 里面根据 load banlance 算法通过 Pick 函数来定。 然后当有新的连接建立的时候, epoll_wait 会 trigger server.OnRead 函数来 accept,accept 后新创建的 connection.(FD)再次被扔到 poll 的监听列表中,这样当操作系统监听到 connection.(FD)有数据写入的时候,就可以及时的通过 epoll_wait 来调用到 connection 的 inputs 函数写入到 connection.inputbuffer. 最后上层通过读取 connection.inputbuffer 来完成数据传递到业务代码。
type eventLoop struct {
sync.Mutex
opt *options
prepare OnPrepare
svr *server
stop chan error
}
type server struct {
operator FDOperator
ln Listener
prepare OnPrepare
quit func(err error)
connections sync.Map // key=fd, value=connection
}
// Run this server.
func (s *server) Run() (err error) {
s.operator = FDOperator{
FD: s.ln.Fd(),
OnRead: s.OnRead,
OnHup: s.OnHup,
}
s.operator.poll = pollmanager.Pick()
err = s.operator.Control(PollReadable)
if err != nil {
s.quit(err)
}
return err
}
// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
// accept socket
conn, err := s.ln.Accept()
if err != nil {
// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.quit(err)
return err
}
log.Println("accept conn failed:", err.Error())
return err
}
if conn == nil {
return nil
}
// store & register connection
var connection = &connection{}
connection.init(conn.(Conn), s.prepare)
if !connection.IsActive() {
return nil
}
var fd = conn.(Conn).Fd()
connection.AddCloseCallback(func(connection Connection) error {
s.connections.Delete(fd)
return nil
})
s.connections.Store(fd, connection)
return nil
}
// connection is the implement of Connection
type connection struct {
netFD
onEvent
locker
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan int
waitReadSize int32
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
inputBarrier *barrier
outputBarrier *barrier
supportZeroCopy bool
}
复制代码
总结
netpoll 的设计处处体现了各种 golang 的性能优化技巧,很能值得我们学习借鉴。
评论