写点什么

面试官问我:看过 sharding-jdbc 的源码吗?我吧啦吧啦说了一通!!

用户头像
冰河
关注
发布于: 2020 年 09 月 10 日
面试官问我:看过sharding-jdbc的源码吗?我吧啦吧啦说了一通!!

写在前面


在产品初期快速迭代的过程中,往往为了快速上线而占据市场,在后端开发的过程中往往不会过多的考虑分布式和微服务,往往会将后端服务做成一个单体应用,而数据库也是一样,最初会把所有的业务数据都放到一个数据库中,即所谓的单实例数据库。随着业务的迅速发展,将所有数据都放在一个数据库中已经不足以支撑业务发展的需要。此时,就会对系统进行分布式改造,而数据库业务进行分库分表的拆分。那么,问题来了,如何更好的访问和管理拆分后的数据库呢?业界已经有很多成熟的解决方案,其中,一个非常优秀的解决方案就是:Apache ShardingSphere。今天,我们就从源码级别来共同探讨下 sharding-jdbc 的核心源码。


sharding-jdbc 经典用法


Sharding-Jdbc 是一个轻量级的分库分表框架,使用时最关键的是配制分库分表策略,其余的和使用普通的 MySQL 驱动一样,几乎不用改代码。例如下面的代码片段。


try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(    createDataSourceMap(), shardingRuleConfig, new Properties()) {    Connection connection = dataSource.getConnection();    ...}
复制代码


我们在程序中拿到 Connection 对象后,就可以像使用普通的 JDBC 一样来使用 sharding-jdbc 操作数据库了。


sharding-jdbc 包结构


sharding-jdbc      ├── sharding-jdbc-core      重写DataSource/Connection/Statement/ResultSet四大对象    └── sharding-jdbc-orchestration        配置中心sharding-core    ├── sharding-core-api       接口和配置类	    ├── sharding-core-common    通用分片策略实现...    ├── sharding-core-entry     SQL解析、路由、改写,核心类BaseShardingEngine    ├── sharding-core-route     SQL路由,核心类StatementRoutingEngine    ├── sharding-core-rewrite   SQL改写,核心类ShardingSQLRewriteEngine    ├── sharding-core-execute   SQL执行,核心类ShardingExecuteEngine    └── sharding-core-merge     结果合并,核心类MergeEngineshardingsphere-sql-parser     ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化SQLParser    ├── shardingsphere-sql-parser-engine    SQL解析,核心类SQLParseEngine    ├── shardingsphere-sql-parser-relation    └── shardingsphere-sql-parser-mysql     MySQL解析器,核心类MySQLParserEntry和MySQLParsershardingsphere-underlying           基础接口和api    ├── shardingsphere-rewrite      SQLRewriteEngine接口    ├── shardingsphere-execute      QueryResult查询结果    └── shardingsphere-merge        MergeEngine接口shardingsphere-spi                  SPI加载工具类sharding-transaction    ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加载		    ├── sharding-transaction-2pc    实现类XAShardingTransactionManager    └── sharding-transaction-base   实现类SeataATShardingTransactionManager
复制代码


sharding-jdbc 中的四大对象


所有的一切都从 ShardingDataSourceFactory 开始的,创建了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。


public static DataSource createDataSource(        final Map<String, DataSource> dataSourceMap,        final ShardingRuleConfiguration shardingRuleConfig,        final Properties props) throws SQLException {    return new ShardingDataSource(dataSourceMap,               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);}
复制代码


说明: 本文主要以 ShardingDataSource 为切入点分析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。


DataSource


这里,涉及到两个比较重要的接口,一个是 DataSource,一个是 Connection。我们首先来看下它们的类图。


  • DataSource


  • Connection


DataSource 和 Connection 都比较简单,没有处理过多的逻辑,只是 dataSourceMap, shardingRule 进行简单的封装。


ShardingDataSource 持有对数据源和分片规则,可以通过 getConnection 方法获取 ShardingConnection 连接。


