Druid 连接池源码阅读 04
作者:石小天
- 2022 年 5 月 14 日
本文字数:4305 字
阅读完需:约 14 分钟
getConnectionInternal 方法
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
// createDirect只是针对createScheduler(创建连接的线程池)进行处理
for (boolean createDirect = false;;) {
if (createDirect) {
createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {
LOG.debug("conn-direct_create ");
}
boolean discard = false;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
// 如果等待创建连接的线程数如果大于maxWaitThreadCount,抛出异常,这个notEmptyWaitThreadCount是在pollLast(nanos)和takeLast()中设置
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}
if (lastFatalErrorSql != null) {
errorMsg.append(", sql \n")
.append(lastFatalErrorSql);
}
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
connectCount++;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
if (holder == null) {
long waitNanos = waitNanosLocal.get();
final long activeCount;
final long maxActive;
final long creatingCount;
final long createStartNanos;
final long createErrorCount;
final Throwable createError;
try {
lock.lock();
activeCount = this.activeCount;
maxActive = this.maxActive;
creatingCount = this.creatingCount;
createStartNanos = this.createStartNanos;
createErrorCount = this.createErrorCount;
createError = this.createError;
} finally {
lock.unlock();
}
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active ").append(activeCount)//
.append(", maxActive ").append(maxActive)//
.append(", creating ").append(creatingCount)//
;
if (creatingCount > 0 && createStartNanos > 0) {
long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {
buf.append(", createElapseMillis ").append(createElapseMillis);
}
}
if (createErrorCount > 0) {
buf.append(", createErrorCount ").append(createErrorCount);
}
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) {
buf.append('\n');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
复制代码
主要是控制了创建连接的线程数量,以及处理异常,相关阅读的地方加了注释。方法的主要的逻辑由 pollLast(nanos)或者 takeLast()完成,后续分析。
划线
评论
复制
发布于: 刚刚阅读数: 3
石小天
关注
还未添加个人签名 2018.11.07 加入
还未添加个人简介
评论