写点什么

GO database/sql 连接池源码分析

用户头像
Jack Zheng
关注
发布于: 2021 年 06 月 23 日

本文档主要针对 Go 1.14.12 版本的 database/sql/sql.go 的连接池做源码分析,通过这个篇文档能够了解 Go 在数据库连接池的一个设计逻辑,重要的流程以及在代码中一些值得借鉴模仿的逻辑及风格。主要内容如下:

  • database 包目录结构介绍

  • 数据库连接池的定义

  • 为什么要连接池

  • 主要核心的数据结构及解释

  • 重要方法的流程梳理及源码分析

  • 初始化数据库

  • 获取连接

  • 释放连接

  • 清理连接

  • Go 1.16.x 的优化


database 包目录结构

- convert.go      // rows的scan|-- ctxutil.go      // 判断ctx,然后执行prepare/exec/query/close等操作|-- driver        |  |-- driver.go    // driver定义了实现数据库驱动所需的接口,这些接口由sql包和具体的驱动包来实现|   -- types.go     // 数据类型的别名和转换-- sql.go           // 关于SQL数据库一些通用形接口和类型,包含:连接池、数据类型、连接、事务、statment、查询结果
复制代码


sql 包其实是 db 的抽象,实际连接查询是 db 驱动包来完成

数据库连接池的定义

连接池是一个由客户端维护,包含一定数据库连接缓存的池子,以便将来有连接数据库的请求的时候可以直接复用连接,目的是降低频繁创建和关闭连接的开销。

为什么要连接池?

通过一个 benchmark 来演示,为什么需要连接池


func BenchmarkConnectMySQL(b *testing.B) {  db, err := ConnectMySQL("root:zxm6655@tcp(127.0.0.1:3306)/test")  if err != nil {    b.Fatalf("mysql connect error : %s", err.Error())    return  }
ctx := context.Background()
db.SetMaxIdleConns(0) b.ResetTimer()
b.Run("noConnPool", func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { _, _ = db.ExecContext(ctx, "SELECT 1") } })
db.SetMaxIdleConns(1) b.ResetTimer()
b.Run("hasConnPool", func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { _, _ = db.ExecContext(ctx, "SELECT 1") } })}
复制代码



可以明显的看到有设置连接池和没有设置连接池的区别,有设置连接池的 benchmark 单次请求的速度比没有设置连接池的来得快很多,分配的内存也少。每次请求数据库都是 TCP 连接,如果不设置连接池,相当于要建立大量的 TCP 连接,这是很耗时的。




既然我们觉得连接池是重要且有必要的,那最核心的问题就在于如何设计连接池,其中最主要的就是怎么取连接,怎么复用连接,对过期的连接怎么处理,以及怎么知道这个连接池设计是好的。

接下来,就先从连接池最主要的数据结构 DB 入手,看下里面有什么,从而可以来猜想连接池是如何设计的。


DB 结构体

// L402type DB struct {    // 等待新的连接所需要的总时间,用于统计来判断这个连接池设置的好不好  waitDuration int64 
connector driver.Connector // 由数据库驱动实现的connector numClosed uint64 // 关闭的连接数
mu sync.Mutex // protects following fields freeConn []*driverConn // 连接池 connRequests map[uint64]chan connRequest // 连接请求的自增key nextRequest uint64 // 下一个连接的key numOpen int // 打开的和正在打开的连接数 openerCh chan struct{} // channel用于通知connectorOpenner()建立新的连接 closed bool // 当前数据库是否关闭 dep map[finalCloser]depSet lastPut map[*driverConn]string maxIdle int // 连接池大小,为0表示用默认的2,小于0表示不使用连接池 maxOpen int // 最大打开的连接数,包含在连接池中的闲散连接,小于等于0表示不限制 maxLifetime time.Duration // 一个连接在连接池中最长的生存时间,0表示可以一直重用 // channel用于通知connectorCleaner()清理连接,没有设置maxLifeTime,这个方法基本使用不到的 cleanerCh chan struct{} waitCount int64 // 等待的连接数,如果maxIdelCount为0,waitCount就是一直为0 maxIdleClosed int64 // 释放连接时,因为连接池已满而被关闭的连接数 maxLifetimeClosed int64 // 连接超过生存时间后而被关闭的连接数
stop func() // stop cancels the connection opener and the session resetter.}
复制代码


