写点什么

Sharding-JDBC 源码解析与 vivo 的定制开发

  • 2024-03-07
    广东
  • 本文字数:37704 字

    阅读完需:约 124 分钟

作者:vivo IT 平台团队 - Xiong Huanxin


Sharding-JDBC 是在 JDBC 层提供服务的数据库中间件,在分库分表场景具有广泛应用。本文对 Sharding-JDBC 的解析、路由、改写、执行、归并五大核心引擎进行了源码解析,并结合业务实践经验,总结了使用 Sharding-JDBC 的一些痛点问题并分享了对应的定制开发与改造方案。

本文源码基于 Sharding-JDBC 4.1.1 版本。

一、业务背景

随着业务并发请求和数据规模的不断扩大,单节点库表压力往往会成为系统的性能瓶颈。公司 IT 内部营销库存、交易订单、财经台账、考勤记录等多领域的业务场景的日增数据量巨大,存在着数据库节点压力过大、连接过多、查询速度变慢等情况,根据数据来源、时间、工号等信息来将没有联系的数据尽量均分到不同的库表中,从而在不影响业务需求的前提下,减轻数据库节点压力,提升查询效率和系统稳定性

二、技术选型

我们对比了几款比较常见的支持分库分表和读写分离的中间件。


Sharding-JDBC 作为轻量化的增强版的 JDBC 框架,相较其他中间件性能更好,接入难度更低,其数据分片、读写分离功能也覆盖了我们的业务诉求,因此我们在业务中广泛使用了 Sharding-JDBC。但在使用 Sharding-JDBC 的过程中,我们也发现了诸多问题,为了业务更便捷的使用 Sharding-JDBC,我们对源码做了针对性的定制开发和组件封装来满足业务需求。

三、源码解析

3.1 引言

Sharding-JDBC 作为基于 JDBC 的数据库中间件,实现了 JDBC 的标准 api,Sharding-JDBC 与原生 JDBC 的执行对比流程如下图所示:


相关执行流程的代码样例如下:

  • JDBC 执行样例