private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(                dataSourceMap, shardingRule, props, getDatabaseType());@Overridepublic final ShardingConnection getConnection() {    return new ShardingConnection(getDataSourceMap(), runtimeContext,            TransactionTypeHolder.get());}
复制代码


Connection


ShardingConnection 可以创建 Statement 和 PrepareStatement 两种运行方式,如下代码所示。


@Overridepublic Statement createStatement(final int resultSetType,        final int resultSetConcurrency, final int resultSetHoldability) {    return new ShardingStatement(this, resultSetType,            resultSetConcurrency, resultSetHoldability);}
@Overridepublic PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);}
复制代码


说明: ShardingConnection 主要是将创建 ShardingStatement 和 ShardingPreparedStatement 两个对象,主要的执行逻辑都在 Statement 对象中。另外,ShardingConnection 还有两个重要的功能,一个是获取真正的数据库连接,一个是事务提交功能。


Statement


Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。我们来看下 Statement 的类图



对于 Statement,我就不做过对的描述了,相信使用过 JDBC 的小伙伴,对 Statement 都不陌生了。


ResultSet


ResultSet 类图如下所示。



我们从源码中可以看出:ShardingResultSet 只是对 MergedResult 的简单封装。


private final MergedResult mergeResultSet;@Overridepublic boolean next() throws SQLException {    return mergeResultSet.next();}
复制代码


sharding-jdbc-core 核心分析


ShardingStatement 内部有三个核心的类,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最后调用 MergeEngine 对结果进行合并处理。


ShardingStatement


初始化


private final ShardingConnection connection;private final StatementExecutor statementExecutor;
public ShardingStatement(final ShardingConnection connection) { this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);}
public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);}
复制代码


ShardingStatement 内部执行 SQL 委托给了 statementExecutor。


执行


(1)executeQuery 执行过程