如下图,进一步地解释数据结构中的一些参数。


maxIdle:连接池的大小,也就是 freeConn 的大小,这个大小在初始化之后就是固定的。设置得越大,可能会有很多空闲的连接,占用空间。设置得太小也有可能导致很多阻塞的请求在等连接;


maxOpen:最大连接数指的是正在用的连接(下面数据结构这么画而已,实际不是数组)数量加上在连接池中的连接数量。这个设置和 maxIdle 有类似的意思;


numOpen:总连接数,指的是正在用的+在连接池里的+将要打开的。将要打开的是不确定的,因为很有可能在创建的过程中,创建失败了(numOpen 像乐观锁一样的一个逻辑统计);


waitCount:请求等待的总量,指的就是 connRequests 这个 map 里面的 kv 总数;


maxLifeTime:连接总生存时间,总生存时间从 create 到最后 close,中间在 inUse 或者再 idle 都计在总生存时间里,不会因为没有人用而停止。



通过 DB 数据结构我们能看到什么呢?


首先,数据结构里面锁的位置可以代表哪些字段是要加锁,哪些是不需要的,在可读性上大大提高;


其次,设计数据结构的时候一定有个主体,在设计主体的数据结构时,就可根据业务属性去决定该怎样地去增删改查等动作,为之后的设计铺个路;


接着,管道通信。如果是之前自己写代码,更多的是通过返回值来设计逻辑,但是其实 go 本身更推荐用管道通信。管道通信能够让自己的代码大大地解耦,更好地去做自己主要的设计逻辑,也不会显得太笨重;


然后,限制与配置,这一点可以认为是在给自己设计的数据结构一个边界的考量。通过这种思维,可以让自己看到自己设计的数据结构的一些可能与不可能;


最后,就是统计与分析,统计其实是在开发过程中感觉会很容易忽略的一个事情,更多的是专注在业务逻辑开发。但是统计,往往在生产过程也好,测试环境,压测环境也好,一定会给你带来很大的帮助。尤其在有限制的情况下,能够很清楚地知道如何去评判自己设计的数据结构的好坏。




driverConn 结构体

//L454type driverConn struct {    db        *DB  createdAt time.Time
sync.Mutex // guards following ci driver.Conn needReset bool // The connection session should be reset before use if true. closed bool // 确定连接最终都关闭后才是最终关闭 finalClosed bool // 确定依赖都被关闭后,才会执行最后的关闭 openStmt map[*driverStmt]bool
// guarded by db.mu inUse bool onPut []func() // code (with db.mu held) run when conn is next returned dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked}
复制代码



重要 function 及流程



如上两个图,一个是 gif 图,另一个是总的流程图,这张图包含了获取连接所有的重要且相关的逻辑函数,接下来跟着流程逐步地分析一下涉及到的方法。

初始化 db

// L732func OpenDB(c driver.Connector) *DB {  ctx, cancel := context.WithCancel(context.Background())  db := &DB{    connector:    c,    openerCh:     make(chan struct{}, connectionRequestQueueSize),    lastPut:      make(map[*driverConn]string),    connRequests: make(map[uint64]chan connRequest),    stop:         cancel,  }
go db.connectionOpener(ctx)
return db}
复制代码


// L1068func (db *DB) connectionOpener(ctx context.Context) {  for {    select {    case <-ctx.Done():      return    case <-db.openerCh:      db.openNewConnection(ctx)    }  }}
复制代码


初始化 db 后,会开一个协程来等待,通过 openerCh channel 当做信号来创建新的数据库连接。



获取连接

创建连接的主要函数有两个分别是:


func (db *DB) Conn(ctx context.Context) (*Conn, error)


通过 2 个策略来获取连接,cachedOrNewConn,从连接池内或者新建获取连接,alwaysNewConn,总是通过新建来获取连接。


