今天我们来看一看 redigo(https://github.com/gomodule/redigo)是如何实现连接池的。
概述
连接池部分的代码在redis/pool.go中,相关结构体和接口的 UML 图如下图所示
Pool结构体定义了连接池的属性和行为,包括以下主要参数:
Dial func() (Conn, error):指向用于新建连接的函数,由 redigo 的用户指定
MaxIdle int:最大空闲连接数
MaxActive int:连接池的容量,即连接池中最多可以包含多少个连接,包括正在使用的连接和空闲连接
IdleTimeout time.Duration:空闲连接的最大空闲时间
Get() Conn:从连接池获取连接
另外,idleList是一个由空闲连接(类型为*poolConn)构成的双向链表。pushFront()、popFront()和popBack()这 3 个函数分别用于,通过将刚刚使用过的连接插入到链表头部来将其放回连接池;从链表头部取出空闲连接;从链表尾部删除长时间没有使用的空闲连接。
type idleList struct { count int front, back *poolConn}
复制代码
实现连接池时,需要考虑以下几个问题
下面就带着这几个问题,重点梳理一下从连接池中获取连接的func (p *Pool) Get() Conn方法和将连接放回连接池的func (ac *activeConn) Close()方法。
问题 1:如何回收空闲时间过长的连接?
先来梳理func (p *Pool) Get() Conn方法的逻辑。
func (p *Pool) Get() Conn { // GetContext returns errorConn in the first argument when an error occurs. c, _ := p.GetContext(context.Background()) return c}
func (p *Pool) GetContext(ctx context.Context) (Conn, error) { // Wait until there is a vacant connection in the pool. waited, err := p.waitVacantConn(ctx) if err != nil { return errorConn{err}, err } // ...
复制代码
Get()会返回两种类型的连接,activeConn和errorConn,这两种类型都实现了Conn接口。
这里采用了称为Null Object或Special Case的设计模式,即使获取连接时发生错误,也不会产生nil,而是返回一个异常的连接。只不过在异常连接上的绝大多数操作都会返回错误。这样设计的好处一是避免了空指针异常,二是延后了错误处理的时机,或者说减少了一处需要检查错误的位置,redigo 的用户可以认为Get()总会返回“有效的”连接,而在错误检查时,只需重点检查Do()等方法的返回值。
Get()调用了GetContext(),而后者又调用了waitVacantConn()。waitVacantConn()有两条执行路径,我们先来看最简单的一条——若没有开启等待模式p.Wait == false或者没有设置最大连接数(连接池的容量),就直接返回。p.Wait == true时的逻辑将在后面介绍。
func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) { if !p.Wait || p.MaxActive <= 0 { // No wait or no connection limit. return 0, nil } // ...
复制代码
现在,关注点又回到GetContext()方法里了,
func (p *Pool) GetContext(ctx context.Context) (Conn, error) { // ... p.mu.Lock()
if waited > 0 { // ... }
// Prune stale connections at the back of the idle list. if p.IdleTimeout > 0 { n := p.idle.count for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { pc := p.idle.back // ① p.idle.popBack() p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } }
复制代码
这部分代码回答了有关连接池的一个问题——如何回收空闲时间过长的连接?
redigo 的实现方法是获取连接时顺带回收空闲时间过长的连接。①p.idle.back(类型为*poolConn)是指向空闲连接的双向链表尾部的指针,所指向的空闲连接的t字段记录了该连接最后一次使用的时间。如果t加上连接池参数p.IdleTimeout(最大空闲时间)在当前时间nowFunc()之前(类比食品的保质期在当前时间之前),就从双向链表p.idle中删除该连接后关闭。
由于这部分代码可能会被多个 goroutine 并发执行,所以在回收(=从链表中删除)空闲连接时,以及p.active计数器--时,都需要通过p.mu.Lock()加锁。redigo 在这里还尽可能缩小了锁的范围:
p.mu.Lock()// for ... p.mu.Unlock() pc.c.Close() p.mu.Lock() // ...// }
复制代码
问题 2:如何确保连接池中的连接依然存活?
回收完空闲时间过长的连接后,就可以遍历空闲连接的链表,从中获取可用的空闲连接了。这部分代码同样可能会被多个 goroutine 并发执行,所以依然需要互斥锁p.mu的保护。
p.mu.Lock()for p.idle.front != nil { pc := p.idle.front p.idle.popFront() p.mu.Unlock() // return an `activeConn` or check next idle connection // ...}
复制代码
activeConn的结构如下
type activeConn struct { p *Pool pc *poolConn state int}
复制代码
之所以要确保空闲连接依然存活,是因为空闲连接虽然存在,但可能已经是失效的连接了。那么什么时候会出现这种情况呢?
在 Redis 的配置中,有一项叫做timeout,默认为0。
# Close the connection after a client is idle for N seconds (0 to disable)timeout 0
复制代码
如果该选项的值不为 0,且小于 redigo 连接池的配置项MaxIdle的值会发生什么呢?我们不妨测试一下
$ fgrep timeout -B2 /usr/local/etc/redis.conf
# Close the connection after a client is idle for N seconds (0 to disable)timeout 5--
$ # 重启redis $ # brew services restart redis
复制代码
func main() { pool := &redis.Pool{ MaxActive: 1, MaxIdle: 1, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", "127.0.0.1:6379") }, }
c := pool.Get() reply, err := c.Do("PING") if err != nil { fmt.Println(reply, err) } c.Close() // return to pool
time.Sleep(20 * time.Second) c = pool.Get() reply, err = c.Do("PING") if err != nil { fmt.Println(reply, err) // <nil> EOF }}
复制代码
通过 Wireshark 抓包,就很容易解释为什么第二次c.Do("PING")报错了,
可以看到在 9.40 秒时,Redis 关闭了与客户端之间的 TCP 连接。而在 23.54 秒左右(相对于第一次PING时的 3.53 秒,经历了 20 秒,就是time.Sleep(20 * time.Second)睡眠的时间),redigo 在已关闭的空闲连接上发送PING,Redis 直接通过RST标志断开了连接。
这就是空闲连接虽然存在,但已经失效的情况。
为了避免这种情况,我们不但可以根据 Redis 的timeout的配置,调整连接池IdleTimeout time.Duration的值,还可以在创建连接池时指定TestOnBorrow函数,例如
// pool := &redis.Pool{// // Other pool configuration not shown in this example.// TestOnBorrow: func(c redis.Conn, t time.Time) error {// if time.Since(t) < time.Minute {// return nil// }// _, err := c.Do("PING")// return err// },// }
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && // ... return &activeConn{p: p, pc: pc}, nil } pc.c.Close() // ① p.mu.Lock() p.active--
复制代码
可以看到,当p.TestOnBorrow检测失败时,①空闲连接就会因无效而被关闭,避免了后续在已被 Redis 关闭的 TCP 连接上发送请求的问题。
问题 3:新建连接的问题
如果空闲连接的链表为空,或者链表中没有存活着的可用连接,就不得不新建连接了。
新建连接很简单,只需要调用dial()函数,
p.mu.Lock()// ...p.active++p.mu.Unlock()c, err := p.dial(ctx)// ...return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
复制代码
dial()的实现如下,仅仅是调用了创建连接池时指定的新建连接的(Dial成员指向的)函数
func (p *Pool) dial(ctx context.Context) (Conn, error) { // ... if p.Dial != nil { return p.Dial() } // ...}
复制代码
但新建时需要考虑,当已创建的连接数已达到连接池的容量上限时要如何处理。
我们先来看 redigo 中最简单的一种处理方法,
// Handle limit for p.Wait == false. if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive { p.mu.Unlock() return errorConn{ErrPoolExhausted}, ErrPoolExhausted }
复制代码
此时,p.Wait == false,且已创建的连接数达到了连接池的容量上限(p.active >= p.MaxActive),于是 redigo 直接返回了表示错误的连接return errorConn{}。
当p.Wait == true时的处理方式稍微复杂一些,简单来说就是,当已创建的连接数达到了连接池的容量上限时,通过Pool结构体上的ch
type Pool struct { // ... ch chan struct{} // limits open connections when p.Wait is true
复制代码
让获取连接的 goroutine 进入等待状态。
select { case <-p.ch: // ... case <-ctx.Done(): return 0, ctx.Err() }
复制代码
p.ch有点类似令牌桶,只要桶里还有令牌,就不会阻塞。初始化是在lazyInit()函数中完成的,桶中初始有p.MaxActive个令牌。
func (p *Pool) lazyInit() { p.initOnce.Do(func() { p.ch = make(chan struct{}, p.MaxActive) // ... for i := 0; i < p.MaxActive; i++ { p.ch <- struct{}{} } } })}
复制代码
将连接放回连接池
最后再来看一看将连接放回连接池的过程。
释放连接是通过用户调用func (ac *activeConn) Close() (err error) {实现的。该方法最终会调用
func (p *Pool) put(pc *poolConn, forceClose bool) error { p.mu.Lock() if !p.closed && !forceClose { pc.t = nowFunc() // ① p.idle.pushFront(pc) // ② if p.idle.count > p.MaxIdle { // ┐ pc = p.idle.back // │- ③ p.idle.popBack() // ┘ } else { pc = nil } }
if pc != nil { // ┐ p.mu.Unlock() // │ pc.c.Close() // │- ③ p.mu.Lock() // │ p.active-- // │ } // ┘
// ... p.mu.Unlock() return nil}
复制代码
put()的主流程很简单
与从连接池中获取连接一样,这部分代码同样可能会被多个 goroutine 并发执行,所以依然需要互斥锁p.mu的保护。
至此,我们就梳理完成了 redigo 中连接池部分的源代码了。
附
描述 redigo 的 UML 类图的代码
@startuml
interface Conn {}
struct Pool { Dial func MaxIdle int MaxActive int **idle idleList** IdleTimeout time.Duration **Get() Conn**}
struct idleList { count int front *poolConn back *poolConn pushFront(pc *poolConn) popFront() popBack()}
struct poolConn {}
struct activeConn { p *Pool pc *poolConn **Close() error** Do(cmd string, args ...any) (reply any, err error)
}
idleList "1" *-- "many" poolConn : contains
idleList --* Pool
Pool --* activeConnpoolConn --* activeConnactiveConn ..|> Conn
poolConn ..|> Conn@enduml
复制代码
评论