本文档主要针对 Go 1.14.12 版本的 database/sql/sql.go 的连接池做源码分析,通过这个篇文档能够了解 Go 在数据库连接池的一个设计逻辑,重要的流程以及在代码中一些值得借鉴模仿的逻辑及风格。主要内容如下:
database 包目录结构介绍
数据库连接池的定义
为什么要连接池
主要核心的数据结构及解释
重要方法的流程梳理及源码分析
初始化数据库
获取连接
释放连接
清理连接
Go 1.16.x 的优化
database 包目录结构
- convert.go // rows的scan
|-- ctxutil.go // 判断ctx,然后执行prepare/exec/query/close等操作
|-- driver
| |-- driver.go // driver定义了实现数据库驱动所需的接口,这些接口由sql包和具体的驱动包来实现
| -- types.go // 数据类型的别名和转换
-- sql.go // 关于SQL数据库一些通用形接口和类型,包含:连接池、数据类型、连接、事务、statment、查询结果
复制代码
sql 包其实是 db 的抽象,实际连接查询是 db 驱动包来完成
数据库连接池的定义
连接池是一个由客户端维护,包含一定数据库连接缓存的池子,以便将来有连接数据库的请求的时候可以直接复用连接,目的是降低频繁创建和关闭连接的开销。
为什么要连接池?
通过一个 benchmark 来演示,为什么需要连接池
func BenchmarkConnectMySQL(b *testing.B) {
db, err := ConnectMySQL("root:zxm6655@tcp(127.0.0.1:3306)/test")
if err != nil {
b.Fatalf("mysql connect error : %s", err.Error())
return
}
ctx := context.Background()
db.SetMaxIdleConns(0)
b.ResetTimer()
b.Run("noConnPool", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _ = db.ExecContext(ctx, "SELECT 1")
}
})
db.SetMaxIdleConns(1)
b.ResetTimer()
b.Run("hasConnPool", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _ = db.ExecContext(ctx, "SELECT 1")
}
})
}
复制代码
可以明显的看到有设置连接池和没有设置连接池的区别,有设置连接池的 benchmark 单次请求的速度比没有设置连接池的来得快很多,分配的内存也少。每次请求数据库都是 TCP 连接,如果不设置连接池,相当于要建立大量的 TCP 连接,这是很耗时的。
既然我们觉得连接池是重要且有必要的,那最核心的问题就在于如何设计连接池,其中最主要的就是怎么取连接,怎么复用连接,对过期的连接怎么处理,以及怎么知道这个连接池设计是好的。
接下来,就先从连接池最主要的数据结构 DB 入手,看下里面有什么,从而可以来猜想连接池是如何设计的。
DB 结构体
// L402
type DB struct {
// 等待新的连接所需要的总时间,用于统计来判断这个连接池设置的好不好
waitDuration int64
connector driver.Connector // 由数据库驱动实现的connector
numClosed uint64 // 关闭的连接数
mu sync.Mutex // protects following fields
freeConn []*driverConn // 连接池
connRequests map[uint64]chan connRequest // 连接请求的自增key
nextRequest uint64 // 下一个连接的key
numOpen int // 打开的和正在打开的连接数
openerCh chan struct{} // channel用于通知connectorOpenner()建立新的连接
closed bool // 当前数据库是否关闭
dep map[finalCloser]depSet
lastPut map[*driverConn]string
maxIdle int // 连接池大小,为0表示用默认的2,小于0表示不使用连接池
maxOpen int // 最大打开的连接数,包含在连接池中的闲散连接,小于等于0表示不限制
maxLifetime time.Duration // 一个连接在连接池中最长的生存时间,0表示可以一直重用
// channel用于通知connectorCleaner()清理连接,没有设置maxLifeTime,这个方法基本使用不到的
cleanerCh chan struct{}
waitCount int64 // 等待的连接数,如果maxIdelCount为0,waitCount就是一直为0
maxIdleClosed int64 // 释放连接时,因为连接池已满而被关闭的连接数
maxLifetimeClosed int64 // 连接超过生存时间后而被关闭的连接数
stop func() // stop cancels the connection opener and the session resetter.
}
复制代码
如下图,进一步地解释数据结构中的一些参数。
maxIdle:连接池的大小,也就是 freeConn 的大小,这个大小在初始化之后就是固定的。设置得越大,可能会有很多空闲的连接,占用空间。设置得太小也有可能导致很多阻塞的请求在等连接;
maxOpen:最大连接数指的是正在用的连接(下面数据结构这么画而已,实际不是数组)数量加上在连接池中的连接数量。这个设置和 maxIdle 有类似的意思;
numOpen:总连接数,指的是正在用的+在连接池里的+将要打开的。将要打开的是不确定的,因为很有可能在创建的过程中,创建失败了(numOpen 像乐观锁一样的一个逻辑统计);
waitCount:请求等待的总量,指的就是 connRequests 这个 map 里面的 kv 总数;
maxLifeTime:连接总生存时间,总生存时间从 create 到最后 close,中间在 inUse 或者再 idle 都计在总生存时间里,不会因为没有人用而停止。
通过 DB 数据结构我们能看到什么呢?
首先,数据结构里面锁的位置可以代表哪些字段是要加锁,哪些是不需要的,在可读性上大大提高;
其次,设计数据结构的时候一定有个主体,在设计主体的数据结构时,就可根据业务属性去决定该怎样地去增删改查等动作,为之后的设计铺个路;
接着,管道通信。如果是之前自己写代码,更多的是通过返回值来设计逻辑,但是其实 go 本身更推荐用管道通信。管道通信能够让自己的代码大大地解耦,更好地去做自己主要的设计逻辑,也不会显得太笨重;
然后,限制与配置,这一点可以认为是在给自己设计的数据结构一个边界的考量。通过这种思维,可以让自己看到自己设计的数据结构的一些可能与不可能;
最后,就是统计与分析,统计其实是在开发过程中感觉会很容易忽略的一个事情,更多的是专注在业务逻辑开发。但是统计,往往在生产过程也好,测试环境,压测环境也好,一定会给你带来很大的帮助。尤其在有限制的情况下,能够很清楚地知道如何去评判自己设计的数据结构的好坏。
driverConn 结构体
//L454
type driverConn struct {
db *DB
createdAt time.Time
sync.Mutex // guards following
ci driver.Conn
needReset bool // The connection session should be reset before use if true.
closed bool // 确定连接最终都关闭后才是最终关闭
finalClosed bool // 确定依赖都被关闭后,才会执行最后的关闭
openStmt map[*driverStmt]bool
// guarded by db.mu
inUse bool
onPut []func() // code (with db.mu held) run when conn is next returned
dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}
复制代码
重要 function 及流程
如上两个图,一个是 gif 图,另一个是总的流程图,这张图包含了获取连接所有的重要且相关的逻辑函数,接下来跟着流程逐步地分析一下涉及到的方法。
初始化 db
// L732
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
return db
}
复制代码
// L1068
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}
复制代码
初始化 db 后,会开一个协程来等待,通过 openerCh channel 当做信号来创建新的数据库连接。
获取连接
创建连接的主要函数有两个分别是:
func (db *DB) Conn(ctx context.Context) (*Conn, error)
通过 2 个策略来获取连接,cachedOrNewConn,从连接池内或者新建获取连接,alwaysNewConn,总是通过新建来获取连接。
// L1735
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
var dc *driverConn
var err error
// 尝试通过缓存或者新建(连接池里面没有可用连接)来获取连接,如果获取不到最大重试次数为2次
for i := 0; i < maxBadConnRetries; i++ {
dc, err = db.conn(ctx, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
// 如果在上一步通过缓存或者新建获取不到连接,则再通过新建获取一个连接
if err == driver.ErrBadConn {
dc, err = db.conn(ctx, alwaysNewConn)
}
if err != nil {
return nil, err
}
conn := &Conn{
db: db,
dc: dc,
}
return conn, nil
}
复制代码
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)
这个方法会分为三种情况返回连接,分别是
从连接池里面直接获取
// L1149
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Reset the session if required.
if err := conn.resetSession(ctx); err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
复制代码
如果策略是 cachedOrNewConn 的,一定会先去检查 freeConn 连接池里面有没有可以复用的连接,如果有取出来判断没有超时后返回可用的连接。在取的这个逻辑,L4~L6 是获取连接池第一个的连接的写法,这样的做法可以减少切片的容量-1,这样可以减少切片因为伸缩而产生的内存分配问题。
阻塞等待
在图中 L1172,就是先判断是否有限制最大连接数,当前总连接数是否超过最大连接数。因为有限制,所以此时的 req 只能阻塞,等待新的连接,写入 connRequests。
这边的 select 分为两种情况,一种是 ctx 超时,另一种是 req 获取到连接了。请求数据库连接的时候,通常会带上一个超时时间,避免一直等下去。ctx 超时之后,这个 reqKey 会从 connRequests 里面删除。删除的过程中,也会再检查了一下会不会刚好超时,刚好 req 收到连接的情况。如果有收到,那不会直接 close,相反的会留着用,放到连接池里面使用,除非连接池满了,或者不允许使用连接池。那另一种就是有信号写入管道,req 获取到了连接。
直接创建新连接
直接创建新的连接,就是通过 connector 去获取一个新的连接。这里要注意的点就是,虽然获取的过程中可能会失败,但是不是失败就失败了,而是会在错误里面再去调一次获取连接的函数。这里的思维就是尽可能的减少 connRequests 里面阻塞的请求,连接池设计不仅仅是要思考如何复用,更多的也要考虑如何减少阻塞的请求(当然可以通过配合来这个问题)。
// L1231
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
复制代码
可能需要创建新的连接
在前面的代码逻辑里面更多的是直接获取,但还强调了 maybeOpenNewConnections 这个方法。这个方法就是为了获取连接,更多的是给阻塞的请求创建连接的一个方法。里面主要的也就是往前面讲到的 openerCh 发信号,通知它要创建一个新的连接了,有某个请求可能需要了。
// L1050
func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
for numRequests > 0 {
db.numOpen++ // optimistically
numRequests--
if db.closed {
return
}
db.openerCh <- struct{}{}
}
}
复制代码
在 db.openerCh 收到信号后,其实调取底层的方法就是 openNewConnection,获取连接的方式也是和前面讲的直接获取是一样的,通过 connector 去创建。但有两个点请注意,在这边创建失败后,再次调取 maybeOpenNewConnections,目的还是觉得有需要连接的阻塞的请求。另一个,获取到连接之后,会根据 putConnDBLocked 这个方法去决定这个连接是否被复用。如果复用则返回,如果无法复用就会关掉这个连接,不会一直放着不管的。
// L1081
func (db *DB) openNewConnection(ctx context.Context) {
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
db.numOpen--
db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}
复制代码
接着来看一下 putConnDBLocked 如何决定传进来的连接是否可以复用。
首先,查看 DB 状态及是否有限制,如果关闭了或者达到限制了,那当然就无法复用了;
然后,看一下 connRequests 是否有阻塞等待的请求。如果有的话,通过 for-range 的形式直接 rand 出来一个,往这个 req 管道发送信号。这样,在前面阻塞等待的 req 就可以收到连接,返回回去。没有的话,就看下连接池是否可用,是否还可以往里面塞。可以的话就放到连接池里面,同时注意,会开启清扫连接池里面空闲连接的逻辑(这在后面会说到)。
// L1341
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
复制代码
连接池获取连接就是这样的逻辑设计,复用池,阻塞等待,直接获取。
连接的释放
有获取连接就一定有释放连接,数据库连接的 close 主要涉及的函数就是 putConn。释放的行为一般是由 conn 发起的,或者数据库关闭的时候也会去关掉每个 conn。
type Closer interface {
Close() error
}
复制代码
func (c *Conn) Close() error {
return c.close(nil)
}
复制代码
// L1917
func (c *Conn) close(err error) error {
if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
return ErrConnDone
}
// Lock around releasing the driver connection
// to ensure all queries have been stopped before doing so.
c.closemu.Lock()
defer c.closemu.Unlock()
c.dc.releaseConn(err)
c.dc = nil
c.db = nil
return err
}
复制代码
// L471
func (dc *driverConn) releaseConn(err error) {
dc.db.putConn(dc, err, true)
}
复制代码
这边就直接讲 putConn 这个方法,连接池的释放更多考虑的就是复用,大前提还是要考虑连接是否超过最大生存时间。然后看释放的连接是好的还是坏的,超时的连接其实就是一个坏的连接。这边注意下方代码块 L37,在收到一个坏的连接之后,再次调用 maybeOpenNewConnections,目的也和之前的一样。再往下,也是调用 putConnDBLocked,看是否这个连接可以复用。
// L1282
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
if err != driver.ErrBadConn {
if !dc.validateConnection(resetSession) {
err = driver.ErrBadConn
}
}
db.mu.Lock()
if !dc.inUse {
db.mu.Unlock()
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
db.maxLifetimeClosed++
err = driver.ErrBadConn
}
if debugGetPut {
db.lastPut[dc] = stack()
}
dc.inUse = false
dc.returnedAt = nowFunc()
for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil
if err == driver.ErrBadConn {
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}
if putConnHook != nil {
putConnHook(db, dc)
}
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
if !added {
dc.Close()
return
}
}
复制代码
另外提一点,putConnHook 是测试时用的 hook,一般不对外开放,这个写法可以用来自己测试的时候方便使用。
清理无效连接
在 putConnDBLocked 方法里面讲到,如果放回连接池就会发起一次清理连接池空闲连接的动作,在重新设置连接最大生存时间的时候也会触发一次。可以注意到,清理无效连接不是一初始化数据库就开一个协程等超时清理(如果是我之前就会这么干)。虽然是之后会每过一段时间会清理一次,但数据库初始化完之后不会直接挂一个清理连接的协程。
// L953
func (db *DB) startCleanerLocked() {
if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
db.cleanerCh = make(chan struct{}, 1)
go db.connectionCleaner(db.maxLifetime)
}
}
复制代码
清理连接这个方法,里面主要的一个就是先获取超时的所有连接,即下方代码块的变量 closing。在都获取完之后,才会去一个一个 close,而不是一个超时就关掉一个 conn。
// L960
func (db *DB) connectionCleaner(d time.Duration) {
const minInterval = time.Second
if d < minInterval {
d = minInterval
}
t := time.NewTimer(d)
for {
select {
case <-t.C:
case <-db.cleanerCh: // maxLifetime was changed or db was closed.
}
db.mu.Lock()
d = db.maxLifetime
if db.closed || db.numOpen == 0 || d <= 0 {
db.cleanerCh = nil
db.mu.Unlock()
return
}
expiredSince := nowFunc().Add(-d)
var closing []*driverConn
for i := 0; i < len(db.freeConn); i++ {
c := db.freeConn[i]
if c.createdAt.Before(expiredSince) {
closing = append(closing, c)
last := len(db.freeConn) - 1
db.freeConn[i] = db.freeConn[last]
db.freeConn[last] = nil
db.freeConn = db.freeConn[:last]
i--
}
}
db.maxLifetimeClosed += int64(len(closing))
db.mu.Unlock()
for _, c := range closing {
c.Close()
}
if d < minInterval {
d = minInterval
}
t.Reset(d)
}
}
复制代码
Go 1.16.x 优化
Go 1.16.x 的优化在下面这个 gif 可以明显地看到,相较于之前的版本增加了一个 maxIdleTime,主要是不想让连接在连接池中等待太久。虽然可以一直在连接池中等着下次复用,但毕竟占的是资源,一旦创建了很多,最大生存时间又很长,是很占内存的。因此,Go 1.16.x 做了这个优化。那相应的在清理连接的方法里,就多了一个查看连接是否有超过 maxaIdleTime 的计算。
总结
连接池的主要内容其实就是获取连接,释放连接,复用连接,清理连接。在整个 4 个主要流程中,反复考虑的就是要为潜在需要连接的请求创建连接,尽量减少阻塞的请求。同时,尽量去回收连接,不要都是从数据库去创建连接。
因为我没有实际的高并发的生产经验,所以没有连接池的调优能力,只能说通过源码分析看到学习写代码的一些规范及优点,然后还有就是解决问题的思路,在其他的池的设计也可以借鉴这里的设计思路去举一反三。
评论