// L1735func (db *DB) Conn(ctx context.Context) (*Conn, error) {  var dc *driverConn  var err error    // 尝试通过缓存或者新建(连接池里面没有可用连接)来获取连接,如果获取不到最大重试次数为2次  for i := 0; i < maxBadConnRetries; i++ {    dc, err = db.conn(ctx, cachedOrNewConn)    if err != driver.ErrBadConn {      break    }  }        // 如果在上一步通过缓存或者新建获取不到连接,则再通过新建获取一个连接  if err == driver.ErrBadConn {    dc, err = db.conn(ctx, alwaysNewConn)  }  if err != nil {    return nil, err  }
conn := &Conn{ db: db, dc: dc, } return conn, nil}
复制代码


func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)


这个方法会分为三种情况返回连接,分别是


  • 从连接池获取,复用连接

  • 连接池无可用连接或不允许使用连接池,且创建的连接有数量限制的,请求就会阻塞等待创建好的连接

  • 直接通过 connector 驱动去从数据库获取连接

从连接池里面直接获取
// L1149numFree := len(db.freeConn)if strategy == cachedOrNewConn && numFree > 0 {  conn := db.freeConn[0]  copy(db.freeConn, db.freeConn[1:])  db.freeConn = db.freeConn[:numFree-1]  conn.inUse = true  db.mu.Unlock()  if conn.expired(lifetime) {    conn.Close()    return nil, driver.ErrBadConn  }      // Reset the session if required.  if err := conn.resetSession(ctx); err == driver.ErrBadConn {    conn.Close()    return nil, driver.ErrBadConn  }      return conn, nil}
复制代码


如果策略是 cachedOrNewConn 的,一定会先去检查 freeConn 连接池里面有没有可以复用的连接,如果有取出来判断没有超时后返回可用的连接。在取的这个逻辑,L4~L6 是获取连接池第一个的连接的写法,这样的做法可以减少切片的容量-1,这样可以减少切片因为伸缩而产生的内存分配问题。

阻塞等待


在图中 L1172,就是先判断是否有限制最大连接数,当前总连接数是否超过最大连接数。因为有限制,所以此时的 req 只能阻塞,等待新的连接,写入 connRequests。


这边的 select 分为两种情况,一种是 ctx 超时,另一种是 req 获取到连接了。请求数据库连接的时候,通常会带上一个超时时间,避免一直等下去。ctx 超时之后,这个 reqKey 会从 connRequests 里面删除。删除的过程中,也会再检查了一下会不会刚好超时,刚好 req 收到连接的情况。如果有收到,那不会直接 close,相反的会留着用,放到连接池里面使用,除非连接池满了,或者不允许使用连接池。那另一种就是有信号写入管道,req 获取到了连接。

直接创建新连接

直接创建新的连接,就是通过 connector 去获取一个新的连接。这里要注意的点就是,虽然获取的过程中可能会失败,但是不是失败就失败了,而是会在错误里面再去调一次获取连接的函数。这里的思维就是尽可能的减少 connRequests 里面阻塞的请求,连接池设计不仅仅是要思考如何复用,更多的也要考虑如何减少阻塞的请求(当然可以通过配合来这个问题)。


// L1231db.numOpen++ // optimisticallydb.mu.Unlock()
ci, err := db.connector.Connect(ctx)if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err}
db.mu.Lock()
dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true,}
db.addDepLocked(dc, dc)db.mu.Unlock()return dc, nil
复制代码
可能需要创建新的连接

在前面的代码逻辑里面更多的是直接获取,但还强调了 maybeOpenNewConnections 这个方法。这个方法就是为了获取连接,更多的是给阻塞的请求创建连接的一个方法。里面主要的也就是往前面讲到的 openerCh 发信号,通知它要创建一个新的连接了,有某个请求可能需要了。


// L1050func (db *DB) maybeOpenNewConnections() {  numRequests := len(db.connRequests)  if db.maxOpen > 0 {    numCanOpen := db.maxOpen - db.numOpen    if numRequests > numCanOpen {      numRequests = numCanOpen    }  }  for numRequests > 0 {    db.numOpen++ // optimistically    numRequests--    if db.closed {      return    }    db.openerCh <- struct{}{}  }}
复制代码


