Spring Boot 青睐的数据库连接池 HikariCP 为什么是史上最快的?
return?collector;
通过上面的解释可以知道在 HikariCP 中如何自定义一个自己的监控器,以及相比 Druid 的监控,有什么区别。 工作中很多时候都是需要自定义的,我司虽然也是用的普罗米修斯监控,但是因为 HikariCP 原生的普罗米修斯收集器里面对监控指标的命名并不符合我司的规范
,所以就自定义
了一个,有类似问题的不妨也试一试。
???? 这一节没有画图,纯代码,因为画图不太好解释这部分的东西,这部分内容与连接池整体流程关系也不大,充其量获取了连接池本身的一些属性,在连接池里的触发点也在上面代码段的注释里说清楚了,看代码定义可能更好理解一些。
七、流程 2.2:连接泄漏的检测与告警
本节对应主流程2
里的子流程2.2
,在初始化池对象时,初始化了一个叫做leakTaskFactory
的属性,本节来看下它具体是用来做什么的。
7.1:它是做什么的?
一个连接被拿出去使用时间超过leakDetectionThreshold
(可配置,默认 0)未归还的,会触发一个连接泄漏警告,通知业务方目前存在连接泄漏的问题。
7.2:过程详解
该属性是ProxyLeakTaskFactory
类型对象,且它还会持有houseKeepingExecutorService
这个线程池对象,用于生产ProxyLeakTask
对象,然后利用上面的houseKeepingExecutorService
延时运行该对象里的run
方法。该流程的触发点在上面的流程1.1
最后包装成ProxyConnection
对象的那一步,来看看具体的流程图:
流程 2.2
每次在流程1.1
那里生成ProxyConnection
对象时,都会触发上面的流程,由流程图可以知道,ProxyConnection
对象持有PoolEntry
和ProxyLeakTask
的对象,其中初始化ProxyLeakTask
对象时就用到了leakTaskFactory
对象,通过其schedule
方法可以进行ProxyLeakTask
的初始化,并将其实例传递给ProxyConnection
进行初始化赋值(ps:由图知ProxyConnection
在触发回收事件时,会主动取消这个泄漏检查任务,这也是ProxyConnection
需要持有ProxyLeakTask
对象的原因)。
在上面的流程图中可以知道,只有在leakDetectionThreshold
不等于 0 的时候才会生成一个带有实际延时任务的ProxyLeakTask
对象,否则返回无实际意义的空对象。所以要想启用连接泄漏检查,首先要把leakDetectionThreshold
配置设置上,这个属性表示经过该时间后借出去的连接仍未归还,则触发连接泄漏告警。
ProxyConnection
之所以要持有ProxyLeakTask
对象,是因为它可以监听到连接是否触发归还操作,如果触发,则调用cancel
方法取消延时任务,防止误告。
由此流程可以知道,跟 Druid 一样,HikariCP 也有连接对象泄漏检查,与 Druid 主动回收连接相比,HikariCP 实现更加简单,仅仅是在触发时打印警告日志,不会采取具体的强制回收的措施。
与 Druid 一样,默认也是关闭这个流程的,因为实际开发中一般使用第三方框架,框架本身会保证及时的 close 连接,防止连接对象泄漏,开启与否还是取决于业务是否需要,如果一定要开启,如何设置leakDetectionThreshold
的大小也是需要考虑的一件事。
八、主流程 3:生成连接对象
本节来讲下主流程2
里的createEntry
方法,这个方法利用 PoolBase 里的DriverDataSource
对象生成一个实际的连接对象(如果忘记DriverDatasource
是哪里初始化的了,可以看下主流程2
里PoolBase
的initializeDataSource
方法的作用),然后用PoolEntry
类包装成PoolEntry
对象,现在来看下这个包装类有哪些主要属性:
final?class?PoolEntry?implements?IConcurrentBagEntry?{
private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(PoolEntry.class);
//通过 cas 来修改 state 属性
private?static?final?AtomicIntegerFieldUpdater?stateUpdater;
Connection?connection;?//实际的物理连接对象
long?lastAccessed;?//触发回收时刷新该时间,表示“最近一次使用时间”
long?lastBorrowed;?//getConnection 里 borrow 成功后刷新该时间,表示“最近一次借出的时间”
@SuppressWarnings("FieldCanBeLocal")
private?volatile?int?state?=?0;?//连接状态,枚举值:IN_USE(使用中)、NOT_IN_USE(闲置中)、REMOVED(已移除)、RESERVED(标记为保留中)
private?volatile?boolean?evict;?//是否被标记为废弃,很多地方用到(比如流程 1.1 靠这个判断连接是否已被废弃,再比如主流程 4 里时钟回拨时触发的直接废弃逻辑)
private?volatile?ScheduledFuture<?>?endOfLife;?//用于在超过连接生命周期(maxLifeTime)时废弃连接的延时任务,这里 poolEntry 要持有该对象,主要是因为在对象主动被关闭时(意味着不需要在超过 maxLifeTime 时主动失效了),需要 cancel 掉该任务
private?final?FastList?openStatements;?//当前该连接对象上生成的所有的 statement 对象,用于在回收连接时主动关闭这些对象,防止存在漏关的 statement
private?final?HikariPool?hikariPool;?//持有 pool 对象
private?final?boolean?isReadOnly;?//是否为只读
private?final?boolean?isAutoCommit;?//是否存在事务
}
上面就是整个PoolEntry
对象里所有的属性,这里再说下endOfLife
对象,它是一个利用houseKeepingExecutorService
这个线程池对象做的延时任务,这个延时任务一般在创建好连接对象后maxLifeTime
左右的时间触发,具体来看下createEntry
代码:
private?PoolEntry?createPoolEntry()?{
final?PoolEntry?poolEntry?=?newPoolEntry();?//生成实际的连接对象
final?long?maxLifetime?=?config.getMaxLifetime();?//拿到配置好的 maxLifetime
if?(maxLifetime?>?0)?{?//<=0 的时候不启用主动过期策略
//?计算需要减去的随机数
//?源注释:variance?up?to?2.5%?of?the?maxlifetime
final?long?variance?=?maxLifetime?>?10_000???ThreadLocalRandom.current().nextLong(maxLifetime?/?40)?:?0;
final?long?lifetime?=?maxLifetime?-?variance;?//生成实际的延时时间
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
()?->?{?//实际的延时任务,这里直接触发 softEvictConnection,而 softEvictConnection 内则会标记该连接对象为废弃状态,然后尝试修改其状态为 STATE_RESERVED,若成功,则触发 closeConnection(对应流程 1.1.2)
if?(softEvictConnection(poolEntry,?"(connection?has?passed?maxLifetime)",?false?/?not?owner?/))?{
addBagItem(connectionBag.getWaitingThreadCount());?//回收完毕后,连接池内少了一个连接,就会尝试新增一个连接对象
}
},
lifetime,?MILLISECONDS));?//给 endOfLife 赋值,并且提交延时任务,lifetime 后触发
}
return?poolEntry;
}
//触发新增连接任务
public?void?addBagItem(final?int?waiting)?{
//前排提示:addConnectionQueue 和 addConnectionExecutor 的关系和初始化参考主流程 2
//当添加连接的队列里已提交的任务超过那些因为获取不到连接而发生阻塞的线程个数时,就进行提交连接新增连接的任务
final?boolean?shouldAdd?=?waiting?-?addConnectionQueue.size()?>=?0;?//?Yes,?>=?is?intentional.
if?(shouldAdd)?{
//提交任务给 addConnectionExecutor 这个线程池,PoolEntryCreator 是一个实现了 Callable 接口的类,下面将通过流程图的方式介绍该类的 call 方法
addConnectionExecutor.submit(poolEntryCreator);
}
}
通过上面的流程,可以知道,HikariCP 一般通过createEntry
方法来新增一个连接入池,每个连接被包装成 PoolEntry 对象,在创建好对象时,同时会提交一个延时任务来关闭废弃该连接,这个时间就是我们配置的maxLifeTime
,为了保证不在同一时间失效,HikariCP 还会利用maxLifeTime
减去一个随机数作为最终的延时任务延迟时间,然后在触发废弃任务时,还会触发addBagItem
,进行连接添加任务(因为废弃了一个连接,需要往池子里补充一个),该任务则交给由主流程2
里定义好的addConnectionExecutor
线程池执行,那么,现在来看下这个异步添加连接对象的任务流程:
addConnectionExecutor 的 call 流程
这个流程就是往连接池里加连接用的,跟createEntry
结合起来说是因为这俩流程是紧密相关的,除此之外,主流程5
(fillPool
,扩充连接池)也会触发该任务。
九、主流程 4:连接池缩容
HikariCP 会按照minIdle
定时清理闲置过久的连接,这个定时任务在主流程2
初始化连接池对象时被启用,跟上面的流程一样,也是利用houseKeepingExecutorService
这个线程池对象做该定时任务的执行器。
来看下主流程2
里是怎么启用该任务的:
//housekeepingPeriodMs 的默认值是 30s,所以定时任务的间隔为 30s
this.houseKeeperTask?=?houseKeepingExecutorService.scheduleWithFixedDelay(new?HouseKeeper(),?100L,?housekeepingPeriodMs,?MILLISECONDS);
那么本节主要来说下HouseKeeper
这个类,该类实现了Runnable
接口,回收逻辑主要在其run
方法内,来看看run
方法的逻辑流程图:
主流程 4:连接池缩容
上面的流程就是HouseKeeper
的 run 方法里具体做的事情,由于系统时间回拨会导致该定时任务回收一些连接时产生误差,因此存在如下判断:
//now 就是当前系统时间,previous 就是上次触发该任务时的时间,housekeepingPeriodMs 就是隔多久触发该任务一次
//也就是说 plusMillis(previous,?housekeepingPeriodMs)表示当前时间
//如果系统时间没被回拨,那么 plusMillis(now,?128)一定是大于当前时间的,如果被系统时间被回拨
//回拨的时间超过 128ms,那么下面的判断就成立,否则永远不会成立
if?(plusMillis(now,?128)?<?plusMillis(previous,?housekeepingPeriodMs))
这是 hikariCP 在解决系统时钟被回拨时做出的一种措施,通过流程图可以看到,它是直接把池子里所有的连接对象取出来挨个儿的标记成废弃,并且尝试把状态值修改为STATE_RESERVED
(后面会说明这些状态,这里先不深究)。如果系统时钟没有发生改变(绝大多数情况会命中这一块的逻辑),由图知,会把当前池内所有处于闲置状态(STATE_NOT_IN_USE
)的连接拿出来,然后计算需要检查的范围,然后循环着修改连接的状态:
//拿到所有处于闲置状态的连接
final?List?notInUse?=?connectionBag.values(STATE_NOT_IN_USE);
//计算出需要被检查闲置时间的数量,简单来说,池内需要保证最小 minIdle 个连接活着,所以需要计算出超出这个范围的闲置对象进行检查
int?toRemove?=?notInUse.size()?-?config.getMinIdle();
for?(PoolEntry?entry?:?notInUse)?{
//在检查范围内,且闲置时间超出 idleTimeout,然后尝试将连接对象状态由 STATE_NOT_IN_USE 变为 STATE_RESERVED 成功
if?(toRemove?>?0?&&?elapsedMillis(entry.lastAccessed,?now)?>?idleTimeout?&&?connectionBag.reserve(entry))?{
closeConnection(entry,?"(connection?has?passed?idleTimeout)");?//满足上述条件,进行连接关闭
toRemove--;
}
}
fillPool();?//因为可能回收了一些连接,所以要再次触发连接池扩充流程检查下是否需要新增连接。
上面的代码就是流程图里对应的没有回拨系统时间时的流程逻辑。该流程在idleTimeout
大于 0(默认等于 0)并且minIdle
小于maxPoolSize
的时候才会启用,默认是不启用的,若需要启用,可以按照条件来配置。
十、主流程 5:扩充连接池
这个流程主要依附 HikariPool 里的fillPool
方法,这个方法已经在上面很多流程里出现过了,它的作用就是在触发连接废弃、连接池连接不够用时,发起扩充连接数的操作,这是个很简单的过程,下面看下源码(为了使代码结构更加清晰,对源码做了细微改动):
//?PoolEntryCreator 关于 call 方法的实现流程在主流程 3 里已经看过了,但是这里却有俩 PoolEntryCreator 对象,
//?这是个较细节的地方,用于打日志用,不再说这部分,为了便于理解,只需要知道这俩对象执行的是同一块 call 方法即可
private?final?PoolEntryCreator?poolEntryCreator?=?new?PoolEntryCreator(null);
private?final?PoolEntryCreator?postFillPoolEntryCreator?=?new?PoolEntryCreator("After?adding?");
private?synchronized?void?fillPool()?{
//?这个判断就是根据当前池子里相关数据,推算出需要扩充的连接数,
//?判断方式就是利用最大连接数跟当前连接总数的差值,与最小连接数与当前池内闲置的连接数的差值,取其最小的那一个得到
int?needAdd?=?Math.min(maxPoolSize?-?connectionBag.size(),
minIdle?-?connectionBag.getCount(STATE_NOT_IN_USE));
//减去当前排队的任务,就是最终需要新增的连接数
final?int?connectionsToAdd?=?needAdd?-?addConnectionQueue.size();
for?(int?i?=?0;?i?<?connectionsToAdd;?i++)?{
//一般循环的最后一次会命中 postFillPoolEntryCreator 任务,其实就是在最后一次会打印一次日志而已(可以忽略该干扰逻辑)
addConnectionExecutor.submit((i?<?connectionsToAdd?-?1)???poolEntryCreator?:?postFillPoolEntryCreator);
}
}
由该过程可以知道,最终这个新增连接的任务也是交由addConnectionExecutor
线程池来处理的,而任务的主题也是PoolEntryCreator
,这个流程可以参考主流程3.
然后needAdd
的推算:
Math.min(最大连接数?-?池内当前连接总数,?最小连接数?-?池内闲置的连接数)
根据这个方式判断,可以保证池内的连接数永远不会超过maxPoolSize
,也永远不会低于minIdle
。在连接吃紧的时候,可以保证每次触发都以minIdle
的数量扩容。因此如果在maxPoolSize
跟minIdle
配置的值一样的话,在池内连接吃紧的时候,就不会发生任何扩容了。
十一、主流程 6:连接回收
最开始说过,最终真实的物理连接对象会被包装成PoolEntry
对象,存放进ConcurrentBag
,然后获取时,PoolEntry 对象又会被再次包装成ProxyConnection
对象暴露给使用方的,那么触发连接回收,实际上就是触发 ProxyConnection 里的 close 方法:
public?final?void?close()?throws?SQLException?{
//?原注释:Closing?statements?can?cause?connection?eviction,?so?this?must?run?before?the?conditional?below
closeStatements();?//此连接对象在业务方使用过程中产生的所有 statement 对象,进行统一 close,防止漏 close 的情况
if?(delegate?!=?ClosedConnection.CLOSED_CONNECTION)?{
leakTask.cancel();?//取消连接泄漏检查任务,参考流程 2.2
try?{
if?(isCommitStateDirty?&&?!isAutoCommit)?{?//在存在执行语句后并且还打开了事务,调用 close 时需要主动回滚事务
delegate.rollback();?//回滚
lastAccess?=?currentTime();?//刷新"最后一次使用时间"
}
}?finally?{
delegate?=?ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);?//触发回收
}
}
}
这个就是 ProxyConnection 里的 close 方法,可以看到它最终会调用 PoolEntry 的 recycle 方法进行回收,除此之外,连接对象的最后一次使用时间也是在这个时候刷新的,该时间是个很重要的属性,可以用来判断一个连接对象的闲置时间,来看下 PoolEntry 的recycle
方法:
void?recycle(final?long?lastAccessed)?{
if?(connection?!=?null)?{
this.lastAccessed?=?lastAccessed;?//刷新最后使用时间
hikariPool.recycle(this);?//触发 HikariPool 的回收方法,把自己传过去
}
}
之前有说过,每个 PoolEntry 对象都持有 HikariPool 的对象,方便触发连接池的一些操作,由上述代码可以看到,最终还是会触发 HikariPool 里的 recycle 方法,再来看下 HikariPool 的 recycle 方法:
void?recycle(final?PoolEntry?poolEntry)?{
metricsTracker.recordConnectionUsage(poolEntry);?//监控指标相关,忽略
connectionBag.requite(poolEntry);?//最终触发 connectionBag 的 requite 方法归还连接,该流程参考 ConnectionBag 主流程里的 requite 方法部分
}
以上就是连接回收部分的逻辑,相比其他流程,还是比较简洁的。
十二、ConcurrentBag 主流程
这个类用来存放最终的 PoolEntry 类型的连接对象,提供了基本的增删查的功能,被 HikariPool 持有,上面那么多的操作,几乎都是在 HikariPool 中完成的,HikariPool 用来管理实际的连接生产动作和回收动作,实际操作的却是 ConcurrentBag 类,梳理下上面所有流程的触发点:
主流程 2:初始化 HikariPool 时初始化
ConcurrentBag(构造方法)
,预热时通过createEntry
拿到连接对象,调用ConcurrentBag.add
添加连接到 ConcurrentBag。流程 1.1:通过 HikariPool 获取连接时,通过调用
ConcurrentBag.borrow
拿到一个连接对象。主流程 6:通过
ConcurrentBag.requite
归还一个连接。流程 1.1.2:触发关闭连接时,会通过
ConcurrentBag.remove
移除连接对象,由前面的流程可知关闭连接触发点为:连接超过最大生命周期 maxLifeTime 主动废弃、健康检查不通过主动废弃、连接池缩容。主流程 3:通过异步添加连接时,通过调用
ConcurrentBag.add
添加连接到 ConcurrentBag,由前面的流程可知添加连接触发点为:连接超过最大生命周期 maxLifeTime 主动废弃连接后、连接池扩容。主流程 4:连接池缩容任务,通过调用
ConcurrentBag.values
筛选出需要的做操作的连接对象,然后再通过ConcurrentBag.reserve
完成对连接对象状态的修改,然后会通过流程1.1.2
触发关闭和移除连接操作。
通过触发点整理,可以知道该结构里的主要方法,就是上面触发点里标记为标签色
的部分,然后来具体看下该类的基本定义和主要方法:
public?class?ConcurrentBag<T?extends?IConcurrentBagEntry>?implements?AutoCloseable?{
private?final?CopyOnWriteArrayList<T>?sharedList;?//最终存放 PoolEntry 对象的地方,它是一个 CopyOnWriteArrayList
private?final?boolean?weakThreadLocals;?//默认 false,为 true 时可以让一个连接对象在下方 threadList 里的 list 内处于弱引用状态,防止内存泄漏(参见备注 1)
private?final?ThreadLocal<List<Object>>?threadList;?//线程级的缓存,从 sharedList 拿到的连接对象,会被缓存进当前线程内,borrow 时会先从缓存中拿,从而达到池内无锁实现
private?final?IBagStateListener?listener;?//内部接口,HikariPool 实现了该接口,主要用于 ConcurrentBag 主动通知 HikariPool 触发添加连接对象的异步操作(也就是主流程 3 里的 addConnectionExecutor 所触发的流程)
private?final?AtomicInteger?waiters;?//当前因为获取不到连接而发生阻塞的业务线程数,这个在之前的流程里也出现过,比如主流程 3 里 addBagItem 就会根据该指标进行判断是否需要新增连接
private?volatile?boolean?closed;?//标记当前 ConcurrentBag 是否已被关闭
private?final?SynchronousQueue<T>?handoffQueue;?//这是个即产即销的队列,用于在连接不够用时,及时获取到 add 方法里新创建的连接对象,详情可以参考下面 borrow 和 add 的代码
//内部接口,PoolEntry 类实现了该接口
public?interface?IConcurrentBagEntry?{
//连接对象的状态,前面的流程很多地方都已经涉及到了,比如主流程 4 的缩容
int?STATE_NOT_IN_USE?=?0;?//闲置
int?STATE_IN_USE?=?1;?//使用中
int?STATE_REMOVED?=?-1;?//已废弃
int?STATE_RESERVED?=?-2;?//标记保留,介于闲置和废弃之间的中间状态,主要由缩容那里触发修改
boolean?compareAndSet(int?expectState,?int?newState);?//尝试利用 cas 修改连接对象的状态值
void?setState(int?newState);?//设置状态值
int?getState();?//获取状态值
}
//参考上面 listener 属性的解释
public?interface?IBagStateListener?{
void?addBagItem(int?waiting);
}
//获取连接方法
public?T?borrow(long?timeout,?final?TimeUnit?timeUnit)?{
//?省略...
}
//回收连接方法
public?void?requite(final?T?bagEntry)?{
//省略...
}
//添加连接方法
public?void?add(final?T?bagEntry)?{
//省略...
}
//移除连接方法
public?boolean?remove(final?T?bagEntry)?{
//省略...
}
//根据连接状态值获取当前池子内所有符合条件的连接集合
public?List?values(final?int?state)?{
//省略...
}
//获取当前池子内所有的连接
public?List?values()?{
//省略... 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】
}
//利用 cas 把传入的连接对象的 state 从?STATE_NOT_IN_USE?变为?STATE_RESERVED
public?boolean?reserve(final?T?bagEntry)?{
//省略...
}
//获取当前池子内符合传入状态值的连接数量
public?int?getCount(final?int?state)?{
//省略...
}
}
从这个基本结构就可以稍微看出 HikariCP 是如何优化传统连接池实现的了,相比 Druid 来说,HikariCP 更加偏向无锁实现,尽量避免锁竞争的发生。
12.1:borrow
这个方法用来获取一个可用的连接对象,触发点为流程1.1
,HikariPool 就是利用该方法获取连接的,下面来看下该方法做了什么:
public?T?borrow(long?timeout,?final?TimeUnit?timeUnit)?throws?InterruptedException?{
//?源注释:Try?the?thread-local?list?first
final?List<Object>?list?=?threadList.get();?//首先从当前线程的缓存里拿到之前被缓存进来的连接对象集合
for?(int?i?=?list.size()?-?1;?i?>=?0;?i--)?{
final?Object?entry?=?list.remove(i);?//先移除,回收方法那里会再次 add 进来
final?T?bagEntry?=?weakThreadLocals???((WeakReference<T>)?entry).get()?:?(T)?entry;?//默认不启用弱引用
//?获取到对象后,通过 cas 尝试把其状态从 STATE_NOT_IN_USE?变为?STATE_IN_USE,注意,这里如果其他线程也在使用这个连接对象,
//?并且成功修改属性,那么当前线程的 cas 会失败,那么就会继续循环尝试获取下一个连接对象
if?(bagEntry?!=?null?&&?bagEntry.compareAndSet(STATE_NOT_IN_USE,?STATE_IN_USE))?{
return?bagEntry;?//cas 设置成功后,表示当前线程绕过其他线程干扰,成功获取到该连接对象,直接返回
}
}
//?源注释:Otherwise,?scan?the?shared?list?...?then?poll?the?handoff?queue
final?int?waiting?=?waiters.incrementAndGet();?//如果缓存内找不到一个可用的连接对象,则认为需要“回源”,waiters+1
try?{
for?(T?bagEntry?:?sharedList)?{
//循环 sharedList,尝试把连接状态值从 STATE_NOT_IN_USE?变为?STATE_IN_USE
if?(bagEntry.compareAndSet(STATE_NOT_IN_USE,?STATE_IN_USE))?{
//?源注释:If?we?may?have?stolen?another?waiter's?connection,?request?another?bag?add.
if?(waiting?>?1)?{?//阻塞线程数大于 1 时,需要触发 HikariPool 的 addBagItem 方法来进行添加连接入池,这个方法的实现参考主流程 3
listener.addBagItem(waiting?-?1);
}
return?bagEntry;?//cas 设置成功,跟上面的逻辑一样,表示当前线程绕过其他线程干扰,成功获取到该连接对象,直接返回
}
}
//走到这里说明不光线程缓存里的列表竞争不到连接对象,连 sharedList 里也找不到可用的连接,这时则认为需要通知 HikariPool,该触发添加连接操作了
listener.addBagItem(waiting);
timeout?=?timeUnit.toNanos(timeout);?//这时候开始利用 timeout 控制获取时间
do?{
final?long?start?=?currentTime();
//尝试从 handoffQueue 队列里获取最新被加进来的连接对象(一般新入的连接对象除了加进 sharedList 之外,还会被 offer 进该队列)
final?T?bagEntry?=?handoffQueue.poll(timeout,?NANOSECONDS);
//如果超出指定时间后仍然没有获取到可用的连接对象,或者获取到对象后通过 cas 设置成功,这两种情况都不需要重试,直接返回对象
if?(bagEntry?==?null?||?bagEntry.compareAndSet(STATE_NOT_IN_USE,?STATE_IN_USE))?{
return?bagEntry;
}
//走到这里说明从队列内获取到了连接对象,但是 cas 设置失败,说明又该对象又被其他线程率先拿去用了,若时间还够,则再次尝试获取
timeout?-=?elapsedNanos(start);?//timeout 减去消耗的时间,表示下次循环可用时间
}?while?(timeout?>?10_000);?//剩余时间大于 10s 时才继续进行,一般情况下,这个循环只会走一次,因为 timeout 很少会配的比 10s 还大
return?null;?//超时,仍然返回 null
}?finally?{
waiters.decrementAndGet();?//这一步出去后,HikariPool 收到 borrow 的结果,算是走出阻塞,所以 waiters-1
}
}
仔细看下注释,该过程大致分成三个主要步骤:
从线程缓存获取连接
获取不到再从
sharedList
里获取都获取不到则触发添加连接逻辑,并尝试从队列里获取新生成的连接对象
12.2:add
这个流程会添加一个连接对象进入 bag,通常由主流程3
里的addBagItem
方法通过addConnectionExecutor
异步任务触发添加操作,该方法主流程如下:
public?void?add(final?T?bagEntry)?{
sharedList.add(bagEntry);?//直接加到 sharedList 里去
//?源注释:spin?until?a?thread?takes?it?or?none?are?waiting
//?参考 borrow 流程,当存在线程等待获取可用连接,并且当前新入的这个连接状态仍然是闲置状态,且队列里无消费者等待获取时,发起一次线程调度
while?(waiters.get()?>?0?&&?bagEntry.getState()?==?STATE_NOT_IN_USE?&&?!handoffQueue.offer(bagEntry))?{?//注意这里会 offer 一个连接对象入队列
yield();
}
}
结合borrow
来理解的话,这里在存在等待线程时会添加一个连接对象入队列,可以让borrow
里发生等待的地方更容易 poll 到这个连接对象。
12.3:requite
这个流程会回收一个连接,该方法的触发点在主流程6
,具体代码如下:
public?void?requite(final?T?bagEntry)?{
bagEntry.setState(STATE_NOT_IN_USE);?//回收意味着使用完毕,更改 state 为 STATE_NOT_IN_USE 状态
for?(int?i?=?0;?waiters.get()?>?0;?i++)?{?//如果存在等待线程的话,尝试传给队列,让 borrow 获取
if?(bagEntry.getState()?!=?STATE_NOT_IN_USE?||?handoffQueue.offer(bagEntry))?{
return;
}
else?if?((i?&?0xff)?==?0xff)?{
parkNanos(MICROSECONDS.toNanos(10));
}
else?{
yield();
}
}
final?List<Object>?threadLocalList?=?threadList.get();
if?(threadLocalList.size()?<?50)?{?//线程内连接集合的缓存最多 50 个,这里回收连接时会再次加进当前线程的缓存里,方便下次 borrow 获取
threadLocalList.add(weakThreadLocals???new?WeakReference<>(bagEntry)?:?bagEntry);?//默认不启用弱引用,若启用的话,则缓存集合里的连接对象没有内存泄露的风险
}
}
12.4:remove
这个负责从池子里移除一个连接对象,触发点在流程1.1.2
,代码如下:
public?boolean?remove(final?T?bagEntry)?{
//?下面两个 cas 操作,都是从其他状态变为移除状态,任意一个成功,都不会走到下面的 warn?log
if?(!bagEntry.compareAndSet(STATE_IN_USE,?STATE_REMOVED)?&&?!bagEntry.compareAndSet(STATE_RESERVED,?STATE_REMOVED)?&&?!closed)?{
LOGGER.warn("Attempt?to?remove?an?object?from?the?bag?that?was?not?borrowed?or?reserved:?{}",?bagEntry);
return?false;
}
//?直接从 sharedList 移除掉
final?boolean?removed?=?sharedList.remove(bagEntry);
if?(!removed?&&?!closed)?{
LOGGER.warn("Attempt?to?remove?an?object?from?the?bag?that?does?not?exist:?{}",?bagEntry);
}
return?removed;
评论