//获取数据库连接try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {    String sql = "SELECT * FROM  t_user WHERE name = ?";    //预编译SQL    try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {        //参数设置与执行        preparedStatement.setString(1, "vivo");        preparedStatement.execute(sql);        //获取结果集        try (ResultSet resultSet = preparedStatement.getResultSet()) {            while (resultSet.next()) {                //处理结果            }        }    }}
复制代码
  • Sharding-JDBC 源码

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute    public boolean execute() throws SQLException {        try {
复制代码


从对比的执行流程图可见:

  • 【JDBC】:执行的主要流程是通过 Datasource 获取 Connection,再注入 SQL 语句生成 PreparedStatement 对象,PreparedStatement 设置占位符参数执行后得到结果集 ResultSet。

  • 【Sharding-JDBC】:主要流程基本一致,但 Sharding 基于 PreparedStatement 进行了实现与扩展,具体实现类 ShardingPreparedStatement 中会抽象出解析、路由、重写、归并等引擎,从而实现分库分表、读写分离等能力,每个引擎的作用说明如下表所示:

//*相关引擎的源码解析在下文会作更深入的阐述

3.2 解析引擎

3.2.1 引擎解析

解析引擎是 Sharding-JDBC 进行分库分表逻辑的基础,其作用是将 SQL 拆解为不可再分的原子符号(称为 token),再根据数据库类型将这些 token 分类成关键字、表达式、操作符、字面量等不同类型,进而生成抽象语法树,而语法树是后续进行路由、改写操作的前提(这也正是语法树的存在使得 Sharding-JDBC 存在各式各样的语法限制的原因之一)。

▲图片来源:ShardingSphere 官方文档


4.x 的版本采用 ANTLR(ANother Tool for Language Recognition)作为解析引擎,在 ShardingSphere-sql-parser-dialect 模块中定义了适用于不同数据库语法的解析规则(.g4 文件),idea 中也可以下载 ANTLR v4 的插件,输入 SQL 查看解析后的语法树结果。


解析方法的入口在 DataNodeRouter 的 createRouteContext 方法中,解析引擎根据数据库类型和 SQL 创建 SQLParserExecutor 执行得到解析树,再通过 ParseTreeVisitor()的 visit 方法,对解析树进行处理得到 SQLStatement。ANTLR 支持 listener 和 visitor 两种模式的接口,visitor 方式可以更灵活的控制解析树的遍历过程,更适用于 SQL 解析的场景。

  • 解析引擎核心代码

org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext#96    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {        //解析引擎解析SQL
复制代码

SQLStatement 实际上是一个接口,其实现对应着不同的 SQL 类型,如 SelectStatement 类中就包括查询的字段、表名、where 条件、分组、排序、分页、lock 等变量,可以看到这里并没有对 having 这种字段做定义,相当于 Sharding-JDBC 无法识别到 SQL 中的 having,这使得 Sharding-JDBC 对 having 语法有一定的限制。

  • SelectStatement

public final class SelectStatement extends DMLStatement {    // 字段    private ProjectionsSegment projections;    // 表    private final Collection<TableReferenceSegment> tableReferences = new LinkedList<>();    // where    private WhereSegment where;    // groupBy    private GroupBySegment groupBy;    // orderBy    private OrderBySegment orderBy;    // limit    private LimitSegment limit;    // 父statement    private SelectStatement parentStatement;    // lock    private LockSegment lock;}
复制代码

SQLStatement 还会被进一步转换成 SQLStatementContext,如 SelectStatement 会被转换成 SelectStatementContext ,其结构与 SelectStatement 类似不再多说,值得注意的是虽然这里定义了 containsSubquery 来判断是否包含子查询,但 4.1.1 源码永远是返回的 false,与 having 类似,这意味着 Sharding-JDBC 不会对子查询语句做特殊处理。


  • SelectStatementContext

public final class SelectStatementContext extends CommonSQLStatementContext<SelectStatement> implements TableAvailable, WhereAvailable {         private final TablesContext tablesContext;         private final ProjectionsContext projectionsContext;         private final GroupByContext groupByContext;         private final OrderByContext orderByContext;         private final PaginationContext paginationContext;         private final boolean containsSubquery;}     private boolean containsSubquery() {        // FIXME process subquery//        Collection<SubqueryPredicateSegment> subqueryPredicateSegments = getSqlStatement().findSQLSegments(SubqueryPredicateSegment.class);//        for (SubqueryPredicateSegment each : subqueryPredicateSegments) {//            if (!each.getAndPredicates().isEmpty()) {//                return true;//            }//        }        return false;    }
复制代码

3.2.2 引擎总结

解析引擎是进行路由改写的前提基础,其作用就是将 SQL 按照定义的语法规则拆分成原子符号(token),生成语法树,根据不同的 SQL 类型生成对应的 SQLStatement,SQLStatement 由各自的 Segment 组成,所有的 Segment 都包含 startIndex 和 endIndex 来定位 token 在 SQL 中所属的位置,但解析语法难以涵盖所有的 SQL 场景,使得部分 SQL 无法按照预期的结果路由执行。

3.3 路由引擎

3.3.1 引擎解析

路由引擎是 Sharding-JDBC 的核心步骤,作用是根据定义的分库分表规则将解析引擎生成的 SQL 上下文生成对应的路由结果,RouteResult 包括 DataNode 和 RouteUnit,DataNode 是实际的数据源节点,包括数据源名称和实际的物理表名,RouteUnit 则记录了逻辑表/库与物理表/库的映射关系,后面的改写引擎也是根据这个映射关系来决定如何替换 SQL 中的逻辑表(实际上 RouteResult 就是维护了一条 SQL 需要往哪些库哪些表执行的关系)。

  • RouteResult

public final class RouteResult {         private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();         private final Collection<RouteUnit> routeUnits = new LinkedHashSet<>();} public final class DataNode {         private static final String DELIMITER = ".";         private final String dataSourceName;         private final String tableName;} public final class RouteUnit {         private final RouteMapper dataSourceMapper;         private final Collection<RouteMapper> tableMappers;} public final class RouteMapper {         private final String logicName;         private final String actualName;}
复制代码


其中,路由有分为分片路由主从路由,两者可以单独使用,也可以组合使用。


  • 分片路由

ShardingRouteDecorator 的 decorate 方法是路由引擎的核心逻辑,经过 SQL 校验->生成分片条件->合并分片值后得到路由结果。

  • 分片路由 decorate 方法

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate#57public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {        SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();        List<Object> parameters = routeContext.getParameters();        //SQL校验  校验INSERT INTO .... ON DUPLICATE KEY UPDATE 和UPDATE语句中是否存在分片键      ShardingStatementValidatorFactory.newInstance(                sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));        //生成分片条件        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);        //合并分片值        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {            checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);            mergeShardingConditions(shardingConditions);        }        ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);        //得到路由结果        RouteResult routeResult = shardingRouteEngine.route(shardingRule);        if (needMergeShardingValues) {            Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");        }        return new RouteContext(sqlStatementContext, parameters, routeResult);    }
复制代码

ShardingStatementValidator 有 ShardingInsertStatementValidator 和 ShardingUpdateStatementValidator 两种实现,INSERT INTO .... ON DUPLICATE KEY UPDATE 和 UPDATE 语法都会涉及到字段值的更新,Sharding-JDBC 是不允许更新分片值的,毕竟修改分片值还需要将数据迁移至新分片值对应的库表中,才能保证数据分片规则一致。两者的校验细节也有所不同:

  • INSERT INTO .... ON DUPLICATE KEY UPDATE 仅仅是对 UPDATE 字段的校验, ON DUPLICATE KEY UPDATE 中包含分片键就会报错;

  • 而 UPDATE 语句则会额外校验 WHERE 条件中分片键的原始值和 SET 的值是否一样,不一样则会抛出异常。


ShardingCondition 中只有一个变量 routeValues,RouteValue 是一个接口,有 ListRouteValue 和 RangeRouteValue 两种实现,前者记录了分片键的 in 或=条件的分片值,后者则记录了范围查询的分片值,两者被封装为 ShardingValue 对象后,将会透传至分片算法中计算得到分片结果集。

  • ShardingCondition

public final class ShardingConditions {         private final List<ShardingCondition> conditions;} public class ShardingCondition {         private final List<RouteValue> routeValues = new LinkedList<>();}  public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {         private final String columnName;         private final String tableName;    //in或=条件对应的值    private final Collection<T> values;         @Override    public String toString() {        return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")");    }} public final class RangeRouteValue<T extends Comparable<?>> implements RouteValue {         private final String columnName;         private final String tableName;    //between and 大于小于等范围值的上下限    private final Range<T> valueRange;}
复制代码

生成分片条件后还会合并分片条件,但是前文提过在 SelectStatementContext 中的 containsSubquery 永远是 false,所以这段逻辑永远返回 false,即不会合并分片条件。

  • 判断是否需要合并分片条件

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#isNeedMergeShardingValues#87private boolean isNeedMergeShardingValues(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {        return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsSubquery()                && !shardingRule.getShardingLogicTableNames(sqlStatementContext.getTablesContext().getTableNames()).isEmpty();    }
复制代码

然后就是通过分片路由引擎调用分片算法计算路由结果了,ShardingRouteEngine 实现较多,介绍起来篇幅较多,这里就不展开说明了,可以参考官方文档来了解路由引擎的选择规则

▲图片来源:ShardingSphere 官方文档


Sharding-JDBC 定义了多种分片策略和算法接口,主要的分配策略与算法说明如下表所示:


补充两个细节:

(1)当 ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING 配置设置 true 时,InlineShardingStrategy 支持范围查询,但是并不是根据分片值计算范围,而是直接全路由至配置的数据节点,会存在性能隐患。

  • InlineShardingStrategy.doSharding

org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy#doSharding    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {        RouteValue shardingValue = shardingValues.iterator().next();        //ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING设置为true,直接返回availableTargetNames,而不是根据RangeRouteValue计算        if (properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING) && shardingValue instanceof RangeRouteValue) {            return availableTargetNames;        }        Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support this type sharding:" + shardingValue.toString());        Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue);        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);        for (String each : shardingResult) {            if (availableTargetNames.contains(each)) {                result.add(each);            }        }        return result;    }
复制代码

(2)4.1.1 的官方文档虽然说Hint可以跳过解析和改写,但在我们上面解析引擎的源码解析中,我们并没有看到有对 Hint 策略的额外跳过。事实上,即使使用了 Hint 分片 SQL 也同样需要解析重写,也同样受 Sharding-JDBC 的语法限制,这在官方的issue中也曾经被提及。

▲图片来源:ShardingSphere 官方文档


  • 主从路由

主从路由的核心逻辑就是通过 MasterSlaveDataSourceRouter 的 route 方法进行判定 SQL 走主库还是从库。主从情况下,配置的数据源实际是一组主从,而不是单个的实例,所以需要通过 masterSlaveRule 获取到具体的主库或者从库名字。


  • 主从路由 decorate

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate        public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {        //为空证明没有经过分片路由        if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {            //根据SQL判断选择走主库还是从库            String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());            RouteResult routeResult = new RouteResult();           //根据具体的主库/从库名创建路由单元            routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));            return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);        }        Collection<RouteUnit> toBeRemoved = new LinkedList<>();        Collection<RouteUnit> toBeAdded = new LinkedList<>();        //不为空证明已经被分片路由处理了        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {            if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {                //先标记移除 因为这里是一组主从的名字而不是实际的库                toBeRemoved.add(each);                //根据SQL判断选择走主库还是从库                String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());                //根据具体的主库/从库名创建路由单元                toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));            }        }        routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);        routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);        return routeContext;    }
复制代码

MasterSlaveDataSourceRouter 中 isMasterRoute 方法会判断 SQL 是否需要走主库,当出现以下情况时走主库:

  • select 语句包含锁,如 for update 语句

  • 不是 select 语句

  • MasterVisitedManager.isMasterVisited()设置为 true

  • HintManager.isMasterRouteOnly()设置为 true


不走主库则通过负载算法选择从库,Sharding-JDBC 提供了轮询和随机两种算法。

  • MasterSlaveDataSourceRouter

public final class MasterSlaveDataSourceRouter {         private final MasterSlaveRule masterSlaveRule;         /**     * Route.     *     * @param sqlStatement SQL statement     * @return data source name     */    public String route(final SQLStatement sqlStatement) {        if (isMasterRoute(sqlStatement)) {            MasterVisitedManager.setMasterVisited();            return masterSlaveRule.getMasterDataSourceName();        }        return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));    }         private boolean isMasterRoute(final SQLStatement sqlStatement) {        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();    }         private boolean containsLockSegment(final SQLStatement sqlStatement) {        return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();    }}
复制代码

是否走主库的信息存在 MasterVisitedManager 中,MasterVisitedManager 是通过 ThreadLocal 实现的,但这种实现会有一个问题,当我们使用事务先查询再更新/插入时,第一条查询 SQL 并不会走主库,而是走从库,如果业务需要事务的第一条查询也走主库,事务查询前需要手动调用一次 MasterVisitedManager.setMasterVisited()。

  • MasterVisitedManager

public final class MasterVisitedManager {         private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);         /**     * Judge master data source visited in current thread.     *     * @return master data source visited or not in current thread     */    public static boolean isMasterVisited() {        return MASTER_VISITED.get();    }         /**     * Set master data source visited in current thread.     */    public static void setMasterVisited() {        MASTER_VISITED.set(true);    }         /**     * Clear master data source visited.     */    public static void clear() {        MASTER_VISITED.remove();    }}
复制代码

3.3.2 引擎总结

路由引擎的作用是将 SQL 根据参数通过实现的策略算法计算出实际该在哪些库的哪些表执行,也就是路由结果。路由引擎有两种实现,分别是分片路由和主从路由,两者都提供了标准化的策略接口来让业务实现自己的路由策略,分片路由需要注意自身 SQL 场景和策略算法相匹配,主从路由中同一线程且同一数据库连接内,有写入操作后,之后的读操作会从主库读取,写入操作前的读操作不会走主库。

3.4 改写引擎

3.4.1 引擎解析

经过解析路由后虽然确定了执行的实际库表,但 SQL 中表名依旧是逻辑表,不能执行,改写引擎可以将逻辑表替换为物理表。同时,路由至多库表的 SQL 也需要拆分为多条 SQL 执行。


改写的入口仍旧在 BasePrepareEngine 中,创建重写上下文 createSQLRewriteContext,再根据上下文进行改写 rewrite,最终返回执行单元 ExecutionUnit。

  • 改写逻辑入口

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite    private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {        //注册重写装饰器        registerRewriteDecorator();        //创建 SQLRewriteContext        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);        //重写        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);    }
复制代码

执行单元包含了数据源名称,改写后的 SQL,以及对应的参数,SQL 一样的两个 SQLUnit 会被视为相等。

  • ExecutionUnit

@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class ExecutionUnit {         private final String dataSourceName;         private final SQLUnit sqlUnit;} @AllArgsConstructor@RequiredArgsConstructor@Getter@Setter//根据sql判断是否相等@EqualsAndHashCode(of = { "sql" })@ToStringpublic final class SQLUnit {     private String sql;     private final List<Object> parameters; }
复制代码

createSQLRewriteContext 完成了两件事,一个是对 SQL 参数进行了重写,一个是生成了 SQLToken。

  • createSQLRewriteContext

org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext    public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {        SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);        //sql参数重写        decorate(decorators, result, routeContext);        //生成SQLToken        result.generateSQLTokens();        return result;    } org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {        for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {            if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {                //参数重写                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());            }        }        //sqlTokenGenerators        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());    } org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens    public void generateSQLTokens() {        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));    }
复制代码


ParameterRewriter 中与分片相关的实现有两种。

//*详细的例子可以参考官方文档中分页修正和补列部分


SQLToken 记录了 SQL 中每个 token(解析引擎中提过的不可再分的原子符号)的起始位置,从而方便改写引擎知道哪些位置需要改写。

  • SQLToken

@RequiredArgsConstructor@Getterpublic abstract class SQLToken implements Comparable<SQLToken> {         private final int startIndex;         @Override    public final int compareTo(final SQLToken sqlToken) {        return startIndex - sqlToken.getStartIndex();    }}
复制代码

创建完 SQLRewriteContext 后就对整条 SQL 进行重写和组装参数,可以看出每个 RouteUnit 都会重写 SQL 并获取自己对应的参数。

  • SQLRouteRewriteEngine.rewrite

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#rewrite    public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {        Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);        for (RouteUnit each : routeResult.getRouteUnits()) {            //重写SQL+组装参数            result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));        }        return result;    }
复制代码

toSQL 核心就是根据 SQLToken 将 SQL 拆分改写再拼装,比如 select * from t_order where created_by = '123' 就会被拆分为 select * from | t_order | where created_by = '123'三部分进行改写拼装。

  • toSQL

org.apache.shardingsphere.underlying.rewrite.sql.impl.AbstractSQLBuilder#toSQL    public final String toSQL() {        if (context.getSqlTokens().isEmpty()) {            return context.getSql();        }        Collections.sort(context.getSqlTokens());        StringBuilder result = new StringBuilder();        //截取第一个SQLToken之前的内容  select * from        result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));        for (SQLToken each : context.getSqlTokens()) {            //重写拼接每个SQLToken对应的内容  t_order ->t_order_0            result.append(getSQLTokenText(each));            //拼接SQLToken中间不变的内容 where created_by = '123'            result.append(getConjunctionText(each));        }        return result.toString();    }
复制代码

ParameterBuilder 有 StandardParameterBuilder 和 GroupedParameterBuilder 两个实现。

  • StandardParameterBuilder:适用于非 insert 语句,getParameters 无需分组处理直接返回即可

  • GroupedParameterBuilder:适用于 insert 语句,需要根据路由情况对参数进行分组。

原因和样例可以参考官方文档批量拆分部分

  • getParameters

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#getParameters    private List<Object> getParameters(final ParameterBuilder parameterBuilder, final RouteResult routeResult, final RouteUnit routeUnit) {        if (parameterBuilder instanceof StandardParameterBuilder || routeResult.getOriginalDataNodes().isEmpty() || parameterBuilder.getParameters().isEmpty()) {            //非插入语句直接返回            return parameterBuilder.getParameters();        }        List<Object> result = new LinkedList<>();        int count = 0;        for (Collection<DataNode> each : routeResult.getOriginalDataNodes()) {            if (isInSameDataNode(each, routeUnit)) {                //插入语句参数分组构造                result.addAll(((GroupedParameterBuilder) parameterBuilder).getParameters(count));            }            count++;        }        return result;    }
复制代码

3.4.2 引擎总结

改写引擎的作用是将逻辑 SQL 转换为实际可执行的 SQL,这其中既有逻辑表名的替换,也有多路由的 SQL 拆分,还有为了后续归并操作而进行的分页、分组、排序等改写,select 语句不会对参数进行重组,而 insert 语句为了避免插入多余数据,会通过路由单元对参数进行重组。

3.5 执行引擎

3.5.1 引擎解析

改写完成后的 SQL 就可以执行了,执行引擎需要平衡好资源和效率,如果为每条真实 SQL 都创建一个数据库连接显然会造成资源的滥用,但如果单线程串行也必然会影响执行效率。


执行引擎会先将执行单元中需要执行的 SQLUnit 根据数据源分组,同一个数据源下的 SQLUnit 会放入一个 list,然后会根据 maxConnectionsSizePerQuery 对同一个数据源的 SQLUnit 继续分组,创建连接并绑定 SQLUnit 。

  • 执行组创建

org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups    private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(            final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {        //根据数据源将SQLUnit分组 key=dataSourceName        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);        Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();        //创建sql执行组        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));        }        return result;    } org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups    private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,                                                                       final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {        List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();        //每个连接需要执行的最大sql数量        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);        //分组,每组对应一条数据库连接        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);        //选择连接模式 连接限制/内存限制        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;        //创建连接        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());        int count = 0;        for (List<SQLUnit> each : sqlUnitPartitions) {            //绑定连接和SQLUnit 创建StatementExecuteUnit            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));        }        return result;    }
复制代码

SQLUnit 分组和连接模式选择没有任何关系,连接模式的选择只取决于 maxConnectionsSizePerQuery 和 SQLUnit 数量的大小关系,maxConnectionsSizePerQuery 代表了一个数据源一次查询允许的最大连接数。


  • 当 maxConnectionsSizePerQuery<sqlunit 数量时,意味着无法做到每个 sqlunit 独享一个连接,需要直接查询出结果集至内存中;< li="">

  • 当 maxConnectionsSizePerQuery>=SQLUnit 数量时,意味着可以支持每个 SQLUnit 独享一个连接,可以通过 ResultSet 游标下移的方式查询结果集。


不过 maxConnectionsSizePerQuery 默认值为 1,所以当一条 SQL 需要路由至多张表时(即有多个 SQLUnit)会采用连接限制,当路由至单表时是内存限制模式。

为了避免产生数据库连接死锁问题,在内存限制模式时,Sharding-JDBC 通过锁住数据源对象一次性创建出本条 SQL 需要的所有数据库连接。连接限制模式下,各连接一次性查出各自的结果,不会出现多连接相互等待的情况,因此不会发生死锁,而内存限制模式通过游标读取结果集,需要多条连接去查询不同的表做合并,如果不一次性拿到所有需要的连接,则可能存在连接相互等待的情况造成死锁。可以参照官方文档中执行引擎相关例子

  • 不同连接模式创建连接

private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {    if (1 == connectionSize) {        Connection connection = createConnection(dataSourceName, dataSource);        replayMethodsInvocation(connection);        return Collections.singletonList(connection);    }    if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {        return createConnections(dataSourceName, dataSource, connectionSize);    }    //内存限制模式加锁 一次性获取所有的连接    synchronized (dataSource) {        return createConnections(dataSourceName, dataSource, connectionSize);    }}
复制代码

此外,结果集的内存合并和流式合并只在调用 JDBC 的 executeQuery 的情况下生效,如果使用 execute 方式进行查询,都是统一使用流式方式的查询。

  • 查询结果归并对比

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#executeQuery#101   org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {        PreparedStatement preparedStatement = (PreparedStatement) statement;        ResultSet resultSet = preparedStatement.executeQuery();        getResultSets().add(resultSet);        //executeQuery 中根据连接模式选择流式/内存        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);    } //execute 单独调用getResultSet中只会使用流式合并org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet#158  org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getQueryResults     private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {        List<QueryResult> result = new ArrayList<>(resultSets.size());        for (ResultSet each : resultSets) {            if (null != each) {                result.add(new StreamQueryResult(each));            }        }        return result;    }
复制代码

多条连接的执行方式分为串行和并行,在本地事务和 XA 事务中是串行的方式,其余情况是并行,具体的执行逻辑这里就不再展开了。

  • isHoldTransaction

public boolean isHoldTransaction() {        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());    }
复制代码

3.5.2 引擎总结

执行引擎通过 maxConnectionsSizePerQuery 和同数据源的 SQLUnit 的数量大小确定连接模式,maxConnectionsSizePerQuery=SQLUnit 数量使用内存限制模式,当使用内存限制模式时会通过对数据源对象加锁来保证一次性获取本条 SQL 需要的连接而避免死锁。在使用 executeQuery 查询时,处理结果集时会根据连接模式选择流式或者内存合并,但使用 execute 方法查询,处理结果集只会使用流式合并。

3.6 归并引擎

3.6.1 引擎解析

查询出的结果集需要经过归并引擎归并后才是最终的结果,归并的核心入口在 MergeEntry 的 process 方法中,优先处理分片场景的合并,再进行脱敏,只有读写分离的情况下则直接返回 TransparentMergedResult,TransparentMergedResult 实际上没做合并的额外处理,其内部实现都是完全调用 queryResult 的实现。

  • 归并逻辑入口

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#mergeQuery#190 org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge#61    org.apache.shardingsphere.underlying.merge.MergeEntry#process    public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {        //分片合并        Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);        //脱敏处理        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);        //只有读写分离的情况下,orElseGet会不存在,TransparentMergedResult        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));    }
复制代码
  • TransparentMergedResult

@RequiredArgsConstructorpublic final class TransparentMergedResult implements MergedResult {         private final QueryResult queryResult;         @Override    public boolean next() throws SQLException {        return queryResult.next();    }         @Override    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {        return queryResult.getValue(columnIndex, type);    }         @Override    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {        return queryResult.getCalendarValue(columnIndex, type, calendar);    }         @Override    public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {        return queryResult.getInputStream(columnIndex, type);    }         @Override    public boolean wasNull() throws SQLException {        return queryResult.wasNull();    }}
复制代码

我们只看分片相关的操作,ResultMergerEngine 只有一个实现类 ShardingResultMergerEngine,所以只有存在分片情况的时候,上文的第一个 merge 才会有结果。根据 SQL 类型的不同选择 ResultMerger 实现,查询类的合并是最常用也是最复杂的合并。

  • MergeEntry.merge

org.apache.shardingsphere.underlying.merge.MergeEntry#merge    private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {            if (entry.getValue() instanceof ResultMergerEngine) {                //选择不同类型的 resultMerger                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);                //归并                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));            }        }        return Optional.empty();    } org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine#newInstance    public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {        if (sqlStatementContext instanceof SelectStatementContext) {            return new ShardingDQLResultMerger(databaseType);        }        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {            return new ShardingDALResultMerger(shardingRule);        }        return new TransparentResultMerger();    }
复制代码

ShardingDQLResultMerger 的 merge 方法就是根据 SQL 解析结果中包含的 token 选择合适的归并方式(分组聚合、排序、遍历),归并后的 mergedResult 统一经过 decorate 方法进行判断是否需要分页归并,整体处理流程图可以概括如下。

  • 归并方式选择

org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#merge    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {        if (1 == queryResults.size()) {            return new IteratorStreamMergedResult(queryResults);        }        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;        selectStatementContext.setIndexes(columnLabelIndexMap);        //分组聚合,排序,遍历        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);        //分页归并        return decorate(queryResults, selectStatementContext, mergedResult);    } org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#build    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {        if (isNeedProcessGroupBy(selectStatementContext)) {            //分组聚合归并            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);        }        if (isNeedProcessDistinctRow(selectStatementContext)) {            setGroupByForDistinctRow(selectStatementContext);            //分组聚合归并            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);        }        if (isNeedProcessOrderBy(selectStatementContext)) {            //排序归并            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);        }        //遍历归并        return new IteratorStreamMergedResult(queryResults);    } org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#decorate    private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {        PaginationContext paginationContext = selectStatementContext.getPaginationContext();        if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {            return mergedResult;        }        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();        //根据数据库类型分页归并        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {            return new LimitDecoratorMergedResult(mergedResult, paginationContext);        }        if ("Oracle".equals(trunkDatabaseName)) {            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);        }        if ("SQLServer".equals(trunkDatabaseName)) {            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);        }        return mergedResult;    }
复制代码

每种归并方式的作用在官方文档有比较详细的案例,这里就不再重复介绍了。

3.6.2 引擎总结

归并引擎是 Sharding-JDBC 执行 SQL 的最后一步,其作用是将多个数节点的结果集组合为一个正确的结果集返回,查询类的归并有分组归并、聚合归并、排序归并、遍历归并、分页归并五种,这五种归并方式并不是互斥的,而是相互组合的。

四、定制开发

在使用 Sharding-JDBC 过程中,我们发现了一些问题可以改进,比如存量系统数据量到达一定规模而需要分库分表引入 Sharding-JDBC 时,就会存在两大问题


一个是存量数据的迁移,这个问题我们可以通过分片算法兼容,前文已经提过分片键的值是不允许更改的,而且 SQL 如果不包含分片键,如果这个分片键对应的值是递增的(如 id,时间等),我们可以设置一个阈值,在分片算法的 doSharding 中判断分片值与阈值的大小决定将数据路由至旧表或新表,避免数据迁移的麻烦。如果是根据用户 id 取模分表,而新增的数据无法只通过用户 id 判断,这时可以考虑采用复合分片算法,将用户 id 与订单 id 或者时间等递增的字段同时设置为分片键,根据订单 id 或时间判断是否是新数据,再根据用户 id 取模得到路由结果即可。


另一个是 Sharding-JDBC 语法限制会使得存量 SQL 面对巨大的改造压力,而实际上业务更关心的是需要分片的表,非分片的表不应该发生改动和影响。实际上,非分片表理论上无需通过解析、路由、重写、合并,为此我们在源码层面对这段逻辑进行了优化,支持跳过部分解析,完全跳过分片路由、重写和合并,尽可能减少 Sharding-JDBC 对非分片表的语法限制,来减少业务系统的改造压力与风险。

4.1 跳过 Sharding 语法限制

Sharding-JDBC 执行解析路由重写的逻辑都是在 BasePrepareEngine 中,最终构造 ExecutionContext 交由执行引擎执行,ExecutionContext 中包含 sqlStatementContext 和 executionUnits,非分片表不涉及路由改写,所以其 ExecutionUnit 我们非常容易手动构造,而查看 SQLStatementContext 的使用情况,我们发现 SQLStatementContext 只会影响结果集的合并而不会影响实际的执行,而不分片表也无需进行结果集的合并,整体实现思路如图。

  • ExecutionContext 相关对象

public class ExecutionContext {     private final SQLStatementContext sqlStatementContext;     private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();} public final class ExecutionUnit {         private final String dataSourceName;         private final SQLUnit sqlUnit;} public final class SQLUnit {     private String sql;     private final List<Object> parameters; }
复制代码

(1)校验 SQL 中是否包含分片表:我们是通过正则将 SQL 中的各个单词分隔成 Set,然后再遍历 BaseRule 判断是否存在分片表。大家可能会奇怪明明解析引擎可以帮我们解析出 SQL 中的表名,为什么还要自己来解析。因为我们测试的过程中发现,存量业务上的 SQL 很多在解析阶段就会报错,只能提前判断,当然这种判断方式并不严谨,比如 SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx';,配置的分片表 t_order 时就会存在误判,但这种场景在我们的业务中没有,所以暂时并没有处理。由于这个信息需要在多个对象方法中使用,为了避免修改大量的对象变量和方法入参,而又能方便的透传这个信息,判断的结果我们选择放在 ThreadLocal 里。

  • RuleContextManager

public final class RuleContextManager {     private static final ThreadLocal<RuleContextManager> SKIP_CONTEXT_HOLDER = ThreadLocal.withInitial(RuleContextManager::new);     /**     * 是否跳过sharding     */    private boolean skipSharding;     /**     * 是否路由至主库     */    private boolean masterRoute;     public static boolean isSkipSharding() {        return SKIP_CONTEXT_HOLDER.get().skipSharding;    }     public static void setSkipSharding(boolean skipSharding) {        SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding;    }     public static boolean isMasterRoute() {         return SKIP_CONTEXT_HOLDER.get().masterRoute;    }     public static void setMasterRoute(boolean masterRoute) {        SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute;    }     public static void clear(){        SKIP_CONTEXT_HOLDER.remove();    } }
复制代码
  • 判断 SQL 是否包含分片表

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext// 判断是否可以跳过sharding,构造RuleContextManager的值private void buildSkipContext(final String sql){    Set<String> sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[\\s]")));        if (CollectionUtils.isNotEmpty(rules)) {            for (BaseRule baseRule : rules) {                //定制方法,ShardingRule实现,判断sqlTokenSet是否包含逻辑表即可                if(baseRule.hasContainShardingTable(sqlTokenSet)){                    RuleContextManager.setSkipSharding(false);                    break;                }else {                    RuleContextManager.setSkipSharding(true);                }            }        }} org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTablepublic Boolean hasContainShardingTable(Set<String> sqlTokenSet) {      //logicTableNameList通过遍历TableRule可以得到       for (String logicTable : logicTableNameList) {            if (sqlTokenSet.contains(logicTable)) {                return true;            }        }        return false;    }
复制代码

(2)跳过解析路由:通过 RuleContextManager 中的 skipSharding 判断是否需要跳过 Sharding 解析路由,但为了兼容读写分离的场景,我们还需要知道这条 SQL 应该走主库还是从库,走主库的场景在后面强制路由主库部分有说明,SQL 走主库实际上只有两种情况,一种是非 SELECT 语句,另一种就是 SELECT 语句带锁,如 SELECT...FOR UPDATE,因此整体实现的步骤如下:

  • 如果标记了跳过 Sharding 且不为 select 语句,直接返回 SkipShardingStatement,单独构造一个 SkipShardingStatement 的目的是为了能利用解析引擎中的缓存,缓存中不能放入 null 值。

  • 如果是 select 语句需要继续解析,判断是否有锁后直接返回,避免后续解析造成语法不兼容,这里也曾尝试用反射获取 lockClause 来判断是否包含锁,但最终没有成功。

  • ShardingRouteDecorator 根据 RuleContextManager.isSkipSharding 判断是否跳过路由。

  • 跳过解析路由

public class SkipShardingStatement implements SQLStatement{    @Override    public int getParameterCount() {        return 0;    }} org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0    private SQLStatement parse0(final String sql, final boolean useCache) {        if (useCache) {            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);            if (cachedSQLStatement.isPresent()) {                return cachedSQLStatement.get();            }        }        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();        /**         * 跳过sharding 需要判断是否需要路由至主库 如果不是select语句直接跳过         * 是select语句则需要通过继续解析判断是否有锁         */        SQLStatement result ;        if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){            RuleContextManager.setMasterRoute(true);            result = new SkipShardingStatement();        }else {            result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);        }        if (useCache) {            cache.put(sql, result);        }        return result;    } org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause    public ASTNode visitSelectClause(final SelectClauseContext ctx) {        SelectStatement result = new SelectStatement();        // 跳过sharding 只需要判断是否有锁来决定是否路由至主库即可        if(RuleContextManager.isSkipSharding()){            if (null != ctx.lockClause()) {                result.setLock((LockSegment) visit(ctx.lockClause()));                RuleContextManager.setMasterRoute(true);            }            return result;        }        //...后续解析    } org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);        //如果需要跳过sharding 不进行后续的解析直接返回        if (RuleContextManager.isSkipSharding()) {            return new RouteContext(sqlStatement, parameters, new RouteResult());        }        //...解析    } org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {        // 跳过sharding路由        if(RuleContextManager.isSkipSharding()){            return routeContext;        }        //...路由
复制代码

(3)手动构造 ExecutionUnit:ExecutionUnit 中我们需要确定的内容就是 datasourceName,这里我们认为跳过 Sharding 的 SQL 最终执行的库一定只有一个。如果只是跳过 Sharding 的情况,直接从元数据中获取数据源名称即可,如果存在读写分离的情况,主从路由的结果也一定是唯一的。创建完 ExecutionUnit 直接放入 ExecutionContext 返回即可,从而跳过后续的改写逻辑。

  • 手动构造 ExecutionUnit

public ExecutionContext prepare(final String sql, final List<Object> parameters) {    List<Object> clonedParameters = cloneParameters(parameters);    // 判断是否可以跳过sharding,构造RuleContextManager的值    buildSkipContext(sql);      RouteContext routeContext = executeRoute(sql, clonedParameters);    ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());    // 跳过sharding的sql最后的路由结果一定只有一个库    if(RuleContextManager.isSkipSharding()){        log.debug("可以跳过sharding的场景 {}", sql);        if(!Objects.isNull(routeContext.getRouteResult())){            Collection<String> allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();            int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();            /*             * 1. 没有读写分离的情况下  跳过sharding路由会导致routeUnitsSize为0 此时需要判断数据源数量是否为1             * 2. 读写分离情况下 只会路由至具体的主库或从库 routeUnitsSize数量应该为1             */            if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){                throw new ShardingSphereException("可以跳过sharding,但是路由结果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());            }            Collection<String> actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();            // 手动创建执行单元            String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();            ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));            result.getExecutionUnits().add(executionUnit);            //标记该结果需要跳过            result.setSkipShardingScenarioFlag(true);        }    }else {        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));    }    if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {        SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());    }    return result;}
复制代码

(4)跳过合并:跳过查询结果的合并和影响行数计算的合并,注意 ShardingPreparedStatement 和 ShardingStatement 都需要跳过

  • 跳过合并

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery    public ResultSet executeQuery() throws SQLException {        ResultSet result;        try {            clearPrevious();            prepare();            initPreparedStatementExecutor();            List<QueryResult> queryResults = preparedStatementExecutor.executeQuery();            List<ResultSet> resultSets = preparedStatementExecutor.getResultSets();        // 定制开发,不分片跳过合并            if(executionContext.isSkipShardingScenarioFlag()){                return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;            }            MergedResult mergedResult = mergeQuery(queryResults);            result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);        } finally {            clearBatch();        }        currentResultSet = result;        return result;    }org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet    public ResultSet getResultSet() throws SQLException {        if (null != currentResultSet) {            return currentResultSet;        }        List<ResultSet> resultSets = getResultSets();        // 定制开发,不分片跳过合并        if(executionContext.isSkipShardingScenarioFlag()){            return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;        }         if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);        }        return currentResultSet;    }org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate    public boolean isAccumulate() {        //定制开发,不分片跳过计算        if(executionContext.isSkipShardingScenarioFlag()){            return false;        }        return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());    }
复制代码

(5)清空 RuleContextManager:查看一下 Sharding-JDBC 其他 ThreadLocal 的清空位置,对应的清空 RuleContextManager 就好。

  • 清空 ThreadLocal

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#closepublic final void close() throws SQLException {        closed = true;        MasterVisitedManager.clear();        TransactionTypeHolder.clear();        RuleContextManager.clear();        int connectionSize = cachedConnections.size();        try {            forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());        } finally {            cachedConnections.clear();            rootInvokeHook.finish(connectionSize);        }    }
复制代码

举个例子,比如 Sharding-JDBC 本身是不支持 INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ?    这种语法的,会报空指针异常。

经过我们上述改造验证后,非分片表是可以跳过语法限制执行如下的 SQL 的。

通过该功能的实现,业务可以更关注与分片表的 SQL 改造,而无需担心引入 Sharding-JDBC 造成所有 SQL 的验证改造,大幅减少改造成本和风险。

4.2 强制路由主库

Sharding-JDBC 可以通过配置主从库数据源方便的实现读写分离的功能,但使用读写分离就必须面对主从延迟和从库失联的痛点,针对这一问题,我们实现了强制路由主库的动态配置,当主从延迟过大或从库失联时,通过修改配置来实现 SQL 语句强制走主库的不停机路由切换。


后面会说明了配置的动态生效的实现方式,这里只说明强制路由主库的实现,我们直接使用前文的 RuleContextManager 即可,在主从路由引擎里判断下是否开启了强制主库路由。

  • MasterSlaveRouteDecorator.decorate 改造

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {        /**         * 如果配置了强制主库 MasterVisitedManager设置为true         * 后续isMasterRoute中会保证路由至主库         */        if(properties.<Boolean>getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){            MasterVisitedManager.setMasterVisited();        }        //...路由逻辑        return routeContext;    }
复制代码

为了兼容之前跳过 Sharding 的功能,我们需要同步修改下 isMasterRoute 方法,如果是跳过了 Sharding 路由需要通过 RuleContextManager 来判断是否走主库。

  • isMasterRoute 改造

org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute    private boolean isMasterRoute(final SQLStatement sqlStatement) {        if(sqlStatement instanceof SkipShardingStatement){            // 优先以MasterVisitedManager中的值为准            return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();        }        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();    }
复制代码

当然,更理想的状况是通过监控主从同步延迟和数据库拨测,当超过阈值时或从库失联时直接自动修改配置中心的库,实现自动切换主库,减少业务故障时间和运维压力。

4.3 配置动态生效

Sharding-JDBC 中的 ConfigurationPropertyKey 中提供了许多配置属性,而 Sharding-JDBCB 并没有为这些配置提供在线修改的方法,而在实际的应用场景中,像 SQL_SHOW 这样控制 SQL 打印的开关配置,我们更希望能够在线修改配置值来控制 SQL 日志的打印,而不是修改完配置再重启服务。


以 SQL 打印为例,BasePrepareEngine 中存在 ConfigurationProperties 对象,通过调用 getValue 方法来获取 SQL_SHOW 的值。

  • SQL 打印

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare    /**     * Prepare to execute.     *     * @param sql SQL     * @param parameters SQL parameters     * @return execution context     */    public ExecutionContext prepare(final String sql, final List<Object> parameters) {        List<Object> clonedParameters = cloneParameters(parameters);        RouteContext routeContext = executeRoute(sql, clonedParameters);        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));        //sql打印        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());        }        return result;    }
复制代码

ConfigurationProperties 继承了抽象类 TypedProperties,其 getValue 方法就是根据 key 获取对应的配置值,因此我们直接在 TypedProperties 中实现刷新缓存中的配置值的方法。

  • TypedProperties 刷新配置

public abstract class TypedProperties<E extends Enum & TypedPropertyKey> {         private static final String LINE_SEPARATOR = System.getProperty("line.separator");         @Getter    private final Properties props;         private final Map<E, TypedPropertyValue> cache;         public TypedProperties(final Class<E> keyClass, final Properties props) {        this.props = props;        cache = preload(keyClass);    }         private Map<E, TypedPropertyValue> preload(final Class<E> keyClass) {        E[] enumConstants = keyClass.getEnumConstants();        Map<E, TypedPropertyValue> result = new HashMap<>(enumConstants.length, 1);        Collection<String> errorMessages = new LinkedList<>();        for (E each : enumConstants) {            TypedPropertyValue value = null;            try {                value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());            } catch (final TypedPropertyValueException ex) {                errorMessages.add(ex.getMessage());            }            result.put(each, value);        }        if (!errorMessages.isEmpty()) {            throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));        }        return result;    }         /**     * Get property value.     *     * @param key property key     * @param <T> class type of return value     * @return property value     */    @SuppressWarnings("unchecked")    public <T> T getValue(final E key) {        return (T) cache.get(key).getValue();    }     /**     * vivo定制改造方法 refresh property value.     * @param key property key     * @param value property value     * @return 更新配置是否成功     */    public boolean refreshValue(String key, String value){        //获取配置类支持的配置项        E[] enumConstants = targetKeyClass.getEnumConstants();        for (E each : enumConstants) {            //遍历新的值            if(each.getKey().equals(key)){                try {                    //空白value认为无效,取默认值                    if(!StringUtils.isBlank(value)){                        value = each.getDefaultValue();                    }                    //构造新属性                    TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);                    //替换缓存                    cache.put(each, typedPropertyValue);                    //原始属性也替换下,有可能会通过RuntimeContext直接获取Properties                    props.put(key,value);                    return true;                } catch (final TypedPropertyValueException ex) {                    log.error("refreshValue error. key={} , value={}", key, value, ex);                }            }        }        return false;    }}
复制代码

实现了刷新方法后,我们还需要将该方法一步步暴露至一个外部可以调用的类中,以便在服务监听配置的方法中,能够调用这个刷新方法。ConfigurationProperties 直接在 BasePrepareEngine 的构造函数中传入,我们通过构造函数逐步反推最外层的这一对象调用来源,最终可以定位到在 AbstractDataSourceAdapter 中的 getRuntimeContext()方法中可以获取到这个配置,而这个就是 Sharding-JDBC 实现的 JDBC 中 Datasource 接口的抽象类,我们直接在这个类中调用刚刚实现的 refreshValue 方法,剩下的就是监听配置,通过自己实现的 AbstractDataSourceAdapter 来调用这个方法就好了。


通过这一功能,我们可以方便的控制一些开关属性的在线修改,如 SQL 打印、强制路由主库等,业务无需重启服务即可做到配置的动态生效。

4.4 批量 update 语法支持

业务中存在使用 foreach 标签来批量 update 的语句,这种 SQL 在 Sharding-JDBC 中无法被正确路由,只会路由第一组参数,后面的无法被路由改写,原因是解析引擎无法将语句拆分解析。

  • 批量 update 样例

<update id="batchUpdate">        <foreach collection="orderList" item="item">               update t_order set               status = 1,               updated_by = #{item.updatedBy}               WHERE created_by = #{item.createdBy};        </foreach>    </update>
复制代码



我们通过将批量 update 按照;拆分为多个语句,然后分别路由,最后手动汇总路有结果生成执行单元。


为了能正确重写 SQL,批量 update 拆分后的语句需要完全一样,这样就不能使用动态拼接 set 条件,而是使用 ifnull 语法或者字段值不发生变化时也将原来的值放入 set 中,只不过 set 前后的值保持一致,整体思路与实现如下。


  • prepareBatch 实现


org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch private ExecutionContext prepareBatch(List<String> splitSqlList, final List<Object> allParameters) { //SQL去重 List<String> sqlList = splitSqlList.stream().distinct().collect(Collectors.toList()); if (sqlList.size() > 1) { throw new ShardingSphereException("不支持多条SQL,请检查SQL," + sqlList.toString()); } //以第一条SQL为标准 String sql = sqlList.get(0); //所有的执行单元 Collection<ExecutionUnit> globalExecutionUnitList = new ArrayList<>(); //初始化最后的执行结果 ExecutionContext executionContextResult = null; //根据所有参数数量和SQL语句数量 计算每组参数的数量 int eachSqlParameterCount = allParameters.size() / splitSqlList.size(); //平均分配每条SQL的参数 List<List<Object>> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount); for (List<Object> eachSqlParameterList : eachSqlParameterListList) { //每条SQL参数不同 需要根据参数路由不同的结果 实际的SqlStatementContext 是一致的 RouteContext routeContext = executeRoute(sql, eachSqlParameterList); //由于SQL一样 实际的SqlStatementContext 是一致的 只需初始化一次 if (executionContextResult == null) { executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext()); } globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext)); } //排序打印日志 executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList())); if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) { SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), executionContextResult.getSqlStatementContext(), (Collection<ExecutionUnit>) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST)); } return executionContextResult; }
复制代码

这里我们在 ExecutionContext 单独构造了一个了 ExtendMap 来存放 ExecutionUnit,原因是 ExecutionContext 中的 executionUnits 是 HashSet,而判断 ExecutionUnit 中的 SqlUnit 只会根据 SQL 去重,批量 update 的 SQL 是一致的,但 parameters 不同,为了不影响原有的逻辑,单独使用了另外的变量来存放。

  • ExecutionContext 改造

@RequiredArgsConstructor@Getterpublic class ExecutionContext {     private final SQLStatementContext sqlStatementContext;     private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();     /**     * 自定义扩展变量     */    private final Map<ExtendEnum,Object> extendMap = new HashMap<>();     /**     * 定制扩展,是否可以跳过分片逻辑     */    @Setter    private boolean skipShardingScenarioFlag = false;} @RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class ExecutionUnit {         private final String dataSourceName;         private final SQLUnit sqlUnit;} @AllArgsConstructor@RequiredArgsConstructor@Getter@Setter//根据SQL判断是否相等@EqualsAndHashCode(of = { "sql" })@ToStringpublic final class SQLUnit {     private String sql;     private final List<Object> parameters; }
复制代码

我们还需要改造下执行方法,在初始化执行器的时候,判断下 ExtendMap 中存在我们自定义的 EXECUTION_UNIT_LIST 是否存在,存在则使用生成 InputGroup,同一个数据源下的 ExecutionUnit 会被放入同一个 InputGroup 中。

  • InputGroup 改造

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init    public void init(final ExecutionContext executionContext) throws SQLException {        setSqlStatementContext(executionContext.getSqlStatementContext());        //兼容批量update 分库分表后同一张表的情况 判断是否存在EXECUTION_UNIT_LIST 存在则使用未去重的List进行后续的操作        if (MapUtils.isNotEmpty(executionContext.getExtendMap())){            Collection<ExecutionUnit> executionUnitCollection = (Collection<ExecutionUnit>) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);            if(CollectionUtils.isNotEmpty(executionUnitCollection)){                getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));            }        }else {            getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));        }        cacheStatements();    }
复制代码

改造完成后,批量 update 中的每条 SQL 都可以被正确路由执行。

4.5 ShardingCondition 去重

当 where 语句包括多个 or 条件时,而 or 条件不包含分片键时,会造成 createShardingConditions 方法生成重复的分片条件,导致重复调用 doSharding 方法。


如 SELECT * FROM t_order  WHERE created_by = ? and (   (status = ?) or  (status = ?) or  (status = ?) )这种 SQL,存在三个 or 条件,分片键是 created_by ,实际产生的 shardingCondition 会是三个一样的值,并会调用三次 doSharding 的方法。虽然实际执行还是只有一次(批量 update 那里说明过执行单元会去重),但为了减少方法的重复调用,我们还是对这里做了一次去重。


去重的方法也比较简单粗暴,我们对 ListRouteValue 和 RangeRouteValue 添加了 @EqualsAndHashCode 注解,然后在 WhereClauseShardingConditionEngine 的 createShardingConditions 方法返回最终结果前加一次去重,从而避免生成重复的 shardingCondition 造成 doSharding 方法的重复调用。

  • createShardingConditions 去重

org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions    private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {        Collection<ShardingCondition> result = new LinkedList<>();        for (AndPredicate each : andPredicates) {            Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);            if (routeValueMap.isEmpty()) {                return Collections.emptyList();            }            result.add(createShardingCondition(routeValueMap));        }        //去重        Collection<ShardingCondition> distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));        return distinctResult;    }
复制代码

4.6  全路由校验

分片表的 SQL 中如果没有携带分片键(或者带上了分片键结果没有被正确解析)将会导致全路由,产生性能问题,而这种 SQL 并不会报错,这就导致在实际的业务改造中,开发和测试很难保证百分百改造彻底。为此,我们在源码层面对这种情况做了额外的校验,当产生全路由,也就是 ShardingConditions 为空时,主动抛出异常,从而方便开发和测试能够快速发现全路由 SQL。


实现方式也比较简单,校验下 ShardingConditions 是否为空即可,只不过需要额外兼容下 Hint 策略 ShardingConditions 始终为空的特殊情况。

  • 全路由校验

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decoratepublic RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {        //省略...        //获取 ShardingConditions        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);        boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);        //判断是否允许全路由        if (!properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {            //如果不是Hint算法            if(!isHintAlgorithm(sqlStatementContext, shardingRule)){                /** 如果是DML语句  则可能有两种情况 这两种情况是根据getShardingConditions方法的内部逻辑而来的                 *  一种是非插入语句  shardingConditions.getConditions()为空即可                 *  一种是插入语句 插入语句shardingConditions.getConditions()不会为空  但是ShardingCondition的routeValues是空的                 */                if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {                    if(shardingConditions.getConditions().isEmpty()) {                        throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");                    }else {                        if (sqlStatementContext instanceof InsertStatementContext) {                            List<ShardingCondition> routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());                            if(CollectionUtils.isEmpty(routeValuesNotEmpty)){                                throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");                            }                        }                    }                }            }        }        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);        //省略...        return new RouteContext(sqlStatementContext, parameters, routeResult);    } private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {        // 场景a 全局默认策略是否使用强制路由策略        if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy                || shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){            return true;        }        for (String each : sqlStatementContext.getTablesContext().getTableNames()) {            Optional<TableRule> tableRule = shardingRule.findTableRule(each);            //场景b 指定表是否使用强制路由策略            if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy                    || shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {                return true;            }        }        return false;    }
复制代码

当然这块功能也可以在完善些,比如对分片路由结果中的数据源数量进行校验,从而避免跨库操作,我们这边没有实现也就不再赘述了。

4.7 组件封装

业务接入 Sharding-JDBC 的步骤是一样的,都需要通过 Java 创建数据源和配置对象或者使用 SpringBoot 进行配置,存在一定的熟悉成本和重复开发的问题,为此我们也对定制开发版本的 Sharding-JDBC 封装了一个公共组件,从而简化业务配置,减少重复开发,提升业务的开发效率,具体功能可见下。这块没有涉及源码的改造,只是在定制版本上包装的一个公共组件。

  • 提供了默认的数据源与连接池配置

  • 简化分库分表配置,业务配置逻辑表名和后缀,组件拼装行表达式和 actual-data-nodes

  • 封装常用的分片算法(时间、业务字段值等),

  • 统一的配置监听与动态修改(SQL 打印、强制主从切换等)


  • 开源 Sharding-JDBC 配置

//数据源名称spring.shardingsphere.datasource.names=ds0,ds1//ds0配置spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSourcespring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driverspring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0spring.shardingsphere.datasource.ds0.username=rootspring.shardingsphere.datasource.ds0.password=//ds1配置spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSourcespring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driverspring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1spring.shardingsphere.datasource.ds1.username=rootspring.shardingsphere.datasource.ds1.password=//分表规则spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_idspring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order$->{order_id % 2}spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_idspring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expression=t_order_item$->{order_id % 2}//默认分库规则spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_idspring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 2}
复制代码
  • 组件简化配置

//数据源名称vivo.it.sharding.datasource.names = ds0,ds1//ds0配置vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1vivo.it.sharding.datasource.ds0.username = rootvivo.it.sharding.datasource.ds0.password =//ds1配置vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1vivo.it.sharding.datasource.ds1.username = rootvivo.it.sharding.datasource.ds1.password =//分表规则vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]//默认分库规则vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}
复制代码

五、使用建议

结合官方文档和业务实践经验,我们也梳理了部分使用 Sharding-JDBC 的建议供大家参考,实际具体如何优化 SQL 写法(比如子查询、分页、分组排序等)还需要结合业务的实际场景来进行测试和调优。


(1)强制等级

  • 建议①:涉及分片表的 SQL 必须携带分片键

  • 原因:无分片键会导致全路由,存在严重的性能隐患


  • 建议②:禁止一条 SQL 中的分片值路由至不同的库

  • 原因:跨库操作存在严重的性能隐患,事务操作会升级为分布式事务,增加业务复杂度


  • 建议③:禁止对分片键使用运算表达式或函数操作

  • 原因:无法提前计算表达式和函数获取分片值,导致全路由

  • 说明:详见官方文档

  • 建议⑤:包含 CASE WHEN、HAVING、UNION (ALL)语法的分片 SQL,不支持路由至多数据节点

  • 说明:详见官方文档


(2)建议等级

  • ① 建议使用分布式 id 来保证分片表主键的全局唯一性

  • 原因:方便判断数据的唯一性和后续的迁移扩容

  • 说明:详见文章《vivo 自研鲁班分布式 ID 服务实践》


  • ② 建议跨多表的分组 SQL 的分组字段与排序字段保证一致

  • 原因:分组和排序字段不一致只能通过内存合并,大数据量时存在性能隐患

  • 说明:详见官方文档


  • ③ 建议通过全局递增的分布式 id 来优化分页查询

  • 原因:Sharding-JDBC 的分页优化侧重于结果集的流式合并来避免内存爆涨,但深度分页自身的性能问题并不能解决

  • 说明:详见官方文档

六、总结

本文结合个人理解梳理了各个引擎的源码入口和关键逻辑,读者可以结合本文和官方文档更好的定位理解 Sharding-JDBC 的源码实现。定制开发的目的是为了降低业务接入成本,尽可能减少业务存量 SQL 的改造,部分改造思想其实与官方社区也存在差异,比如跳过语法解析,官方社区致力于通过优化解析引擎来适配各种语法,而不是跳过解析阶段,可参考官方issue。源码分析和定制改造只涉及了 Sharding-JDBC 的数据分片和读写分离功能,定制开发的功能也在生产环境经过了考验,如有不足和优化建议,也欢迎大家批评指正。

发布于: 刚刚阅读数: 6
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
Sharding-JDBC源码解析与vivo的定制开发_源码解析_vivo互联网技术_InfoQ写作社区