写点什么

Alibaba Druid 源码阅读(四) 数据库连接池中连接获取探索

作者:
  • 2021 年 11 月 12 日
  • 本文字数:7124 字

    阅读完需:约 23 分钟

简介

上文中分析了数据库连接池的初始化部分,接下来我们来看看获取连接部分的代码

数据库连接池中连接获取

下面是相关的代码,我在其中加了自己的注释:


    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {        // 上文分析过的初始化        init();
// 由于没有配置 filter,直接走:getConnectionDirect if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { // 获取连接,具体后面再看 poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; }
// 下面看意思是做了一些连接有效性的检查 // 无效的就跳过这次,重新去获取一次 if (testOnBorrow) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); }
discardConnection(poolableConnection.holder); continue; } } else { if (poolableConnection.conn.isClosed()) { discardConnection(poolableConnection.holder); // 传入null,避免重复关闭 continue; }
// 这段感觉有段阻塞超时等待的感觉,留待确认 if (testWhileIdle) { final DruidConnectionHolder holder = poolableConnection.holder; long currentTimeMillis = System.currentTimeMillis(); long lastActiveTimeMillis = holder.lastActiveTimeMillis; long lastExecTimeMillis = holder.lastExecTimeMillis; long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime && lastExecTimeMillis != lastActiveTimeMillis) { lastActiveTimeMillis = lastExecTimeMillis; }
if (lastKeepTimeMillis > lastActiveTimeMillis) { lastActiveTimeMillis = lastKeepTimeMillis; }
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) { timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; }
if (idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0 // unexcepted branch ) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); }
discardConnection(poolableConnection.holder); continue; } } } }
// 最后的收尾,标识位设置之类的 if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true;
activeConnectionLock.lock(); try { activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } }
if (!this.defaultAutoCommit) { poolableConnection.setAutoCommit(false); }
return poolableConnection; } }
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { // 下面这两段看着是数据库关闭或者不可用触发 // 如果数据库恢复重启,是那段代码去重置这些标识位的,后面去看下 if (closed) { connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); }
if (!enable) { connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) { throw disableException; }
throw new DataSourceDisableException(); }
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false;;) { // 直接生成新的数据库连接 if (createDirect) { createStartNanosUpdater.set(this, System.nanoTime()); if (creatingCountUpdater.compareAndSet(this, 0, 1)) { PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) { LOG.debug("conn-direct_create "); }
// 下面检测了现有活跃的连接数与配置的最大获取数 // 如果小于则统计增加 // 其他则,有close操作,感觉不应该是关闭当前连接,可能是其他一些可关闭的连接,具体的后面再看看 boolean discard = false; lock.lock(); try { if (activeCount < maxActive) { activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } break; } else { discard = true; } } finally { lock.unlock(); }
if (discard) { JdbcUtils.close(pyConnInfo.getPhysicalConnection()); } } }
try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); }
try { // 一些最大等待线程限制? // 下面这两段if没太能理解其含义...... if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); }
if (onFatalError && onFatalErrorMaxActive > 0 && activeCount >= onFatalErrorMaxActive) { connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder(); errorMsg.append("onFatalError, activeCount ") .append(activeCount) .append(", onFatalErrorMaxActive ") .append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) { errorMsg.append(", time '") .append(StringUtils.formatDateTime19( lastFatalErrorTimeMillis, TimeZone.getDefault())) .append("'"); }
if (lastFatalErrorSql != null) { errorMsg.append(", sql \n") .append(lastFatalErrorSql); }
throw new SQLException( errorMsg.toString(), lastFatalError); }
connectCount++;
// 这一点检测后就跳到了直接生成新的物理连接 if (createScheduler != null && poolingCount == 0 && activeCount < maxActive && creatingCountUpdater.get(this) == 0 && createScheduler instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; if (executor.getQueue().size() > 0) { createDirect = true; continue; } }
// 下面的都是从connections里面去去连接 // 但让人疑惑的是在createDirect和其相关的代码中,没有看到放入connection的相关操作 // 后面再确认下这块的获取逻辑 if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); }
// 一些统计设置 if (holder != null) { if (holder.discard) { continue; }
activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCountUpdater.incrementAndGet(this); throw e; } finally { lock.unlock(); }
break; }
// 有点像阻塞等待打印一些信息 if (holder == null) { long waitNanos = waitNanosLocal.get();
final long activeCount; final long maxActive; final long creatingCount; final long createStartNanos; final long createErrorCount; final Throwable createError; try { lock.lock(); activeCount = this.activeCount; maxActive = this.maxActive; creatingCount = this.creatingCount; createStartNanos = this.createStartNanos; createErrorCount = this.createErrorCount; createError = this.createError; } finally { lock.unlock(); }
StringBuilder buf = new StringBuilder(128); buf.append("wait millis ")// .append(waitNanos / (1000 * 1000))// .append(", active ").append(activeCount)// .append(", maxActive ").append(maxActive)// .append(", creating ").append(creatingCount)// ; if (creatingCount > 0 && createStartNanos > 0) { long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000); if (createElapseMillis > 0) { buf.append(", createElapseMillis ").append(createElapseMillis); } }
if (createErrorCount > 0) { buf.append(", createErrorCount ").append(createErrorCount); }
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList(); for (int i = 0; i < sqlList.size(); ++i) { if (i != 0) { buf.append('\n'); } else { buf.append(", "); } JdbcSqlStatValue sql = sqlList.get(i); buf.append("runningSqlCount ").append(sql.getRunningCount()); buf.append(" : "); buf.append(sql.getSql()); }
String errorMessage = buf.toString();
if (createError != null) { throw new GetConnectionTimeoutException(errorMessage, createError); } else { throw new GetConnectionTimeoutException(errorMessage); } }
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection; }
复制代码

总结

结合以前的阅读,我们得到:


  • 1.初始化的时候,设置了配置的初始化数量的连接

  • 2.获取时从一个数据库连接池中获取

  • 3.没有空闲的连接时,会再次生产新的数据库物理连接


但我们目前还没有明确找到"生产新的物理数据库连接并放入连接池"的操作,也不清楚当前连接数大于配置的最大连接数时的具体处理方式。


后面还需要进行探索

发布于: 2 小时前阅读数: 4
用户头像

关注

还未添加个人签名 2018.09.09 加入

代码是门手艺活,也是门艺术活

评论

发布
暂无评论
Alibaba Druid 源码阅读(四) 数据库连接池中连接获取探索