在 db.openerCh 收到信号后,其实调取底层的方法就是 openNewConnection,获取连接的方式也是和前面讲的直接获取是一样的,通过 connector 去创建。但有两个点请注意,在这边创建失败后,再次调取 maybeOpenNewConnections,目的还是觉得有需要连接的阻塞的请求。另一个,获取到连接之后,会根据 putConnDBLocked 这个方法去决定这个连接是否被复用。如果复用则返回,如果无法复用就会关掉这个连接,不会一直放着不管的。


// L1081func (db *DB) openNewConnection(ctx context.Context) {  ci, err := db.connector.Connect(ctx)  db.mu.Lock()  defer db.mu.Unlock()  if db.closed {    if err == nil {      ci.Close()    }    db.numOpen--    return  }  if err != nil {    db.numOpen--    db.putConnDBLocked(nil, err)    db.maybeOpenNewConnections()    return  }  dc := &driverConn{    db:        db,    createdAt: nowFunc(),    ci:        ci,  }  if db.putConnDBLocked(dc, err) {    db.addDepLocked(dc, dc)  } else {    db.numOpen--    ci.Close()  }}
复制代码


接着来看一下 putConnDBLocked 如何决定传进来的连接是否可以复用。


首先,查看 DB 状态及是否有限制,如果关闭了或者达到限制了,那当然就无法复用了;


然后,看一下 connRequests 是否有阻塞等待的请求。如果有的话,通过 for-range 的形式直接 rand 出来一个,往这个 req 管道发送信号。这样,在前面阻塞等待的 req 就可以收到连接,返回回去。没有的话,就看下连接池是否可用,是否还可以往里面塞。可以的话就放到连接池里面,同时注意,会开启清扫连接池里面空闲连接的逻辑(这在后面会说到)。


// L1341func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {  if db.closed {    return false  }  if db.maxOpen > 0 && db.numOpen > db.maxOpen {    return false  }  if c := len(db.connRequests); c > 0 {    var req chan connRequest    var reqKey uint64    for reqKey, req = range db.connRequests {      break    }    delete(db.connRequests, reqKey) // Remove from pending requests.    if err == nil {      dc.inUse = true    }    req <- connRequest{      conn: dc,      err:  err,    }    return true  } else if err == nil && !db.closed {    if db.maxIdleConnsLocked() > len(db.freeConn) {      db.freeConn = append(db.freeConn, dc)      db.startCleanerLocked()      return true    }    db.maxIdleClosed++  }  return false}
复制代码


连接池获取连接就是这样的逻辑设计,复用池,阻塞等待,直接获取。



连接的释放


有获取连接就一定有释放连接,数据库连接的 close 主要涉及的函数就是 putConn。释放的行为一般是由 conn 发起的,或者数据库关闭的时候也会去关掉每个 conn。


type Closer interface {  Close() error}
复制代码


func (c *Conn) Close() error {  return c.close(nil)}
复制代码


// L1917func (c *Conn) close(err error) error {  if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {    return ErrConnDone  }
// Lock around releasing the driver connection // to ensure all queries have been stopped before doing so. c.closemu.Lock() defer c.closemu.Unlock()
c.dc.releaseConn(err) c.dc = nil c.db = nil return err}
复制代码


// L471func (dc *driverConn) releaseConn(err error) {  dc.db.putConn(dc, err, true)}
复制代码


这边就直接讲 putConn 这个方法,连接池的释放更多考虑的就是复用,大前提还是要考虑连接是否超过最大生存时间。然后看释放的连接是好的还是坏的,超时的连接其实就是一个坏的连接。这边注意下方代码块 L37,在收到一个坏的连接之后,再次调用 maybeOpenNewConnections,目的也和之前的一样。再往下,也是调用 putConnDBLocked,看是否这个连接可以复用。