@Overridepublic ResultSet executeQuery(final String sql) throws SQLException {    ResultSet result;    try {        clearPrevious();        // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult        shard(sql);        // 2. 生成执行计划 SQLRouteResult -> StatementExecuteUnit        initStatementExecutor();        // 3. statementExecutor.executeQuery() 执行任务        MergeEngine mergeEngine = MergeEngineFactory.newInstance(                connection.getRuntimeContext().getDatabaseType(),                connection.getRuntimeContext().getRule(), sqlRouteResult,                connection.getRuntimeContext().getMetaData().getRelationMetas(),                statementExecutor.executeQuery());        // 4. 结果合并        result = getResultSet(mergeEngine);    } finally {        currentResultSet = null;    }    currentResultSet = result;    return result;}
复制代码


(2)SQL 路由(包括 SQL 解析、路由、改写)


private SQLRouteResult sqlRouteResult;private void shard(final String sql) {    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(            runtimeContext.getRule(), runtimeContext.getProps(),            runtimeContext.getMetaData(), runtimeContext.getParseEngine());    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());}
复制代码


SimpleQueryShardingEngine 进行 SQL 路由(包括 SQL 解析、路由、改写),生成 SQLRouteResult,当 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的执行任务就全部交给 StatementExecutor 完成。


StatementExecutor


StatementExecutor 内部封装了 SQL 任务的执行过程,包括:SqlExecutePrepareTemplate 类生成执行计划 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。


类结构


重要属性


AbstractStatementExecutor 类中重要的属性:


// SQLExecutePrepareTemplate用于生成执行计划StatementExecuteUnitprivate final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;// 保存生成的执行计划StatementExecuteUnitprivate final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =            new LinkedList<>();
// SQLExecuteTemplate用于执行StatementExecuteUnitprivate final SQLExecuteTemplate sqlExecuteTemplate;// 保存查询结果private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
复制代码


生成执行计划


// 执行前清理状态private void clearPrevious() throws SQLException {    statementExecutor.clear();}// 执行时初始化private void initStatementExecutor() throws SQLException {    statementExecutor.init(sqlRouteResult);    replayMethodForStatements();}
复制代码


这里,需要注意的是: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 重新初始化。


statementExecutor.init() 初始化主要是生成执行计划 StatementExecuteUnit。


public void init(final SQLRouteResult routeResult) throws SQLException {    setSqlStatementContext(routeResult.getSqlStatementContext());    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));    cacheStatements();}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups( final Collection<RouteUnit> routeUnits) throws SQLException { return getSqlExecutePrepareTemplate().getExecuteUnitGroups( routeUnits, new SQLExecutePrepareCallback() { // 获取连接 @Override public List<Connection> getConnections( final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return StatementExecutor.super.getConnection().getConnections( connectionMode, dataSourceName, connectionSize); }
// 生成执行计划RouteUnit -> StatementExecuteUnit @Override public StatementExecuteUnit createStatementExecuteUnit( final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit( routeUnit, connection.createStatement( getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()), connectionMode); } });}
复制代码


SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行计划,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在任务执行时我们会看到这个类。


任务执行


public List<QueryResult> executeQuery() throws SQLException {    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();    SQLExecuteCallback<QueryResult> executeCallback =         new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {        @Override        protected QueryResult executeSQL(final String sql, final Statement statement,                final ConnectionMode connectionMode) throws SQLException {            return getQueryResult(sql, statement, connectionMode);        }    };    // 执行StatementExecuteUnit    return executeCallback(executeCallback);}
// sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)protected final <T> List<T> executeCallback( final SQLExecuteCallback<T> executeCallback) throws SQLException { // 执行所有的任务 StatementExecuteUnit List<T> result = sqlExecuteTemplate.executeGroup( (Collection) executeGroups, executeCallback); refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext); return result;}
复制代码


SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 getQueryResult 方法。


private QueryResult getQueryResult(final String sql, final Statement statement,        final ConnectionMode connectionMode) throws SQLException {    ResultSet resultSet = statement.executeQuery(sql);    getResultSets().add(resultSet);    return ConnectionMode.MEMORY_STRICTLY == connectionMode            ? new StreamQueryResult(resultSet)            : new MemoryQueryResult(resultSet);}
复制代码


ConnectionMode 有两种模式:内存限制(MEMORYSTRICTLY)和连接限制(CONNECTIONSTRICTLY),如果一个连接执行多个 StatementExecuteUnit 则为内存限制(MEMORYSTRICTLY),采用流式处理,即 StreamQueryResult ,反之则为连接限制(CONNECTIONSTRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特别是在 Sharding-Proxy 中特别有用,避免将代理服务器撑爆。


重磅福利


关注「 冰河技术 」微信公众号,后台回复 “设计模式” 关键字领取《深入浅出 Java 23 种设计模式》PDF 文档。回复“Java8”关键字领取《Java8 新特性教程》PDF 文档。回复“限流”关键字获取《亿级流量下的分布式限流解决方案》PDF 文档,三本 PDF 均是由冰河原创并整理的超硬核教程,面试必备!!


<font color="#FF0000">好了,今天就聊到这儿吧!别忘了点个赞,给个在看和转发,让更多的人看到,一起学习,一起进步!!</font>


写在最后


如果你觉得冰河写的还不错,请微信搜索并关注「 冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者已经通过阅读「 冰河技术 」微信公众号文章,吊打面试官,成功跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样提升自己的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术 」微信公众号吧,每天更新超硬核技术干货,让你对如何提升技术能力不再迷茫!



发布于: 2020 年 09 月 10 日阅读数: 62
用户头像

冰河

关注

公众号:冰河技术 2020.05.29 加入

Mykit系列开源框架发起者、核心架构师和开发者,《海量数据处理与大数据技术实战》与《MySQL开发、优化与运维实战》作者。【冰河技术】微信公众号作者。

评论

发布
暂无评论
面试官问我:看过sharding-jdbc的源码吗?我吧啦吧啦说了一通!!