这篇文章尝试回答一下秦老师提出的问题,druid 如何实现断链重连的?
解决思路:
1、反向查找 java.sql.Connection#close, 看看哪些地方调用了?
2、地方还是挺多,这先看看哪个地方调用 com.alibaba.druid.util.JdbcUtils#close(java.sql.Connection)
3、再往下追,其实调用的地方依然非常多,秦老师今天讲了一个参数 com.alibaba.druid.pool.DruidAbstractDataSource#phyMaxUseCount,当查询次数超过一定数量的时候,关闭连接。所以我跟着这个参数往下找。找到了这个方法:
com.alibaba.druid.pool.DruidDataSource#discardConnection(com.alibaba.druid.pool.DruidConnectionHolder)
4、虽然 discardConnection 不能覆盖全部的 Connection#close,但是我觉得老师问的断链重连,应该就是 discardConnection 这个地方。反向查找出所有调用 discardConnection 的地方,搜索结果如下:
com.alibaba.druid.pool.DruidDataSource#getConnectionDirect 这个地方调用了三次。
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder); // 传入null,避免重复关闭
continue;
}
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
}
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
复制代码
com.alibaba.druid.pool.DruidDataSource#handleFatalError 调用了一次。
if (requireDiscard) {
if (holder.statementTrace != null) {
holder.lock.lock();
try {
for (Statement stmt : holder.statementTrace) {
JdbcUtils.close(stmt);
}
} finally {
holder.lock.unlock();
}
}
this.discardConnection(holder);
}
复制代码
com.alibaba.druid.pool.DruidDataSource#recycle 调用的地方比较多,总共有四个地方。
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
if (holder == null) {
LOG.warn("connectionHolder is null");
return;
}
if (logDifferentThread //
&& (!isAsyncCloseConnectionEnable()) //
&& pooledConnection.ownerThread != Thread.currentThread()//
) {
LOG.warn("get/close not same thread");
}
final Connection physicalConnection = holder.conn;
if (pooledConnection.traceEnable) {
Object oldInfo = null;
activeConnectionLock.lock();
try {
if (pooledConnection.traceEnable) {
oldInfo = activeConnections.remove(pooledConnection);
pooledConnection.traceEnable = false;
}
} finally {
activeConnectionLock.unlock();
}
if (oldInfo == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
}
}
}
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// check need to rollback?
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
// reset holder, restore default settings, clear warnings
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
holder.reset();
}
if (holder.discard) {
return;
}
if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
discardConnection(holder);
return;
}
if (physicalConnection.isClosed()) {
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);
destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
if (holder.initSchema != null) {
holder.conn.setSchema(holder.initSchema);
holder.initSchema = null;
}
if (!enable) {
discardConnection(holder);
return;
}
boolean result;
final long currentTimeMillis = System.currentTimeMillis();
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
discardConnection(holder);
return;
}
}
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
holder.clearStatementCache();
if (!holder.discard) {
discardConnection(holder);
holder.discard = true;
}
LOG.error("recyle error", e);
recycleErrorCountUpdater.incrementAndGet(this);
}
}
复制代码
下面分别看下调用的这几个地方,代表什么逻辑。
1、getConnectionDirect 的时候。
通过翻代码,获取连接的时候调用的三个 discardConnection 地方,分别是:
从连接池获取连接,如果 testOnBorrow 开启,借取连接的时候测试连接是否有效,如果连接无效的话,关闭连接重连。
获取连接时,直接根据连接的属性判断,如果连接是一个关闭的连接,走 discardConnection 逻辑。
如果开启了 testWhileIdle,获取连接时,如果连接时个 idle 时间比较长的连接,也会进行测试,如果连接无效的话,关闭连接重连。
2、handleFatalError 这个方法,看名字,很像连接报错了,大体逻辑如下:
3、recycle 归还连接的时候,
phyMaxUseCount 也就是连接使用次数超了,释放连接
enable 为 false 的时候,释放连接,看了下代码 enable 为 false 一般是连接池要关了
连接的时长超过了 phyTimeoutMillis 时长,释放连接
归还失败(报错,回收时发现数量超了)的时候,释放连接
综上所述,druid 代码断线重连的地方大概是这些。
评论