// L1282func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {  if err != driver.ErrBadConn {    if !dc.validateConnection(resetSession) {      err = driver.ErrBadConn    }  }  db.mu.Lock()  if !dc.inUse {    db.mu.Unlock()    if debugGetPut {      fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])    }    panic("sql: connection returned that was never out")  }
if err != driver.ErrBadConn && dc.expired(db.maxLifetime) { db.maxLifetimeClosed++ err = driver.ErrBadConn } if debugGetPut { db.lastPut[dc] = stack() } dc.inUse = false dc.returnedAt = nowFunc()
for _, fn := range dc.onPut { fn() } dc.onPut = nil
if err == driver.ErrBadConn { db.maybeOpenNewConnections() db.mu.Unlock() dc.Close() return } if putConnHook != nil { putConnHook(db, dc) } added := db.putConnDBLocked(dc, nil) db.mu.Unlock()
if !added { dc.Close() return }}
复制代码


另外提一点,putConnHook 是测试时用的 hook,一般不对外开放,这个写法可以用来自己测试的时候方便使用。



清理无效连接


在 putConnDBLocked 方法里面讲到,如果放回连接池就会发起一次清理连接池空闲连接的动作,在重新设置连接最大生存时间的时候也会触发一次。可以注意到,清理无效连接不是一初始化数据库就开一个协程等超时清理(如果是我之前就会这么干)。虽然是之后会每过一段时间会清理一次,但数据库初始化完之后不会直接挂一个清理连接的协程。


// L953func (db *DB) startCleanerLocked() {  if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {    db.cleanerCh = make(chan struct{}, 1)    go db.connectionCleaner(db.maxLifetime)  }}
复制代码


清理连接这个方法,里面主要的一个就是先获取超时的所有连接,即下方代码块的变量 closing。在都获取完之后,才会去一个一个 close,而不是一个超时就关掉一个 conn。


// L960func (db *DB) connectionCleaner(d time.Duration) {  const minInterval = time.Second
if d < minInterval { d = minInterval } t := time.NewTimer(d)
for { select { case <-t.C: case <-db.cleanerCh: // maxLifetime was changed or db was closed. }
db.mu.Lock() d = db.maxLifetime if db.closed || db.numOpen == 0 || d <= 0 { db.cleanerCh = nil db.mu.Unlock() return }
expiredSince := nowFunc().Add(-d) var closing []*driverConn for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if c.createdAt.Before(expiredSince) { closing = append(closing, c) last := len(db.freeConn) - 1 db.freeConn[i] = db.freeConn[last] db.freeConn[last] = nil db.freeConn = db.freeConn[:last] i-- } } db.maxLifetimeClosed += int64(len(closing)) db.mu.Unlock()
for _, c := range closing { c.Close() }
if d < minInterval { d = minInterval } t.Reset(d) }}
复制代码



Go 1.16.x 优化

Go 1.16.x 的优化在下面这个 gif 可以明显地看到,相较于之前的版本增加了一个 maxIdleTime,主要是不想让连接在连接池中等待太久。虽然可以一直在连接池中等着下次复用,但毕竟占的是资源,一旦创建了很多,最大生存时间又很长,是很占内存的。因此,Go 1.16.x 做了这个优化。那相应的在清理连接的方法里,就多了一个查看连接是否有超过 maxaIdleTime 的计算。





总结

连接池的主要内容其实就是获取连接,释放连接,复用连接,清理连接。在整个 4 个主要流程中,反复考虑的就是要为潜在需要连接的请求创建连接,尽量减少阻塞的请求。同时,尽量去回收连接,不要都是从数据库去创建连接。


因为我没有实际的高并发的生产经验,所以没有连接池的调优能力,只能说通过源码分析看到学习写代码的一些规范及优点,然后还有就是解决问题的思路,在其他的池的设计也可以借鉴这里的设计思路去举一反三。

用户头像

Jack Zheng

关注

no pain no gain 2019.04.10 加入

用我的勤奋,一点一点地努力,提升自己的能力,拓展自己的视野,提高自己的认知。

评论

发布
暂无评论
GO database/sql 连接池源码分析