Alibaba Druid 源码阅读(四) 数据库连接池中连接获取探索
 作者:萧
- 2021 年 11 月 12 日
- 本文字数:7124 字 - 阅读完需:约 23 分钟 
简介
上文中分析了数据库连接池的初始化部分,接下来我们来看看获取连接部分的代码
数据库连接池中连接获取
下面的相关的代码,在代码中加了自己的相关注释:
    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {        // 上文分析过的初始化        init();
        // 由于没有配置 filter,直接走:getConnectionDirect        if (filters.size() > 0) {            FilterChainImpl filterChain = new FilterChainImpl(this);            return filterChain.dataSource_connect(this, maxWaitMillis);        } else {            return getConnectionDirect(maxWaitMillis);        }    }
    public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {        int notFullTimeoutRetryCnt = 0;        for (;;) {            // handle notFullTimeoutRetry            DruidPooledConnection poolableConnection;            try {                // 获取连接,具体后面再看                poolableConnection = getConnectionInternal(maxWaitMillis);            } catch (GetConnectionTimeoutException ex) {                if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {                    notFullTimeoutRetryCnt++;                    if (LOG.isWarnEnabled()) {                        LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);                    }                    continue;                }                throw ex;            }
            // 下面看意思是做了一些连接有效性的检查            // 无效的就跳过这次,重新去获取一次            if (testOnBorrow) {                boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);                if (!validate) {                    if (LOG.isDebugEnabled()) {                        LOG.debug("skip not validate connection.");                    }
                    discardConnection(poolableConnection.holder);                    continue;                }            } else {                if (poolableConnection.conn.isClosed()) {                    discardConnection(poolableConnection.holder); // 传入null,避免重复关闭                    continue;                }
                // 这段感觉有段阻塞超时等待的感觉,留待确认                if (testWhileIdle) {                    final DruidConnectionHolder holder = poolableConnection.holder;                    long currentTimeMillis             = System.currentTimeMillis();                    long lastActiveTimeMillis          = holder.lastActiveTimeMillis;                    long lastExecTimeMillis            = holder.lastExecTimeMillis;                    long lastKeepTimeMillis            = holder.lastKeepTimeMillis;
                    if (checkExecuteTime                            && lastExecTimeMillis != lastActiveTimeMillis) {                        lastActiveTimeMillis = lastExecTimeMillis;                    }
                    if (lastKeepTimeMillis > lastActiveTimeMillis) {                        lastActiveTimeMillis = lastKeepTimeMillis;                    }
                    long idleMillis                    = currentTimeMillis - lastActiveTimeMillis;
                    long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
                    if (timeBetweenEvictionRunsMillis <= 0) {                        timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;                    }
                    if (idleMillis >= timeBetweenEvictionRunsMillis                            || idleMillis < 0 // unexcepted branch                            ) {                        boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);                        if (!validate) {                            if (LOG.isDebugEnabled()) {                                LOG.debug("skip not validate connection.");                            }
                            discardConnection(poolableConnection.holder);                             continue;                        }                    }                }            }
            // 最后的收尾,标识位设置之类的            if (removeAbandoned) {                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();                poolableConnection.connectStackTrace = stackTrace;                poolableConnection.setConnectedTimeNano();                poolableConnection.traceEnable = true;
                activeConnectionLock.lock();                try {                    activeConnections.put(poolableConnection, PRESENT);                } finally {                    activeConnectionLock.unlock();                }            }
            if (!this.defaultAutoCommit) {                poolableConnection.setAutoCommit(false);            }
            return poolableConnection;        }    }
    private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {        // 下面这两段看着是数据库关闭或者不可用触发        // 如果数据库恢复重启,是那段代码去重置这些标识位的,后面去看下        if (closed) {            connectErrorCountUpdater.incrementAndGet(this);            throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));        }
        if (!enable) {            connectErrorCountUpdater.incrementAndGet(this);
            if (disableException != null) {                throw disableException;            }
            throw new DataSourceDisableException();        }
        final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);        final int maxWaitThreadCount = this.maxWaitThreadCount;
        DruidConnectionHolder holder;
        for (boolean createDirect = false;;) {            // 直接生成新的数据库连接            if (createDirect) {                createStartNanosUpdater.set(this, System.nanoTime());                if (creatingCountUpdater.compareAndSet(this, 0, 1)) {                    PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();                    holder = new DruidConnectionHolder(this, pyConnInfo);                    holder.lastActiveTimeMillis = System.currentTimeMillis();
                    creatingCountUpdater.decrementAndGet(this);                    directCreateCountUpdater.incrementAndGet(this);
                    if (LOG.isDebugEnabled()) {                        LOG.debug("conn-direct_create ");                    }
                    // 下面检测了现有活跃的连接数与配置的最大获取数                    // 如果小于则统计增加                    // 其他则,有close操作,感觉不应该是关闭当前连接,可能是其他一些可关闭的连接,具体的后面再看看                    boolean discard = false;                    lock.lock();                    try {                        if (activeCount < maxActive) {                            activeCount++;                            holder.active = true;                            if (activeCount > activePeak) {                                activePeak = activeCount;                                activePeakTime = System.currentTimeMillis();                            }                            break;                        } else {                            discard = true;                        }                    } finally {                        lock.unlock();                    }
                    if (discard) {                        JdbcUtils.close(pyConnInfo.getPhysicalConnection());                    }                }            }
            try {                lock.lockInterruptibly();            } catch (InterruptedException e) {                connectErrorCountUpdater.incrementAndGet(this);                throw new SQLException("interrupt", e);            }
            try {                // 一些最大等待线程限制?                // 下面这两段if没太能理解其含义......                if (maxWaitThreadCount > 0                        && notEmptyWaitThreadCount >= maxWaitThreadCount) {                    connectErrorCountUpdater.incrementAndGet(this);                    throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "                            + lock.getQueueLength());                }
                if (onFatalError                        && onFatalErrorMaxActive > 0                        && activeCount >= onFatalErrorMaxActive) {                    connectErrorCountUpdater.incrementAndGet(this);
                    StringBuilder errorMsg = new StringBuilder();                    errorMsg.append("onFatalError, activeCount ")                            .append(activeCount)                            .append(", onFatalErrorMaxActive ")                            .append(onFatalErrorMaxActive);
                    if (lastFatalErrorTimeMillis > 0) {                        errorMsg.append(", time '")                                .append(StringUtils.formatDateTime19(                                        lastFatalErrorTimeMillis, TimeZone.getDefault()))                                .append("'");                    }
                    if (lastFatalErrorSql != null) {                        errorMsg.append(", sql \n")                                .append(lastFatalErrorSql);                    }
                    throw new SQLException(                            errorMsg.toString(), lastFatalError);                }
                connectCount++;
                // 这一点检测后就跳到了直接生成新的物理连接                if (createScheduler != null                        && poolingCount == 0                        && activeCount < maxActive                        && creatingCountUpdater.get(this) == 0                        && createScheduler instanceof ScheduledThreadPoolExecutor) {                    ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;                    if (executor.getQueue().size() > 0) {                        createDirect = true;                        continue;                    }                }
                // 下面的都是从connections里面去去连接                // 但让人疑惑的是在createDirect和其相关的代码中,没有看到放入connection的相关操作                // 后面再确认下这块的获取逻辑                if (maxWait > 0) {                    holder = pollLast(nanos);                } else {                    holder = takeLast();                }
                // 一些统计设置                if (holder != null) {                    if (holder.discard) {                        continue;                    }
                    activeCount++;                    holder.active = true;                    if (activeCount > activePeak) {                        activePeak = activeCount;                        activePeakTime = System.currentTimeMillis();                    }                }            } catch (InterruptedException e) {                connectErrorCountUpdater.incrementAndGet(this);                throw new SQLException(e.getMessage(), e);            } catch (SQLException e) {                connectErrorCountUpdater.incrementAndGet(this);                throw e;            } finally {                lock.unlock();            }
            break;        }
        // 有点像阻塞等待打印一些信息        if (holder == null) {            long waitNanos = waitNanosLocal.get();
            final long activeCount;            final long maxActive;            final long creatingCount;            final long createStartNanos;            final long createErrorCount;            final Throwable createError;            try {                lock.lock();                activeCount = this.activeCount;                maxActive = this.maxActive;                creatingCount = this.creatingCount;                createStartNanos = this.createStartNanos;                createErrorCount = this.createErrorCount;                createError = this.createError;            } finally {                lock.unlock();            }
            StringBuilder buf = new StringBuilder(128);            buf.append("wait millis ")//               .append(waitNanos / (1000 * 1000))//               .append(", active ").append(activeCount)//               .append(", maxActive ").append(maxActive)//               .append(", creating ").append(creatingCount)//            ;            if (creatingCount > 0 && createStartNanos > 0) {                long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);                if (createElapseMillis > 0) {                    buf.append(", createElapseMillis ").append(createElapseMillis);                }            }
            if (createErrorCount > 0) {                buf.append(", createErrorCount ").append(createErrorCount);            }
            List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();            for (int i = 0; i < sqlList.size(); ++i) {                if (i != 0) {                    buf.append('\n');                } else {                    buf.append(", ");                }                JdbcSqlStatValue sql = sqlList.get(i);                buf.append("runningSqlCount ").append(sql.getRunningCount());                buf.append(" : ");                buf.append(sql.getSql());            }
            String errorMessage = buf.toString();
            if (createError != null) {                throw new GetConnectionTimeoutException(errorMessage, createError);            } else {                throw new GetConnectionTimeoutException(errorMessage);            }        }
        holder.incrementUseCount();
        DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);        return poolalbeConnection;    }
复制代码
 总结
结合以前的阅读,我们得到:
- 1.初始化的时候,设置了配置的初始化数量的连接 
- 2.获取时从一个数据库连接池中获取 
- 3.没有空闲的连接时,会再次生产新的数据库物理连接 
但我们目前没有明确找到生产新的物理数据库连接放入数据的操作,还有如果当前连接数大于配置的最大连接数时具体处理
后面还需要进行探索
划线
评论
复制
发布于: 2 小时前阅读数: 4
版权声明: 本文为 InfoQ 作者【萧】的原创文章。
原文链接:【http://xie.infoq.cn/article/b1dd35829bb6b7c6bd1041fe5】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。

萧
关注
还未添加个人签名 2018.09.09 加入
代码是门手艺活,也是门艺术活











 
    
评论