写点什么

redigo 连接池的源码分析

作者:胡译胡说
  • 2023-10-25
    北京
  • 本文字数:4866 字

    阅读完需:约 16 分钟

redigo连接池的源码分析

今天我们来看一看 redigo(https://github.com/gomodule/redigo)是如何实现连接池的。

概述

连接池部分的代码在redis/pool.go中,相关结构体和接口的 UML 图如下图所示



Pool结构体定义了连接池的属性和行为,包括以下主要参数:


  • Dial func() (Conn, error):指向用于新建连接的函数,由 redigo 的用户指定

  • MaxIdle int:最大空闲连接数

  • MaxActive int:连接池的容量,即连接池中最多可以包含多少个连接,包括正在使用的连接和空闲连接

  • IdleTimeout time.Duration:空闲连接的最大空闲时间

  • Get() Conn:从连接池获取连接


另外,idleList是一个由空闲连接(类型为*poolConn)构成的双向链表。pushFront()popFront()popBack()这 3 个函数分别用于,通过将刚刚使用过的连接插入到链表头部来将其放回连接池;从链表头部取出空闲连接;从链表尾部删除长时间没有使用的空闲连接。


type idleList struct {    count       int    front, back *poolConn}
复制代码


实现连接池时,需要考虑以下几个问题


  • 何时新建连接?

  • 若新建连接时发现已创建的连接数到达连接池的容量上限,该如何处理?

  • 如何回收空闲时间过长的连接?

  • 如何确保连接池中的连接依然存活?


下面就带着这几个问题,重点梳理一下从连接池中获取连接的func (p *Pool) Get() Conn方法和将连接放回连接池的func (ac *activeConn) Close()方法。

问题 1:如何回收空闲时间过长的连接?

先来梳理func (p *Pool) Get() Conn方法的逻辑。


func (p *Pool) Get() Conn {    // GetContext returns errorConn in the first argument when an error occurs.    c, _ := p.GetContext(context.Background())    return c}
func (p *Pool) GetContext(ctx context.Context) (Conn, error) { // Wait until there is a vacant connection in the pool. waited, err := p.waitVacantConn(ctx) if err != nil { return errorConn{err}, err } // ...
复制代码


Get()会返回两种类型的连接,activeConnerrorConn,这两种类型都实现了Conn接口。


这里采用了称为Null ObjectSpecial Case的设计模式,即使获取连接时发生错误,也不会产生nil,而是返回一个异常的连接。只不过在异常连接上的绝大多数操作都会返回错误。这样设计的好处一是避免了空指针异常,二是延后了错误处理的时机,或者说减少了一处需要检查错误的位置,redigo 的用户可以认为Get()总会返回“有效的”连接,而在错误检查时,只需重点检查Do()等方法的返回值。


Get()调用了GetContext(),而后者又调用了waitVacantConn()waitVacantConn()有两条执行路径,我们先来看最简单的一条——若没有开启等待模式p.Wait == false或者没有设置最大连接数(连接池的容量),就直接返回。p.Wait == true时的逻辑将在后面介绍。


func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {    if !p.Wait || p.MaxActive <= 0 {        // No wait or no connection limit.        return 0, nil    }  // ...
复制代码


现在,关注点又回到GetContext()方法里了,


func (p *Pool) GetContext(ctx context.Context) (Conn, error) {    // ...    p.mu.Lock()
if waited > 0 { // ... }
// Prune stale connections at the back of the idle list. if p.IdleTimeout > 0 { n := p.idle.count for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { pc := p.idle.back // ① p.idle.popBack() p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } }
复制代码


这部分代码回答了有关连接池的一个问题——如何回收空闲时间过长的连接


redigo 的实现方法是获取连接时顺带回收空闲时间过长的连接。①p.idle.back(类型为*poolConn)是指向空闲连接的双向链表尾部的指针,所指向的空闲连接的t字段记录了该连接最后一次使用的时间。如果t加上连接池参数p.IdleTimeout(最大空闲时间)在当前时间nowFunc()之前(类比食品的保质期在当前时间之前),就从双向链表p.idle中删除该连接后关闭。


由于这部分代码可能会被多个 goroutine 并发执行,所以在回收(=从链表中删除)空闲连接时,以及p.active计数器--时,都需要通过p.mu.Lock()加锁。redigo 在这里还尽可能缩小了锁的范围:


p.mu.Lock()// for ...  p.mu.Unlock()  pc.c.Close()  p.mu.Lock()    // ...// }
复制代码

问题 2:如何确保连接池中的连接依然存活?

回收完空闲时间过长的连接后,就可以遍历空闲连接的链表,从中获取可用的空闲连接了。这部分代码同样可能会被多个 goroutine 并发执行,所以依然需要互斥锁p.mu的保护。


p.mu.Lock()for p.idle.front != nil {    pc := p.idle.front    p.idle.popFront()    p.mu.Unlock()  // return an `activeConn` or check next idle connection     // ...}
复制代码


activeConn的结构如下


type activeConn struct {    p     *Pool    pc    *poolConn    state int}
复制代码


之所以要确保空闲连接依然存活,是因为空闲连接虽然存在,但可能已经是失效的连接了。那么什么时候会出现这种情况呢?


在 Redis 的配置中,有一项叫做timeout,默认为0


# Close the connection after a client is idle for N seconds (0 to disable)timeout 0
复制代码


如果该选项的值不为 0,且小于 redigo 连接池的配置项MaxIdle的值会发生什么呢?我们不妨测试一下


$ fgrep timeout -B2 /usr/local/etc/redis.conf
# Close the connection after a client is idle for N seconds (0 to disable)timeout 5--
$ # 重启redis $ # brew services restart redis
复制代码


func main() {    pool := &redis.Pool{        MaxActive: 1,        MaxIdle:   1,        Dial: func() (redis.Conn, error) {            return redis.Dial("tcp", "127.0.0.1:6379")        },    }
c := pool.Get() reply, err := c.Do("PING") if err != nil { fmt.Println(reply, err) } c.Close() // return to pool
time.Sleep(20 * time.Second) c = pool.Get() reply, err = c.Do("PING") if err != nil { fmt.Println(reply, err) // <nil> EOF }}
复制代码


通过 Wireshark 抓包,就很容易解释为什么第二次c.Do("PING")报错了,



可以看到在 9.40 秒时,Redis 关闭了与客户端之间的 TCP 连接。而在 23.54 秒左右(相对于第一次PING时的 3.53 秒,经历了 20 秒,就是time.Sleep(20 * time.Second)睡眠的时间),redigo 在已关闭的空闲连接上发送PING,Redis 直接通过RST标志断开了连接。


这就是空闲连接虽然存在,但已经失效的情况。


为了避免这种情况,我们不但可以根据 Redis 的timeout的配置,调整连接池IdleTimeout time.Duration的值,还可以在创建连接池时指定TestOnBorrow函数,例如


//  pool := &redis.Pool{//    // Other pool configuration not shown in this example.//    TestOnBorrow: func(c redis.Conn, t time.Time) error {//      if time.Since(t) < time.Minute {//        return nil//      }//      _, err := c.Do("PING")//      return err//    },//  }
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && // ... return &activeConn{p: p, pc: pc}, nil } pc.c.Close() // ① p.mu.Lock() p.active--
复制代码


可以看到,当p.TestOnBorrow检测失败时,①空闲连接就会因无效而被关闭,避免了后续在已被 Redis 关闭的 TCP 连接上发送请求的问题。

问题 3:新建连接的问题

如果空闲连接的链表为空,或者链表中没有存活着的可用连接,就不得不新建连接了。


新建连接很简单,只需要调用dial()函数,


p.mu.Lock()// ...p.active++p.mu.Unlock()c, err := p.dial(ctx)// ...return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
复制代码


dial()的实现如下,仅仅是调用了创建连接池时指定的新建连接的(Dial成员指向的)函数


func (p *Pool) dial(ctx context.Context) (Conn, error) {    // ...    if p.Dial != nil {        return p.Dial()    }    // ...}
复制代码


但新建时需要考虑,当已创建的连接数已达到连接池的容量上限时要如何处理。


我们先来看 redigo 中最简单的一种处理方法,


// Handle limit for p.Wait == false.    if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {        p.mu.Unlock()        return errorConn{ErrPoolExhausted}, ErrPoolExhausted    }
复制代码


此时,p.Wait == false,且已创建的连接数达到了连接池的容量上限(p.active >= p.MaxActive),于是 redigo 直接返回了表示错误的连接return errorConn{}


p.Wait == true时的处理方式稍微复杂一些,简单来说就是,当已创建的连接数达到了连接池的容量上限时,通过Pool结构体上的ch


type Pool struct {  // ...    ch           chan struct{} // limits open connections when p.Wait is true
复制代码


让获取连接的 goroutine 进入等待状态。


    select {    case <-p.ch:        // ...    case <-ctx.Done():        return 0, ctx.Err()    }
复制代码


p.ch有点类似令牌桶,只要桶里还有令牌,就不会阻塞。初始化是在lazyInit()函数中完成的,桶中初始有p.MaxActive个令牌。


func (p *Pool) lazyInit() {    p.initOnce.Do(func() {        p.ch = make(chan struct{}, p.MaxActive)        // ...            for i := 0; i < p.MaxActive; i++ {                p.ch <- struct{}{}            }        }    })}
复制代码

将连接放回连接池

最后再来看一看将连接放回连接池的过程。


释放连接是通过用户调用func (ac *activeConn) Close() (err error) {实现的。该方法最终会调用


func (p *Pool) put(pc *poolConn, forceClose bool) error {    p.mu.Lock()    if !p.closed && !forceClose {        pc.t = nowFunc()            // ①        p.idle.pushFront(pc)        // ②        if p.idle.count > p.MaxIdle {    // ┐             pc = p.idle.back             // │- ③            p.idle.popBack()             // ┘            } else {            pc = nil        }    }
if pc != nil { // ┐ p.mu.Unlock() // │ pc.c.Close() // │- ③ p.mu.Lock() // │ p.active-- // │ } // ┘
// ... p.mu.Unlock() return nil}
复制代码


put()的主流程很简单


  • ①更新连接的最后一次使用时间为当前时间

  • ②将连接插入到空闲连接链表的头部

  • ③如果当前的空闲连接数(已算上刚刚插入到链表头部的空闲连接)已超过了MaxIdle,则将空闲链表尾部的连接从链表删除后关闭


与从连接池中获取连接一样,这部分代码同样可能会被多个 goroutine 并发执行,所以依然需要互斥锁p.mu的保护。


至此,我们就梳理完成了 redigo 中连接池部分的源代码了。

描述 redigo 的 UML 类图的代码


@startuml
interface Conn {}
struct Pool { Dial func MaxIdle int MaxActive int **idle idleList** IdleTimeout time.Duration **Get() Conn**}
struct idleList { count int front *poolConn back *poolConn pushFront(pc *poolConn) popFront() popBack()}
struct poolConn {}
struct activeConn { p *Pool pc *poolConn **Close() error** Do(cmd string, args ...any) (reply any, err error)
}
idleList "1" *-- "many" poolConn : contains
idleList --* Pool
Pool --* activeConnpoolConn --* activeConnactiveConn ..|> Conn
poolConn ..|> Conn@enduml
复制代码


发布于: 刚刚阅读数: 6
用户头像

胡译胡说

关注

还未添加个人签名 2019-08-27 加入

软件工程师、技术图书译者。译有《图解云计算架构》《图解量子计算机》《计算机是怎样跑起来的》《自制搜索引擎》等。

评论

发布
暂无评论
redigo连接池的源码分析_Go_胡译胡说_InfoQ写作社区