本文深入探讨了高性能网络框架 nbio 在 Golang 中的应用,包括其架构、配置、事件处理机制、核心组件等,并与 Evio 做了比较。原文: Analyzing High-Performance Network Framework nbio in Go
前言
nbio 项目还包括建立在 nbio 基础上的 nbhttp,但这不在我们的讨论范围之内。
与 evio 一样,nbio 也采用经典的 Reactor 模式。事实上,Go 中的许多异步网络框架都是基于这种模式设计的。
我们先看看如何执行 nbio 代码。
服务器:
package main
import ( "fmt" "github.com/lesismal/nbio")
func main() { g := nbio.NewGopher(nbio.Config{ Network: "tcp", Addrs: []string{":8888"}, MaxWriteBufferSize: 6 * 1024 * 1024, })
g.OnData(func(c *nbio.Conn, data []byte) { c.Write(append([]byte{}, data...)) })
err := g.Start() if err != nil { fmt.Printf("nbio.Start failed: %v\n", err) return }
defer g.Stop() g.Wait()}
复制代码
我们用 nbio.NewGopher() 函数创建新的引擎实例,通过 nbio.Config 结构来配置引擎实例,包括:
Network(网络):使用的网络类型,本例中为 "TCP"。
Addrs(地址):服务器应该监听的地址和端口,这里是":8888"(监听本地计算机的 8888 端口)。
MaxWriteBufferSize(最大写缓冲区大小):写缓冲区的最大大小,此处设置为 6MB。
我们还可以进一步探索其他配置。然后,我们通过引擎实例 g.OnData() 注册数据接收回调函数,该回调函数会在收到数据时调用。回调函数需要两个参数:连接对象 c 和接收到的数据 data。在回调函数中,通过 c.Write() 方法将接收到的数据写回客户端。
客户端:
package main
import ( "bytes" "context" "fmt" "math/rand" "time" "github.com/lesismal/nbio" "github.com/lesismal/nbio/logging")
func main() { var ( ret []byte buf = make([]byte, 1024*1024*4) addr = "localhost:8888" ctx, _ = context.WithTimeout(context.Background(), 60*time.Second) )
logging.SetLevel(logging.LevelInfo) rand.Read(buf)
g := nbio.NewGopher(nbio.Config{}) done := make(chan int)
g.OnData(func(c *nbio.Conn, data []byte) { ret = append(ret, data...) if len(ret) == len(buf) { if bytes.Equal(buf, ret) { close(done) } } })
err := g.Start() if err != nil { fmt.Printf("Start failed: %v\n", err) }
defer g.Stop()
c, err := nbio.Dial("tcp", addr) if err != nil { fmt.Printf("Dial failed: %v\n", err) }
g.AddConn(c) c.Write(buf)
select { case <-ctx.Done(): logging.Error("timeout") case <-done: logging.Info("success") }}
复制代码
乍一看似乎有点繁琐,实际上服务器和客户端共享同一套结构。
客户端通过 nbio.Dial 与服务器连接,连接成功后封装到 nbio.Conn 中。这里 nbio.Conn 实现了标准库中的 net.Conn 接口,最后通过 g.AddConn(c) 添加此连接,并向服务器写入数据。服务器收到数据后,其处理逻辑是将数据原封不动发送回客户端,客户端收到数据后,会触发 OnData 回调,该回调会检查收到的数据长度是否与发送的数据长度一致,如果一致,则关闭连接。
下面深入探讨几个关键结构。
type Engine struct { //... sync.WaitGroup //... mux sync.Mutex wgConn sync.WaitGroup network string addrs []string //... connsStd map[*Conn]struct{} connsUnix []*Conn listeners []*poller pollers []*poller onOpen func(c *Conn) onClose func(c *Conn, err error) onRead func(c *Conn) onData func(c *Conn, data []byte) onReadBufferAlloc func(c *Conn) []byte onReadBufferFree func(c *Conn, buffer []byte) //...}
复制代码
Engine 本质上是核心管理器,负责管理所有监听器、轮询器和工作轮询器。
这两种轮询器有什么区别?
区别在于责任不同。
监听轮询器只负责接受新连接。当一个新的客户端 conn 到达时,它会从 pollers 中选择一个工作轮询器,并将 conn 添加到相应的工作轮询器中。随后,工作轮询器负责处理该连接的读/写事件。
因此当我们启动程序时,如果只监听一个地址,程序中的轮询次数等于 1(监听器轮询器)+ pollerNum。
通过上述字段,可以自定义配置和回调。例如,可以在新连接到达时设置 onOpen 回调函数,或在数据到达时设置 onData 回调函数等。
type Conn struct { mux sync.Mutex p *poller fd int //... writeBuffer []byte //... DataHandler func(c *Conn, data []byte)}
复制代码
Conn 结构代表网络连接,每个 Conn 只属于一个轮询器。当数据一次写不完时,剩余数据会先存储在 writeBuffer 中,等待下一个可写事件继续写入。
type poller struct { g *Engine epfd int evtfd int index int shutdown bool listener net.Listener isListener bool unixSockAddr string ReadBuffer []byte pollType string}
复制代码
至于 poller 结构,这是一个抽象概念,用于管理底层多路复用 I/O 操作(如 Linux 的 epoll、Darwin 的 kqueue 等)。
注意 pollType,nbio 默认使用电平触发(LT)模式的 epoll,但用户也可以将其设置为边缘触发(ET)模式。
介绍完基本结构后,我们来看看代码流程。
当启动服务器代码时,调用 Start:
func (g *Engine) Start() error { //... switch g.network { // 第一部分: 初始化 listener case "unix", "tcp", "tcp4", "tcp6": for i := range g.addrs { ln, err := newPoller(g, true, i) if err != nil { for j := 0; j < i; j++ { g.listeners[j].stop() } return err } g.addrs[i] = ln.listener.Addr().String() g.listeners = append(g.listeners, ln) } //... // 第二部分: 初始化一定数量的轮询器 for i := 0; i < g.pollerNum; i++ { p, err := newPoller(g, false, i) if err != nil { for j := 0; j < len(g.listeners); j++ { g.listeners[j].stop() } for j := 0; j < i; j++ { g.pollers[j].stop() } return err } g.pollers[i] = p } //... // 第三部分: 启动所有工作轮询器 for i := 0; i < g.pollerNum; i++ { g.pollers[i].ReadBuffer = make([]byte, g.readBufferSize) g.Add(1) go g.pollers[i].start() } // 第四部分: 启动所有监听器 for _, l := range g.listeners { g.Add(1) go l.start() } //... (忽略 UDP) //...}
复制代码
代码比较容易理解,分为四个部分:
第一部分:初始化监听器
根据 g.network 值(如 "unix"、"tcp"、"tcp4"、"tcp6"),为每个要监听的地址创建一个新的轮询器。该轮询器主要管理监听套接字上的事件。如果在创建过程中发生错误,则停止所有先前创建的监听器并返回错误信息。
第二部分:初始化一定数量的轮询器
创建指定数量(pollerNum)的轮询器,用于处理已连接套接字上的读/写事件。如果在创建过程中发生错误,将停止所有监听器和之前创建的工作轮询器,然后返回错误信息。
第三部分:启动所有工作轮询器投票站
为每个轮询器分配读缓冲区并启动。
第四部分:启动所有监听器
启动之前创建的所有监听器,并开始监听各自地址上的连接请求。
关于轮询器的启动:
func (p *poller) start() { defer p.g.Done() //... if p.isListener { p.acceptorLoop() } else { defer func() { syscall.Close(p.epfd) syscall.Close(p.evtfd) }() p.readWriteLoop() }}
复制代码
分为两种情况。如果是监听轮询器:
func (p *poller) acceptorLoop() { // 如果不希望将当前 goroutine 调度到其他操作线程。 if p.g.lockListener { runtime.LockOSThread() defer runtime.UnlockOSThread() } p.shutdown = false for !p.shutdown { conn, err := p.listener.Accept() if err == nil { var c *Conn c, err = NBConn(conn) if err != nil { conn.Close() continue } // p.g.pollers[c.Hash()%len(p.g.pollers)].addConn(c) } else { var ne net.Error if ok := errors.As(err, &ne); ok && ne.Timeout() { logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index) time.Sleep(time.Second / 20) } else { if !p.shutdown { logging.Error("NBIO[%v][%v_%v] Accept failed: %v, exit...", p.g.Name, p.pollType, p.index, err) } break } } }}
复制代码
监听轮询器等待新连接的到来,并在接受后将其封装到 nbio.Conn 中,并将 Conn 添加到相应的工作轮询器中。
func (p *poller) addConn(c *Conn) { c.p = p if c.typ != ConnTypeUDPServer { p.g.onOpen(c) } fd := c.fd p.g.connsUnix[fd] = c err := p.addRead(fd) if err != nil { p.g.connsUnix[fd] = nil c.closeWithError(err) logging.Error("[%v] add read event failed: %v", c.fd, err) }}
复制代码
这里一个有趣的设计是对 conn 的管理。该结构是个切片,直接使用 conn 的 fd 作为索引。这样做的好处是:
最后,通过调用 addRead 将相应的 conn fd 添加到 epoll 中。
func (p *poller) addRead(fd int) error { switch p.g.epollMod { case EPOLLET: return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLET}) default: return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.E
pollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN}) }}
复制代码
这里不注册写事件是合理的,因为新连接上没有数据要发送。这种方法避免了一些不必要的系统调用,从而提高了程序性能。
如果启动的是工作轮询器,它的工作就是等待新增 conn 事件,并进行相应处理。
func (p *poller) readWriteLoop() { //... msec := -1 events := make([]syscall.EpollEvent, 1024) //... for !p.shutdown { n, err := syscall.EpollWait(p.epfd, events, msec) if err != nil && !errors.Is(err, syscall.EINTR) { return } if n <= 0 { msec = -1 continue } msec = 20 // 遍历事件 for _, ev := range events[:n] { fd := int(ev.Fd) switch fd { case p.evtfd: default: c := p.getConn(fd) if c != nil { if ev.Events&epollEventsError != 0 { c.closeWithError(io.EOF) continue } // 如果可写,则刷新数据 if ev.Events&epollEventsWrite != 0 { c.flush() } // 读取事件 if ev.Events&epollEventsRead != 0 { if p.g.onRead == nil { for i := 0; i < p.g.maxConnReadTimesPerEventLoop; i++ { buffer := p.g.borrow(c) rc, n, err := c.ReadAndGetConn(buffer) if n > 0 { p.g.onData(rc, buffer[:n]) } p.g.payback(c, buffer) //... if n < len(buffer) { break } } } else { p.g.onRead(c) } } } else { syscall.Close(fd) } } } }}
复制代码
这段代码也很简单,等待事件到来,遍历事件列表,并相应处理每个事件。
func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error)
复制代码
在 EpollWait 中,只有 msec 是用户可修改的。通常,我们设置 msec = -1 使函数阻塞,直到至少有一个事件发生;否则,函数将无限期阻塞。当事件较少时,这种方法非常有用,能最大限度减少 CPU 占用。
如果想尽快响应事件,可以设置 msec = 0,这样 EpollWait 就能立即返回,无需等待任何事件。在这种情况下,程序可能会更频繁调用 EpollWait,可以在事件发生后立即处理事件,从而提高 CPU 使用率。
如果程序可以容忍一定延迟,并且希望降低 CPU 占用率,可以将 msec 设置为正数。这样,EpollWait 就会在指定时间内等待事件发生。如果在这段时间内没有事件发生,函数将返回,可以选择稍后再次调用 EpollWait。这种方法可以降低 CPU 占用率,但可能导致响应时间延长。
nbio 会根据事件计数调整 msec 值。如果计数大于 0,则 msec 设置为 20。
字节跳动的 netpoll 代码与此类似;如果事件计数大于 0 ,则将 msec 设置为 0;如果事件计数小于或等于 0,则将 msec 设置为-1,然后调用 Gosched() 以主动退出当前 goroutine。
var msec = -1for { n, err = syscall.EpollWait(epfd, events, msec) if n <= 0 { msec = -1 runtime.Gosched() continue } msec = 0 ...}
复制代码
不过,nbio 中的自愿切换代码已被注释掉。根据作者的解释,最初他参考了字节跳动的方法,并添加了自愿切换功能。
不过,在对 nbio 进行性能测试时发现,添加或不添加自愿切换功能对性能并无明显影响,因此最终决定将其删除。
事件处理部分
如果是可读事件,则可以通过内置或自定义内存分配器获取相应的缓冲区,然后调用 ReadAndGetConn 读取数据,无需每次都分配缓冲区。
如果是可写事件,则会调用 flush 发送缓冲区中未发送的数据。
func (c *Conn) flush() error { //..... old := c.writeBuffer n, err := c.doWrite(old) if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) { //..... }
if n < 0 { n = 0 } left := len(old) - n // 描述尚未完成,因此将其余部分存储在writeBuffer中以备下次写入。 if left > 0 { if n > 0 { c.writeBuffer = mempool.Malloc(left) copy(c.writeBuffer, old[n:]) mempool.Free(old) } // c.modWrite() } else { mempool.Free(old) c.writeBuffer = nil if c.wTimer != nil { c.wTimer.Stop() c.wTimer = nil } // 解释完成后,首先将conn重置为仅读取事件。 c.resetRead() //... }
c.mux.Unlock() return nil}
复制代码
逻辑也很简单,有多少就写多少,如果写不完,就把剩余数据放回 writeBuffer,然后在 epollWait 触发时再次写入。
如果写入完成,则不再有数据要写入,将此连接的事件重置为读取事件。
主逻辑基本上就是这样。
等等,最初提到有新连接进入时,只注册了连接的读事件,并没有注册写事件。写事件是什么时候注册的?
当然是在调用 conn.Write 时注册的。
g := nbio.NewGopher(nbio.Config{ Network: "tcp", Addrs: []string{":8888"}, MaxWriteBufferSize: 6 * 1024 * 1024, })
g.OnData(func(c *nbio.Conn, data []byte) { c.Write(append([]byte{}, data...))})
复制代码
当 Conn 数据到达时,底层会在读取数据后回调 OnData 函数,此时可以调用 Write 向另一端发送数据。
g := nbio.NewGopher(nbio.Config{ Network: "tcp", Addrs: []string{":8888"}, MaxWriteBufferSize: 6 * 1024 * 1024, })
g.OnData(func(c *nbio.Conn, data []byte) { c.Write(append([]byte{}, data...))})
// 当数据到达conn时,底层将读取数据并回调OnData函数。此时,您可以调用Write来向另一端发送数据。func (c *Conn) Write(b []byte) (int, error) { //.... n, err := c.write(b) if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) { //..... return n, err }
if len(c.writeBuffer) == 0 { if c.wTimer != nil { c.wTimer.Stop() c.wTimer = nil } } else { //仍然有数据未写入,添加写事件。 c.modWrite() } //..... return n, err} func (c *Conn) write(b []byte) (int, error) { //... if len(c.writeBuffer) == 0 { n, err := c.doWrite(b) if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) { return n, err } //..... left := len(b) - n // 未完成,将剩余数据写入writeBuffer。 if left > 0 && c.typ == ConnTypeTCP { c.writeBuffer = mempool.Malloc(left) copy(c.writeBuffer, b[n:]) c.modWrite() } return len(b), nil } // 如果writeBuffer中仍有未写入的数据,则还将追加新数据。 c.writeBuffer = mempool.Append(c.writeBuffer, b...)
return len(b), nil}
复制代码
当数据未完全写入时,剩余数据将被放入 writeBuffer,触发执行 modWrite,并将 conn 的写入事件注册到 epoll。
总结
与 evio 相比,nbio 没有蜂群效应。
Evio 通过不断唤醒无效的 epoll 来实现逻辑正确性。Nbio 尽量减少系统调用,减少不必要的开销。
在可用性方面,nbio 实现了标准库 net.Conn,许多设置都是可配置的,允许用户进行高度灵活的定制。
预分配缓冲区用于读写操作,以提高应用程序性能。
总之,nbio 是个不错的高性能无阻塞网络框架。
你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!
评论