一、前言
随着业务的发展,数据量迅速增⻓,单实例 Mysql 数据库的磁盘存储容量很快就会达到上限(阿⾥云 RDB 最⼤上限 6T)。因此为了解决⽣产环境的存储瓶颈,需要对数据库进⾏分库设计,加⼤存储空间。
本⽅案基于 Apache 开源项⽬ ShardingSphere 采 Sharding-JDBC 进⾏订单⽔平分库设计(数据分⽚)缓解存储压⼒。
二、方案对比
⽬前主流的分库分表⽅案就是 Sharding-JDBC 和 Mycat,下⾯主要对⽐这两种。
经上述对⽐,可以发现 Sharding-JDBC 是直连数据库、没有中间层的⽅式,性能上更优,同时由于 Sharding-JDBC 集成在应⽤代码内,并不会增加额外的运维成本,使开发者可以专注于⾃⾝代码逻辑。另外,Sharding-JDBC ⽬前仅⽀持 Java,考虑到⽬前团队技术栈主要以 Java 为主,所以采⽤ Sharding-JDBC 的分库⽅案成为了⼀个不错的选择。
下图反映了 Sharding-JDBC 在应⽤中所处的位置:
三、原理
Sharding-JDBC 提供了精确分⽚扩展接⼝ org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm. PreciseShardingAlgorithm,只需要实现其 doSharding ⽅法,基于分⽚键做⼀定的算法转换,返回指定的数据源,便可以达到分库的效果。
⽬前我们使⽤的是⼀致性 hash 算法,⽤于定位分⽚键所在数据源,同时引⼊虚拟节点,可以避免数据倾斜问题(即有的数据源数据多,有的数据源数据少,导致数据源设备压⼒不均衡)算法原理如图:
然后基于该分库的分⽚键 &分⽚算法,我们还需在 application.yml 中增加 sharding 数据源配置;同时需要增加分表相关的配置信息。
最后项⽬启动时,sharding 会⾃动加载这些配置初始化数据源。在请求时从请求参数中提取到分⽚键,应⽤分⽚算法,找到⽬标数据源,操作该数据源。
四、实战
在原有订单数据源 datasource_name 的基础上,新加 3 台订单数据库实例 datasource_name_1&datasource_name_2&datasource_name_3,扩充后共 4 台订单数据库实例,磁盘存储最⼤可以达到原有容量的 4 倍。
4.1 引⼊maven 依赖
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1.001</version>
</dependency>
复制代码
这⾥的 4.1.1.001 版本是在官⽹ sharding 4.1.1 版本上⾃⾏扩充了⼀部分内容:本⾝根据业务分⽚键对业务数据进⾏分⽚后,同⼀类型的数据只会在⼀个数据源中,sharding 只能操作这⼀个数据源的数据,但对于⼀部分分⽚键落⼊新数据源后,意味着数据源发⽣了变动,⽽在旧库数据未迁⼊到新库时,会存在⼀定的兼容时期,此时,改造源码实现了如下双操作(除插⼊外):
a、双查询:查询新旧数据源的数据,然后业务项⽬⾥使⽤切⾯进⾏查询汇总去重,根据 id+updateDatetime 进⾏去重,保留最新数据。
b、双更新:更新数据时,会同时更新新旧数据源中满⾜条件的数据。
c、双删除:删除数据时,会同时删除新旧数据源中满⾜条件的数据。双删的⽬的是,为了避免只 删除了⼀个库的数据,⽽由于双查导致另⼀个库的数据被查询出来误⽤。
d、单插⼊:插⼊数据时,只会插⼊新分⽚算法所落⼊的数据源中。这样做的好处是,在不影响业 务的情况下,也减少了⼀部分数据迁移的⼯作量,即分库上线后新数据直接插⼊新库,不⽤做任何处理。
改造源码如下:
package org.apache.shardingsphere.core.rule;
/**
* Databases and tables sharding rule.
*/
@Getter
public class ShardingRule implements BaseRule {
// 定义:旧的分库分片策略
private final ShardingStrategy oldDatabaseShardingStrategy;
public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null.");
Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty.");
this.ruleConfiguration = shardingRuleConfig;
shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
tableRules = createTableRules(shardingRuleConfig);
broadcastTables = shardingRuleConfig.getBroadcastTables();
bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups());
// 加载旧的分库分片策略
ShardingStrategyConfiguration oldDatabaseShardingStrategyConfig = shardingRuleConfig.getOldDatabaseShardingStrategyConfig();
if (oldDatabaseShardingStrategyConfig != null) {
oldDatabaseShardingStrategy = createDefaultShardingStrategy(oldDatabaseShardingStrategyConfig);
} else {
oldDatabaseShardingStrategy = null;
}
defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig());
masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs());
encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig());
}
}
复制代码
package org.apache.shardingsphere.sharding.route.engine;
/**
* Sharding route decorator.
*/
public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> {
@SuppressWarnings("unchecked")
@Override
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
// SQL参数集合
List<Object> parameters = routeContext.getParameters();
ShardingStatementValidatorFactory.newInstance(
sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
// 获取分片条件
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
mergeShardingConditions(shardingConditions);
}
ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
RouteResult routeResult = null;
// ShardingStandardRoutingEngine特殊处理
if(shardingRouteEngine instanceof ShardingStandardRoutingEngine) {
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
// 查询/更新/删除操作,开启双操作模式
boolean isCompatibleMode = (sqlStatement instanceof SelectStatement) || (sqlStatement instanceof UpdateStatement) || (sqlStatement instanceof DeleteStatement);
routeResult = shardingRouteEngine.route(shardingRule, isCompatibleMode);
} else {
routeResult = shardingRouteEngine.route(shardingRule);
}
if (needMergeShardingValues) {
Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
}
return new RouteContext(sqlStatementContext, parameters, routeResult);
}
}
复制代码
package org.apache.shardingsphere.sharding.route.engine.type.standard;
/**
* Sharding standard routing engine.
*/
public final class ShardingStandardRoutingEngine extends BaseShardingRoutingEngine {
/**
* 寻找sharding路由结点信息
* @param shardingRule
* @param tableRule
* @param databaseShardingValues
* @param tableShardingValues
* @param isCompatibleMode 兼容模式(即同时操作新旧分片算法定位到的数据库,查询/更新/删除自动开启兼容模式)
* @return
*/
private Collection<DataNode> route0(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues,
final boolean isCompatibleMode) {
Collection<DataNode> result = new LinkedList<>();
// 双查 & 双更 & 双删核心逻辑
// 1. 加载旧数据节点路由信息
ShardingStrategy oldDatabaseShardingStrategy = shardingRule.getOldDatabaseShardingStrategy();
Collection<String> oldDataSources = null;
if (oldDatabaseShardingStrategy != null && isCompatibleMode) {
oldDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues, true);
for (String each : oldDataSources) {
result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues, true));
}
}
// 2. 加载新数据节点路由信息
Collection<String> routedDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues, false);
for (String each : routedDataSources) {
// 新数据节点未被加载时,加载新数据节点
if(oldDataSources == null || !oldDataSources.contains(each)) {
result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues, false));
}
}
return result;
}
private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final boolean isOld) {
if (databaseShardingValues.isEmpty()) {
return tableRule.getActualDatasourceNames();
}
// 根据isOld参数,区分加载新/旧分片策略
Collection<String> result = new LinkedHashSet<>();
if (isOld) {
result.addAll(shardingRule.getOldDatabaseShardingStrategy().doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
} else {
result.addAll(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
}
Preconditions.checkState(!result.isEmpty(), "no database route info");
Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result),
"Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
return result;
}
}
复制代码
4.2 数据分片
基于业务进⾏分库的分⽚键选择,最好能均匀的将数据拆分到不同数据库实例。
本⽅案假设选择⽤⼾ID 作为分库的分⽚键,然后⾃定义新的分库分⽚算法,算法命名为 DatasourceShardingAlgorithmV2,算法内容如下:
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;
/**
* 新的分库数据分片算法
*
* @author jingmin.yang
* @date 2022/4/18 19:23
*/
public class DatasourceShardingAlgorithmV2 implements PreciseShardingAlgorithm<String> {
// 排序存储结构:SortedMap<虚拟节点,物理节点>
private static final SortedMap<Integer, String> virtualToRealMap = new TreeMap<Integer, String>();
/**
* 基于所有数据源初始化订单分库虚拟节点
* @return
*/
static {
List<String> datasources = new ArrayList<String>();
datasources.add("datasource-name");
datasources.add("datasource-name-1");
datasources.add("datasource-name-2");
datasources.add("datasource-name-3");
// 初始化分库虚拟节点数据
init(virtualToRealMap, datasources);
}
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> preciseShardingValue) {
// shardingKey即为分片键值
String shardingKey = preciseShardingValue.getValue();
return getShardingDatasource(virtualToRealMap, shardingKey);
}
/**
* 获取分库物理节点
* @param virtualToRealMap 分库虚拟节点
* @param shardingKey 分库的分片键标识
* @author: jingmin.yang
* @create: 2021/9/10 10:41
* @return
*/
static String getShardingDatasource(SortedMap<Integer, String> virtualToRealMap, String shardingKey) {
int hashValue = hash(shardingKey);
// 顺时针找最近的虚拟节点
SortedMap<Integer, String> subVirtualToRealMap = virtualToRealMap.tailMap(hashValue);
if (subVirtualToRealMap.isEmpty()) {
return virtualToRealMap.get(virtualToRealMap.firstKey());
}
return subVirtualToRealMap.get(subVirtualToRealMap.firstKey());
}
/**
* 初始化hash环
*
* @param: virtualToRealMap
* @param: realNodes
* @author: jingmin.yang
* @create: 2021/9/14 16:29
*/
static void init(SortedMap<Integer, String> virtualToRealMap, List<String> realNodes) {
for(String node: realNodes) {
virtualToRealMap.put(hash(node), node);
// 虚拟出指定数量的虚拟节点
int count = 0;
int i = 0;
int virtualNum = realNodes.size();
while (count < virtualNum) {
i++;
String virtualNode = node + "#" + i;
// 计算hash值
int hashValue = hash(virtualNode);
if (!virtualToRealMap.containsKey(hashValue)) {
// 虚拟节点放到环上去 数据结构:红黑树
virtualToRealMap.put(hashValue, node);
count++;
}
}
}
}
/**
*
* MurmurHash是一种非加密型哈希函数,适用于一般的哈希检索操作,与其它流行的哈希函数相比,对于规律
* 性较强的key,MurmurHash的随机分布(离散性)特征表现更好,而且具有低碰撞率。Redis对节点进行
* shard时采用的是这种算法。
*
* @param: key 分片键
* @author: jingmin.yang
* @create: 2021/9/14 19:17
*/
static int hash(String key) {
ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
int seed = 305441741;
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
int m = 1540483477;
byte r = 24;
int h;
int k;
for (h = seed ^ buf.remaining(); buf.remaining() >= 4; h ^= k) {
k = buf.getInt();
k *= m;
k ^= k >>> r;
k *= m;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
finish.put(buf).rewind();
h ^= finish.getInt();
h *= m;
}
h ^= h >>> 13;
h *= m;
h ^= h >>> 15;
buf.order(byteOrder);
if (h < 0) {
h = Math.abs(h);
}
return h;
}
}
复制代码
测试:
public static void main(String[] args) {
System.out.println(DatasourceShardingAlgorithmV2.getShardingDatasource(virtualToRealMap, "U0000000001"));
}
复制代码
结果:
4.3 规则配置
Sharding-JDBC 可以通过 Java, YAML, Spring 命名空间 和 Spring Boot Starter 这 4 种⽅式进⾏配置,开发者可根据场景选择适合的配置⽅式。当前⽅案使⽤ YAML ⽅式进⾏规则配置,application.ym 内容如下:
spring:
shardingsphere:
# sharding数据源配置
datasource:
# 对应下方自定义的数据源名称
names: datasource-name,datasource-name-1,datasource-name-2,datasource-name-3
# 数据源基础配置
datasource-name:
# 数据源驱动类型
type: com.alibaba.druid.pool.DruidDataSource
# 数据库驱动类
driver-class-name: com.mysql.jdbc.Driver
# 数据库连接地址
url: jdbc:mysql://127.0.0.1:3306/datasource_name_0
# 数据库用户名
username: username
# 数据库用户密码
password: password
# 自定义第二个数据源(新数据源1)
datasource-name-1:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/datasource_name_1
username: username
password: password
# 自定义第三个数据源(新数据源2)
datasource-name-2:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/datasource_name_2
username: username
password: password
# 自定义第四个数据源(新数据源3)
datasource-name-3:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/datasource_name_3
username: username
password: password
# 采用sharding-jdbc进行分库分表设计
sharding:
# 默认数据源
default-data-source-name: datasource-name
# oldDatabaseStrategy是自行扩展配置(可选),用于双操作(双查询&双更新&双删除)中获取原有数据源,pom需要对应引入4.1.1.001自行扩展版本
oldDatabaseStrategy:
standard:
# 分库分片键
shardingColumn: sharding_key
# 原有的数据源分片算法,本算法返回的是固定的原有数据源datasource-name
preciseAlgorithmClassName: com.yang.sharding.DatasourceShardingAlgorithm
# 订单分库新的分片算法,根据sharding_key进行分库
default-database-strategy:
standard:
# 分库分片键
shardingColumn: sharding_key
# 新的分库分片算法
preciseAlgorithmClassName: com.yang.sharding.DatasourceShardingAlgorithmV2
# 分库分表配置,只有分库&分表的表需要在此配置,不分库不分表不需要配置,数据会存入默认数据源
tables:
# table_0使用分库+分表
table_0:
actual-data-nodes: datasource-name.table_0_$->{0..63},datasource-name-$->{1..3}.table_0_$->{0..63}
logic-table: table_0
# 数据库表策略配置
table-strategy:
standard:
# 表分片键
shardingColumn: sharding_key
# 表分片算法
preciseAlgorithmClassName: com.yang.TableShardingAlgorithm
# table_1使用分库,不使用分表
table_1:
actual-data-nodes: datasource-name.table_1, datasource-name-$->{1..3}.table_1
logic-table: table_1
复制代码
4.4 代码改造准备
由于 sharding 是对分⽚键使⽤分⽚算法,获取到对应的数据源进⾏操作,⽽分⽚键是从请求 sql 中提取⽽来,所以需要在代码层⾯改造⽀持分库,所有 CRUD 操作必须要带上分⽚键参数。
<!-- 这里假设数据库是MySQL,分片键是user_id字段 -->
<!-- 新增操作 -->
INSERT INTO order_info('id', 'user_id', 'update_time') VALUES('O0000000001', 'U0000000001', NOW());
<!-- 查询操作 -->
SELECT * FROM order_info WHERE user_id='U0000000001';
<!-- 更新操作 -->
UPDATE order_info SET update_time=NOW() WHERE user_id='U0000000001';
<!-- 删除操作 -->
DELETE FROM order_info WHERE user_id='U0000000001';
复制代码
五、总结
⾄此,基于 Sharding-JDBC 完整的订单分库改造完成,包含选择分库分⽚键、定义分库数据分⽚算法,添加数据源 &表配置,代码改造等。
本⽅案已有实际应⽤案例,如有需要的⼩伙伴可作为参考,有问题也可直接联系我,⼀起讨论学习。
六、参考资料
https://shardingsphere.apache.org/document/4.1.1/cn/manual/sharding-jdbc/
https://github.com/apache/shardingsphere/tree/4.1.1 http://www.mycat.org.cn/
https://github.com/MyCATApache/Mycat2
https://help.aliyun.com/product/29657.html
关于领创集团(Advance Intelligence Group)
领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。
往期回顾 BREAK AWAY
如何解决海量数据更新场景下的 Mysql 死锁问题
企业级 APIs 安全实践指南 (建议初中级工程师收藏)
Cypress UI 自动化测试框架
serverless让我们的运维更轻松
▼ 如果觉得这篇内容对你有所帮助,有所启发,欢迎点赞收藏:
1、点赞、关注领创集团,获取最新技术分享和公司动态。
2、关注我们的公众号 & 知乎号「领创集团 Advance Group」或访问官方网站,了解更多企业动态。
评论