今天我们来看一看 redigo(https://github.com/gomodule/redigo)是如何实现连接池的。
概述
连接池部分的代码在redis/pool.go
中,相关结构体和接口的 UML 图如下图所示
Pool
结构体定义了连接池的属性和行为,包括以下主要参数:
Dial func() (Conn, error)
:指向用于新建连接的函数,由 redigo 的用户指定
MaxIdle int
:最大空闲连接数
MaxActive int
:连接池的容量,即连接池中最多可以包含多少个连接,包括正在使用的连接和空闲连接
IdleTimeout time.Duration
:空闲连接的最大空闲时间
Get() Conn
:从连接池获取连接
另外,idleList
是一个由空闲连接(类型为*poolConn
)构成的双向链表。pushFront()
、popFront()
和popBack()
这 3 个函数分别用于,通过将刚刚使用过的连接插入到链表头部来将其放回连接池;从链表头部取出空闲连接;从链表尾部删除长时间没有使用的空闲连接。
type idleList struct {
count int
front, back *poolConn
}
复制代码
实现连接池时,需要考虑以下几个问题
下面就带着这几个问题,重点梳理一下从连接池中获取连接的func (p *Pool) Get() Conn
方法和将连接放回连接池的func (ac *activeConn) Close()
方法。
问题 1:如何回收空闲时间过长的连接?
先来梳理func (p *Pool) Get() Conn
方法的逻辑。
func (p *Pool) Get() Conn {
// GetContext returns errorConn in the first argument when an error occurs.
c, _ := p.GetContext(context.Background())
return c
}
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// Wait until there is a vacant connection in the pool.
waited, err := p.waitVacantConn(ctx)
if err != nil {
return errorConn{err}, err
}
// ...
复制代码
Get()
会返回两种类型的连接,activeConn
和errorConn
,这两种类型都实现了Conn
接口。
这里采用了称为Null Object或Special Case的设计模式,即使获取连接时发生错误,也不会产生nil
,而是返回一个异常的连接。只不过在异常连接上的绝大多数操作都会返回错误。这样设计的好处一是避免了空指针异常,二是延后了错误处理的时机,或者说减少了一处需要检查错误的位置,redigo 的用户可以认为Get()
总会返回“有效的”连接,而在错误检查时,只需重点检查Do()
等方法的返回值。
Get()
调用了GetContext()
,而后者又调用了waitVacantConn()
。waitVacantConn()
有两条执行路径,我们先来看最简单的一条——若没有开启等待模式p.Wait == false
或者没有设置最大连接数(连接池的容量),就直接返回。p.Wait == true
时的逻辑将在后面介绍。
func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
if !p.Wait || p.MaxActive <= 0 {
// No wait or no connection limit.
return 0, nil
}
// ...
复制代码
现在,关注点又回到GetContext()
方法里了,
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// ...
p.mu.Lock()
if waited > 0 {
// ...
}
// Prune stale connections at the back of the idle list.
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back // ①
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
复制代码
这部分代码回答了有关连接池的一个问题——如何回收空闲时间过长的连接?
redigo 的实现方法是获取连接时顺带回收空闲时间过长的连接。①p.idle.back
(类型为*poolConn
)是指向空闲连接的双向链表尾部的指针,所指向的空闲连接的t
字段记录了该连接最后一次使用的时间。如果t
加上连接池参数p.IdleTimeout
(最大空闲时间)在当前时间nowFunc()
之前(类比食品的保质期在当前时间之前),就从双向链表p.idle
中删除该连接后关闭。
由于这部分代码可能会被多个 goroutine 并发执行,所以在回收(=从链表中删除)空闲连接时,以及p.active
计数器--
时,都需要通过p.mu.Lock()
加锁。redigo 在这里还尽可能缩小了锁的范围:
p.mu.Lock()
// for ...
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
// ...
// }
复制代码
问题 2:如何确保连接池中的连接依然存活?
回收完空闲时间过长的连接后,就可以遍历空闲连接的链表,从中获取可用的空闲连接了。这部分代码同样可能会被多个 goroutine 并发执行,所以依然需要互斥锁p.mu
的保护。
p.mu.Lock()
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
// return an `activeConn` or check next idle connection
// ...
}
复制代码
activeConn
的结构如下
type activeConn struct {
p *Pool
pc *poolConn
state int
}
复制代码
之所以要确保空闲连接依然存活,是因为空闲连接虽然存在,但可能已经是失效的连接了。那么什么时候会出现这种情况呢?
在 Redis 的配置中,有一项叫做timeout
,默认为0
。
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0
复制代码
如果该选项的值不为 0,且小于 redigo 连接池的配置项MaxIdle
的值会发生什么呢?我们不妨测试一下
$ fgrep timeout -B2 /usr/local/etc/redis.conf
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 5
--
$ # 重启redis
$ # brew services restart redis
复制代码
func main() {
pool := &redis.Pool{
MaxActive: 1,
MaxIdle: 1,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", "127.0.0.1:6379")
},
}
c := pool.Get()
reply, err := c.Do("PING")
if err != nil {
fmt.Println(reply, err)
}
c.Close() // return to pool
time.Sleep(20 * time.Second)
c = pool.Get()
reply, err = c.Do("PING")
if err != nil {
fmt.Println(reply, err) // <nil> EOF
}
}
复制代码
通过 Wireshark 抓包,就很容易解释为什么第二次c.Do("PING")
报错了,
可以看到在 9.40 秒时,Redis 关闭了与客户端之间的 TCP 连接。而在 23.54 秒左右(相对于第一次PING
时的 3.53 秒,经历了 20 秒,就是time.Sleep(20 * time.Second)
睡眠的时间),redigo 在已关闭的空闲连接上发送PING
,Redis 直接通过RST
标志断开了连接。
这就是空闲连接虽然存在,但已经失效的情况。
为了避免这种情况,我们不但可以根据 Redis 的timeout
的配置,调整连接池IdleTimeout time.Duration
的值,还可以在创建连接池时指定TestOnBorrow
函数,例如
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// if time.Since(t) < time.Minute {
// return nil
// }
// _, err := c.Do("PING")
// return err
// },
// }
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
// ...
return &activeConn{p: p, pc: pc}, nil
}
pc.c.Close() // ①
p.mu.Lock()
p.active--
复制代码
可以看到,当p.TestOnBorrow
检测失败时,①空闲连接就会因无效而被关闭,避免了后续在已被 Redis 关闭的 TCP 连接上发送请求的问题。
问题 3:新建连接的问题
如果空闲连接的链表为空,或者链表中没有存活着的可用连接,就不得不新建连接了。
新建连接很简单,只需要调用dial()
函数,
p.mu.Lock()
// ...
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// ...
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
复制代码
dial()
的实现如下,仅仅是调用了创建连接池时指定的新建连接的(Dial
成员指向的)函数
func (p *Pool) dial(ctx context.Context) (Conn, error) {
// ...
if p.Dial != nil {
return p.Dial()
}
// ...
}
复制代码
但新建时需要考虑,当已创建的连接数已达到连接池的容量上限时要如何处理。
我们先来看 redigo 中最简单的一种处理方法,
// Handle limit for p.Wait == false.
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return errorConn{ErrPoolExhausted}, ErrPoolExhausted
}
复制代码
此时,p.Wait == false
,且已创建的连接数达到了连接池的容量上限(p.active >= p.MaxActive
),于是 redigo 直接返回了表示错误的连接return errorConn{}
。
当p.Wait == true
时的处理方式稍微复杂一些,简单来说就是,当已创建的连接数达到了连接池的容量上限时,通过Pool
结构体上的ch
type Pool struct {
// ...
ch chan struct{} // limits open connections when p.Wait is true
复制代码
让获取连接的 goroutine 进入等待状态。
select {
case <-p.ch:
// ...
case <-ctx.Done():
return 0, ctx.Err()
}
复制代码
p.ch
有点类似令牌桶,只要桶里还有令牌,就不会阻塞。初始化是在lazyInit()
函数中完成的,桶中初始有p.MaxActive
个令牌。
func (p *Pool) lazyInit() {
p.initOnce.Do(func() {
p.ch = make(chan struct{}, p.MaxActive)
// ...
for i := 0; i < p.MaxActive; i++ {
p.ch <- struct{}{}
}
}
})
}
复制代码
将连接放回连接池
最后再来看一看将连接放回连接池的过程。
释放连接是通过用户调用func (ac *activeConn) Close() (err error) {
实现的。该方法最终会调用
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc() // ①
p.idle.pushFront(pc) // ②
if p.idle.count > p.MaxIdle { // ┐
pc = p.idle.back // │- ③
p.idle.popBack() // ┘
} else {
pc = nil
}
}
if pc != nil { // ┐
p.mu.Unlock() // │
pc.c.Close() // │- ③
p.mu.Lock() // │
p.active-- // │
} // ┘
// ...
p.mu.Unlock()
return nil
}
复制代码
put()
的主流程很简单
与从连接池中获取连接一样,这部分代码同样可能会被多个 goroutine 并发执行,所以依然需要互斥锁p.mu
的保护。
至此,我们就梳理完成了 redigo 中连接池部分的源代码了。
附
描述 redigo 的 UML 类图的代码
@startuml
interface Conn {
}
struct Pool {
Dial func
MaxIdle int
MaxActive int
**idle idleList**
IdleTimeout time.Duration
**Get() Conn**
}
struct idleList {
count int
front *poolConn
back *poolConn
pushFront(pc *poolConn)
popFront()
popBack()
}
struct poolConn {}
struct activeConn {
p *Pool
pc *poolConn
**Close() error**
Do(cmd string, args ...any) (reply any, err error)
}
idleList "1" *-- "many" poolConn : contains
idleList --* Pool
Pool --* activeConn
poolConn --* activeConn
activeConn ..|> Conn
poolConn ..|> Conn
@enduml
复制代码
评论