数据库连接池 -Druid 源码学习(四)
- 2022 年 5 月 14 日
本文字数:3870 字
阅读完需:约 13 分钟
1、简介
在本文中,我们将深入了解 com.alibaba.druid.pool.DruidDataSource#init 方法中重要的两个守护线程 CreateConnectionThread 和 DestroyConnectionThread 中的 DestroyConnectionThread 的相关逻辑。
2、环境
os-window10
druid-1.2.8
jdk-1.8.0_312
maven-3.8.1
复制代码
3、DestroyConnectionThread 类的作用
检查连接池中连接的可用性,并将不可用的连接进行丢弃和连接池主动回收机制的底层实现。
4、DestroyConnectionThread 类的执行流程
1、init()方法执行启动该线程
protected void createAndStartDestroyThread() {
destroyTask = new DestroyTask();
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {
period = 1000;
}
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
}
2、执行 DestroyConnectionThread 类的 run()方法,该 run()方法是通过 start()方法启动的线程方法,在该方法内部有调用了 DestroyTask 内部类的 run()方法,该 run()方法虽然也实现了线程类,单没有通过 start()方法启动线程,所以只是一个普通方法。
循环调用 DestroyTask 内部类的 run()方法的周期取决于 timeBetweenEvictionRunsMillis 参数,如果设置了该参数,则执行周期为该参数设置的毫秒数,如果没设置则系统默认 1000 毫秒执行一次。
public void run() {
initedLatch.countDown();
for (;;) {
// 从前面开始删除
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
2.1、shrink(boolean checkTime, boolean keepAlive) 内部执行流程
2.1.1 checkTime 为 true 时的执行逻辑
// 物理连接超时时间检查
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
// 当前连接池中的连接超过设置的物理连接的超时时间则放入丢弃连接数组中
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// 连接空闲时间小于最小空闲时间并且连接空闲时间小于 60 * 1000 * 2 则不处理
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
// 连接空闲时间大于等于最小空闲时间
if (idleMillis >= minEvictableIdleTimeMillis) {
// 连接池是否存在多于最小连接数的连接,存在则放入丢弃连接数组中
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
// 连接空闲时间大于最大空闲时间则直接放入丢弃连接数组中
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// keepAlive 为 true 并且 空闲时间大于等于 60 * 1000 * 2 则放入活性检查的数组中
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
2.1.2 checkTime 为 false 时的执行逻辑
// 连接池是否存在多于最小连接数的连接,存在则放入丢弃连接数组中
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
2.1.3 把 evictConnections 数组中的连接进行物理关闭处理
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
2.1.4 keepAliveConnections 数组中的连接进行可用性验证,不可用的连接进行物理关闭操作
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
3、removeAbandoned() 内部执行流程
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {
// 迭代连接池中处于活跃的连接
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();
// 如果连接正在运行则不处理
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// 活跃时长大于removeAbandonedTimeoutMillis参数时间则放入到废弃集合中
if (timeMillis >= removeAbandonedTimeoutMillis) {
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
} finally {
activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
// 连接为禁用状态则不处理
if (pooledConnection.isDisable()) {
continue;
}
} finally {
lock.unlock();
}
// 关闭物理连接
JdbcUtils.close(pooledConnection);
// 标记连接为废弃状态
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
// 是否打印连接池泄露异常
if (isLogAbandoned()) {
StringBuilder buf = new StringBuilder();
buf.append("abandon connection, owner thread: ");
buf.append(pooledConnection.getOwnerThread().getName());
buf.append(", connected at : ");
buf.append(pooledConnection.getConnectedTimeMillis());
buf.append(", open stackTrace\n");
StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState()
+ ", current stackTrace\n");
trace = pooledConnection.getOwnerThread().getStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
LOG.error(buf.toString());
}
}
}
return removeCount;
}
wjchenge
还未添加个人签名 2018.07.27 加入
还未添加个人简介
评论