写点什么

基于 Sharding-JDBC 的订单分库⽅案

  • 2022 年 4 月 25 日
  • 本文字数:9788 字

    阅读完需:约 32 分钟

基于Sharding-JDBC的订单分库⽅案

一、前言

随着业务的发展,数据量迅速增⻓,单实例 Mysql 数据库的磁盘存储容量很快就会达到上限(阿⾥云 RDB 最⼤上限 6T)。因此为了解决⽣产环境的存储瓶颈,需要对数据库进⾏分库设计,加⼤存储空间。


本⽅案基于 Apache 开源项⽬ ShardingSphere 采 Sharding-JDBC 进⾏订单⽔平分库设计(数据分⽚)缓解存储压⼒。



二、方案对比

⽬前主流的分库分表⽅案就是 Sharding-JDBC 和 Mycat,下⾯主要对⽐这两种。


经上述对⽐,可以发现 Sharding-JDBC 是直连数据库、没有中间层的⽅式,性能上更优,同时由于 Sharding-JDBC 集成在应⽤代码内,并不会增加额外的运维成本,使开发者可以专注于⾃⾝代码逻辑。另外,Sharding-JDBC ⽬前仅⽀持 Java,考虑到⽬前团队技术栈主要以 Java 为主,所以采⽤ Sharding-JDBC 的分库⽅案成为了⼀个不错的选择。


下图反映了 Sharding-JDBC 在应⽤中所处的位置:



三、原理

Sharding-JDBC 提供了精确分⽚扩展接⼝ org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm. PreciseShardingAlgorithm,只需要实现其 doSharding ⽅法,基于分⽚键做⼀定的算法转换,返回指定的数据源,便可以达到分库的效果。 


⽬前我们使⽤的是⼀致性 hash 算法,⽤于定位分⽚键所在数据源,同时引⼊虚拟节点,可以避免数据倾斜问题(即有的数据源数据多,有的数据源数据少,导致数据源设备压⼒不均衡)算法原理如图:


然后基于该分库的分⽚键 &分⽚算法,我们还需在 application.yml 中增加 sharding 数据源配置;同时需要增加分表相关的配置信息。

最后项⽬启动时,sharding 会⾃动加载这些配置初始化数据源。在请求时从请求参数中提取到分⽚键,应⽤分⽚算法,找到⽬标数据源,操作该数据源。


四、实战

在原有订单数据源 datasource_name 的基础上,新加 3 台订单数据库实例 datasource_name_1&datasource_name_2&datasource_name_3,扩充后共 4 台订单数据库实例,磁盘存储最⼤可以达到原有容量的 4 倍。


4.1 引⼊maven 依赖


<dependency>    <groupId>org.apache.shardingsphere</groupId>    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>    <version>4.1.1.001</version></dependency>
复制代码


这⾥的 4.1.1.001 版本是在官⽹ sharding 4.1.1 版本上⾃⾏扩充了⼀部分内容:本⾝根据业务分⽚键对业务数据进⾏分⽚后,同⼀类型的数据只会在⼀个数据源中,sharding 只能操作这⼀个数据源的数据,但对于⼀部分分⽚键落⼊新数据源后,意味着数据源发⽣了变动,⽽在旧库数据未迁⼊到新库时,会存在⼀定的兼容时期,此时,改造源码实现了如下双操作(除插⼊外):

a、双查询:查询新旧数据源的数据,然后业务项⽬⾥使⽤切⾯进⾏查询汇总去重,根据 id+updateDatetime 进⾏去重,保留最新数据。

b、双更新:更新数据时,会同时更新新旧数据源中满⾜条件的数据。

c、双删除:删除数据时,会同时删除新旧数据源中满⾜条件的数据。双删的⽬的是,为了避免只 删除了⼀个库的数据,⽽由于双查导致另⼀个库的数据被查询出来误⽤。

d、单插⼊:插⼊数据时,只会插⼊新分⽚算法所落⼊的数据源中。这样做的好处是,在不影响业 务的情况下,也减少了⼀部分数据迁移的⼯作量,即分库上线后新数据直接插⼊新库,不⽤做任何处理。


改造源码如下:

package org.apache.shardingsphere.core.rule;
/** * Databases and tables sharding rule. */@Getterpublic class ShardingRule implements BaseRule {
// 定义:旧的分库分片策略 private final ShardingStrategy oldDatabaseShardingStrategy; public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) { Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null."); Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty."); this.ruleConfiguration = shardingRuleConfig; shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames); tableRules = createTableRules(shardingRuleConfig); broadcastTables = shardingRuleConfig.getBroadcastTables(); bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups()); // 加载旧的分库分片策略 ShardingStrategyConfiguration oldDatabaseShardingStrategyConfig = shardingRuleConfig.getOldDatabaseShardingStrategyConfig(); if (oldDatabaseShardingStrategyConfig != null) { oldDatabaseShardingStrategy = createDefaultShardingStrategy(oldDatabaseShardingStrategyConfig); } else { oldDatabaseShardingStrategy = null; } defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig()); defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig()); defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig()); masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs()); encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig()); }}
复制代码


package org.apache.shardingsphere.sharding.route.engine;
/** * Sharding route decorator. */public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> { @SuppressWarnings("unchecked") @Override public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) { SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext(); // SQL参数集合 List<Object> parameters = routeContext.getParameters(); ShardingStatementValidatorFactory.newInstance( sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters)); // 获取分片条件 ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule); boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule); if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) { checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions); mergeShardingConditions(shardingConditions); } ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties); RouteResult routeResult = null; // ShardingStandardRoutingEngine特殊处理 if(shardingRouteEngine instanceof ShardingStandardRoutingEngine) { SQLStatement sqlStatement = sqlStatementContext.getSqlStatement(); // 查询/更新/删除操作,开启双操作模式 boolean isCompatibleMode = (sqlStatement instanceof SelectStatement) || (sqlStatement instanceof UpdateStatement) || (sqlStatement instanceof DeleteStatement); routeResult = shardingRouteEngine.route(shardingRule, isCompatibleMode); } else { routeResult = shardingRouteEngine.route(shardingRule); } if (needMergeShardingValues) { Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery."); } return new RouteContext(sqlStatementContext, parameters, routeResult); }}
复制代码


package org.apache.shardingsphere.sharding.route.engine.type.standard;
/** * Sharding standard routing engine. */public final class ShardingStandardRoutingEngine extends BaseShardingRoutingEngine {
/** * 寻找sharding路由结点信息 * @param shardingRule * @param tableRule * @param databaseShardingValues * @param tableShardingValues * @param isCompatibleMode 兼容模式(即同时操作新旧分片算法定位到的数据库,查询/更新/删除自动开启兼容模式) * @return */ private Collection<DataNode> route0(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues, final boolean isCompatibleMode) { Collection<DataNode> result = new LinkedList<>(); // 双查 & 双更 & 双删核心逻辑 // 1. 加载旧数据节点路由信息 ShardingStrategy oldDatabaseShardingStrategy = shardingRule.getOldDatabaseShardingStrategy(); Collection<String> oldDataSources = null; if (oldDatabaseShardingStrategy != null && isCompatibleMode) { oldDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues, true); for (String each : oldDataSources) { result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues, true)); } } // 2. 加载新数据节点路由信息 Collection<String> routedDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues, false); for (String each : routedDataSources) { // 新数据节点未被加载时,加载新数据节点 if(oldDataSources == null || !oldDataSources.contains(each)) { result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues, false)); } } return result; } private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final boolean isOld) { if (databaseShardingValues.isEmpty()) { return tableRule.getActualDatasourceNames(); } // 根据isOld参数,区分加载新/旧分片策略 Collection<String> result = new LinkedHashSet<>(); if (isOld) { result.addAll(shardingRule.getOldDatabaseShardingStrategy().doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties)); } else { result.addAll(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties)); } Preconditions.checkState(!result.isEmpty(), "no database route info"); Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames()); return result; }
}
复制代码


4.2 数据分片

基于业务进⾏分库的分⽚键选择,最好能均匀的将数据拆分到不同数据库实例。

本⽅案假设选择⽤⼾ID 作为分库的分⽚键,然后⾃定义新的分库分⽚算法,算法命名为 DatasourceShardingAlgorithmV2,算法内容如下:

import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;import java.nio.ByteBuffer;import java.nio.ByteOrder;import java.util.*;
/** * 新的分库数据分片算法 * * @author jingmin.yang * @date 2022/4/18 19:23 */public class DatasourceShardingAlgorithmV2 implements PreciseShardingAlgorithm<String> { // 排序存储结构:SortedMap<虚拟节点,物理节点> private static final SortedMap<Integer, String> virtualToRealMap = new TreeMap<Integer, String>();
/** * 基于所有数据源初始化订单分库虚拟节点 * @return */ static { List<String> datasources = new ArrayList<String>(); datasources.add("datasource-name"); datasources.add("datasource-name-1"); datasources.add("datasource-name-2"); datasources.add("datasource-name-3"); // 初始化分库虚拟节点数据 init(virtualToRealMap, datasources); }
@Override public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> preciseShardingValue) { // shardingKey即为分片键值 String shardingKey = preciseShardingValue.getValue(); return getShardingDatasource(virtualToRealMap, shardingKey); }
/** * 获取分库物理节点 * @param virtualToRealMap 分库虚拟节点 * @param shardingKey 分库的分片键标识 * @author: jingmin.yang * @create: 2021/9/10 10:41 * @return */ static String getShardingDatasource(SortedMap<Integer, String> virtualToRealMap, String shardingKey) { int hashValue = hash(shardingKey); // 顺时针找最近的虚拟节点 SortedMap<Integer, String> subVirtualToRealMap = virtualToRealMap.tailMap(hashValue); if (subVirtualToRealMap.isEmpty()) { return virtualToRealMap.get(virtualToRealMap.firstKey()); } return subVirtualToRealMap.get(subVirtualToRealMap.firstKey()); }
/** * 初始化hash环 * * @param: virtualToRealMap * @param: realNodes * @author: jingmin.yang * @create: 2021/9/14 16:29 */ static void init(SortedMap<Integer, String> virtualToRealMap, List<String> realNodes) { for(String node: realNodes) { virtualToRealMap.put(hash(node), node);
// 虚拟出指定数量的虚拟节点 int count = 0; int i = 0; int virtualNum = realNodes.size(); while (count < virtualNum) { i++; String virtualNode = node + "#" + i; // 计算hash值 int hashValue = hash(virtualNode); if (!virtualToRealMap.containsKey(hashValue)) { // 虚拟节点放到环上去 数据结构:红黑树 virtualToRealMap.put(hashValue, node); count++; } } } }
/** * * MurmurHash是一种非加密型哈希函数,适用于一般的哈希检索操作,与其它流行的哈希函数相比,对于规律 * 性较强的key,MurmurHash的随机分布(离散性)特征表现更好,而且具有低碰撞率。Redis对节点进行 * shard时采用的是这种算法。 * * @param: key 分片键 * @author: jingmin.yang * @create: 2021/9/14 19:17 */ static int hash(String key) { ByteBuffer buf = ByteBuffer.wrap(key.getBytes()); int seed = 305441741; ByteOrder byteOrder = buf.order(); buf.order(ByteOrder.LITTLE_ENDIAN); int m = 1540483477; byte r = 24;
int h; int k; for (h = seed ^ buf.remaining(); buf.remaining() >= 4; h ^= k) { k = buf.getInt(); k *= m; k ^= k >>> r; k *= m; h *= m; }
if (buf.remaining() > 0) { ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); finish.put(buf).rewind(); h ^= finish.getInt(); h *= m; }
h ^= h >>> 13; h *= m; h ^= h >>> 15; buf.order(byteOrder); if (h < 0) { h = Math.abs(h); } return h; }}
复制代码

测试:

public static void main(String[] args) {             System.out.println(DatasourceShardingAlgorithmV2.getShardingDatasource(virtualToRealMap, "U0000000001"));}
复制代码


结果:

datasource-name-1
复制代码

4.3 规则配置

Sharding-JDBC 可以通过 Java, YAML, Spring 命名空间 和 Spring Boot Starter 这 4 种⽅式进⾏配置,开发者可根据场景选择适合的配置⽅式。当前⽅案使⽤ YAML ⽅式进⾏规则配置,application.ym 内容如下:

spring:  shardingsphere:  # sharding数据源配置  datasource:    # 对应下方自定义的数据源名称    names: datasource-name,datasource-name-1,datasource-name-2,datasource-name-3    # 数据源基础配置    datasource-name:     # 数据源驱动类型      type: com.alibaba.druid.pool.DruidDataSource      # 数据库驱动类      driver-class-name: com.mysql.jdbc.Driver      # 数据库连接地址      url: jdbc:mysql://127.0.0.1:3306/datasource_name_0     # 数据库用户名      username: username     # 数据库用户密码      password: password    # 自定义第二个数据源(新数据源1)    datasource-name-1:      type: com.alibaba.druid.pool.DruidDataSource      driver-class-name: com.mysql.jdbc.Driver      url: jdbc:mysql://127.0.0.1:3306/datasource_name_1      username: username      password: password    # 自定义第三个数据源(新数据源2)    datasource-name-2:      type: com.alibaba.druid.pool.DruidDataSource      driver-class-name: com.mysql.jdbc.Driver      url: jdbc:mysql://127.0.0.1:3306/datasource_name_2      username: username      password: password    # 自定义第四个数据源(新数据源3)    datasource-name-3:      type: com.alibaba.druid.pool.DruidDataSource      driver-class-name: com.mysql.jdbc.Driver      url: jdbc:mysql://127.0.0.1:3306/datasource_name_3      username: username      password: password  # 采用sharding-jdbc进行分库分表设计  sharding:    # 默认数据源    default-data-source-name: datasource-name    # oldDatabaseStrategy是自行扩展配置(可选),用于双操作(双查询&双更新&双删除)中获取原有数据源,pom需要对应引入4.1.1.001自行扩展版本    oldDatabaseStrategy:      standard:        # 分库分片键        shardingColumn: sharding_key        # 原有的数据源分片算法,本算法返回的是固定的原有数据源datasource-name        preciseAlgorithmClassName: com.yang.sharding.DatasourceShardingAlgorithm   # 订单分库新的分片算法,根据sharding_key进行分库   default-database-strategy:      standard:        # 分库分片键        shardingColumn: sharding_key        # 新的分库分片算法        preciseAlgorithmClassName: com.yang.sharding.DatasourceShardingAlgorithmV2    # 分库分表配置,只有分库&分表的表需要在此配置,不分库不分表不需要配置,数据会存入默认数据源    tables:      # table_0使用分库+分表      table_0:        actual-data-nodes: datasource-name.table_0_$->{0..63},datasource-name-$->{1..3}.table_0_$->{0..63}        logic-table: table_0        # 数据库表策略配置        table-strategy:          standard:            # 表分片键            shardingColumn: sharding_key            # 表分片算法            preciseAlgorithmClassName: com.yang.TableShardingAlgorithm      # table_1使用分库,不使用分表      table_1:        actual-data-nodes: datasource-name.table_1, datasource-name-$->{1..3}.table_1        logic-table: table_1
复制代码

4.4 代码改造准备

由于 sharding 是对分⽚键使⽤分⽚算法,获取到对应的数据源进⾏操作,⽽分⽚键是从请求 sql 中提取⽽来,所以需要在代码层⾯改造⽀持分库,所有 CRUD 操作必须要带上分⽚键参数。


<!-- 这里假设数据库是MySQL,分片键是user_id字段 --><!-- 新增操作 -->INSERT INTO order_info('id', 'user_id', 'update_time') VALUES('O0000000001', 'U0000000001', NOW());<!-- 查询操作 -->SELECT * FROM order_info WHERE user_id='U0000000001';<!-- 更新操作 -->UPDATE order_info SET update_time=NOW() WHERE user_id='U0000000001';<!-- 删除操作 -->DELETE FROM order_info WHERE user_id='U0000000001';
复制代码


五、总结


⾄此,基于 Sharding-JDBC 完整的订单分库改造完成,包含选择分库分⽚键、定义分库数据分⽚算法,添加数据源 &表配置,代码改造等。

本⽅案已有实际应⽤案例,如有需要的⼩伙伴可作为参考,有问题也可直接联系我,⼀起讨论学习。


六、参考资料


https://shardingsphere.apache.org/document/4.1.1/cn/manual/sharding-jdbc/

https://github.com/apache/shardingsphere/tree/4.1.1 http://www.mycat.org.cn/

https://github.com/MyCATApache/Mycat2

https://help.aliyun.com/product/29657.html


关于领创集团(Advance Intelligence Group)

领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。


往期回顾 BREAK AWAY

如何解决海量数据更新场景下的 Mysql 死锁问题

企业级 APIs 安全实践指南 (建议初中级工程师收藏)

Cypress UI 自动化测试框架

serverless让我们的运维更轻松


▼ 如果觉得这篇内容对你有所帮助,有所启发,欢迎点赞收藏:

1、点赞、关注领创集团,获取最新技术分享和公司动态。

2、关注我们的公众号 & 知乎号「领创集团 Advance Group」或访问官方网站,了解更多企业动态。


发布于: 刚刚阅读数: 3
用户头像

智慧领创美好生活 2021.08.12 加入

AI技术驱动的科技集团,致力于以技术赋能为核心,通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈,带来个性化、陪伴式的产品服务和优质体验。

评论

发布
暂无评论
基于Sharding-JDBC的订单分库⽅案_领创集团Advance Intelligence Group_InfoQ写作社区