写点什么

易仓跨境 Saas 全球租户,如何做到数据秒级响应?

  • 2022 年 5 月 12 日
  • 本文字数:2881 字

    阅读完需:约 9 分钟

前 言 

易仓科技成立于 2013 年,致力于构建智能协同的跨境网络,让全球贸易更轻松。经过 9 年的积累沉淀,产业链 SaaS+生态链协同的服务模式,已发展成为跨境电商行业的头部企业。目前租户分布全球,数据来自各大洲,汇集国外各大电商平台各个站点销售数据。面对这种数据体量大,跨全球各区域的复杂场景易仓大数据团队是如何做到秒级响应的?


问 题

①数据分布三大洲,如何集中统一计算?

②Saas 租户数据库级别物理隔离,分系统分库分表,百万张表如何同步?

(1) A 客户,A 系统,两个库总共 1000+张表与 B 客户,A 系统同样的 1000+张表

(2) Saas 租户不同套餐保护不同功能系统,各系统表数量在 100、200 到 500 张不等

(3) A、B 客户在同系统在同一个数据库实例与单客户单数据库实例(rds 实例,polarDB 实例)

(4) 几百个数据库实例分布在不同地域,不同国家

③如何在保证数据准确的基础上达到毫秒级别同步入仓?


技术选型



针对以上问题的特殊性,复杂性,我们在技术选型上做了大量的调研以及深度测试,并与云服务团队深度交流做出了大量优化。


一、对于问题①②,数据分布以及多租户多库多表,如何处理?



1.租户数据库分布在阿里云 rds 以及 polarDB 的几百个实例,分散在国内外各大 Region,dts 同时支持全库全表同步以及可选择库表。单个 dts 同步表数量没有限制,高度匹配易仓复杂的业务场景,在实施过程的功能以及性能优化是问题解决关键。

2.dts 数据同步将国内外数据集中实时同步到统一华南区域的 Kafka,保证数据准确,时效在毫秒级别。


二、数据准确毫秒级别同步入仓,关键点在 Flink 的高并发 pipeline 处理数据,时延毫秒级,且兼具可靠性


3.由 Flink 实时消费 Kafka 数写到实时数仓 Hologres。


4.借助 Maxcompute(下文简称 MC)与实时数仓 Hologres 的互通能力,完成复杂的分析计算。


遇到的坑

1.DTS 规格预估不准导致数据同步不稳定且没有及时的告警机制无法及时处理,无法保证数据准确性。


2.底层数仓建模不清晰,对于物理删除的客户数据数据中心采用逻辑删除进行标记后利用 MC 进行合并计算,以至于跟数据源无法一比一比对。无法判断数据同步的准确性。


3.以上导致了频繁的客户数据重推,数据初始化,以至于 Kafka 数据大量堆积 Flink 消费不及时。


4.以上步骤环环相扣,陷入数据异常同步的死循环,导致数据不可用。


如何解决

1.根据数据库实例实际 RPS 采用 DTS 对应的规格,推动 DTS 完善告警机制。数据同步稳定性实时性都大幅度提高。

(1) 推动 DTS 团队解决百万表级别数量的 Api 接口支持,开发互相支持快速迭代

(2) 定位大数据量表初始化任务频繁重启问题,优化 dts 任务元数据存储逻辑策略



2.数据贴源层跟数据库一比一,重构底层设计方案,每个数据原表在原建表基础上通过 sharing_seq、db_seq、company_code 加上原表的主键组成唯一索引构成数据中心的目的表。


-- 租户源表CREATE TABLE `erp_test` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `d_key` varchar(50) NOT NULL DEFAULT '' COMMENT '字典key',  `d_val` varchar(100) NOT NULL DEFAULT '' COMMENT '字典val',  `d_desc` varchar(100) NOT NULL DEFAULT '' COMMENT '字典描述',  `ec_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATECURRENT_TIMESTAMP COMMENT '更新时间',  `ec_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  PRIMARY KEY (`id`),  KEY `idxx_ec_update_time` (`ec_update_time`),  KEY `idxx_ec_create_time` (`ec_create_time`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT="erp_test表';
-- 数据中心目的表CREATE TABLE `erp_test` ( `sharing_seq` text, `db_seq` text, `company_code` text, `id` bigint NOT NULL, `d_desc` text, `d_key` text, `d_val` text, `ec_create_time` timestamptz, `ec_update_time` timestamptz,PRIMARY KEY (`sharing_seq`, `db_seq`, `company_code`, `id`));
复制代码

3.Flink 核心代码逻辑优化,达到数据一比一还原数据库大量的新增、更新以及物理删除操作。数据可对比,有迹可循。通过大量数据对比,数据准确可靠并且实时性达到秒级

class DtsToDataworksHoloStream(fromSource: TSource[JSONObject], toSink: TSink[JSONObject]) extends TStream[JSONObject, JSONObject] {  override val source: TSource[JSONObject] = fromSource  override val sink: TSink[JSONObject] = toSink
override protected def transform(dataStream: DataStream[JSONObject], others: DataStream[_]*): DataStream[JSONObject] = { val parallelism: Int = AppUtil.env.getParallelism val confStream: DataStream[mutable.Map[String, DbInfo2DataworksHoloBean]] = others.head.asInstanceOf[DataStream[mutable.Map[String, DbInfo2DataworksHoloBean]]] //获取各租户广播配置信息 val confStreamDescriptor = new MapStateDescriptor[String, DbInfo2DataworksHoloBean]("DtsConfig", classOf[String], classOf[DbInfo2DataworksHoloBean]) val confBroadcast: BroadcastStream[mutable.Map[String, DbInfo2DataworksHoloBean]] = confStream.broadcast(confStreamDescriptor) //Dts推送kafka的canalJson与租户广播配置connect val result: DataStream[JSONObject] = dataStream.connect(confBroadcast).process(new Dts2DataworksHoloTransformFunction) .keyBy(_.getString("table").hashCode % parallelism).map(s => s) result }}
//还原数据库增删改def upsertOrDelTable(tableName: String, data: Array[(String, Any)], opType: String): Unit = { val put = new Put(holoclient.getTableSchema(tableName)) if (opType == "DELETE") { put.getRecord.setType(SqlCommandType.DELETE) } for (kv <- data) { put.setObject(kv._1.toLowerCase, kv._2) } holoclient.put(put) }
复制代码

亿万数据量级的业务订单表,实现了从数据源端到数据中心的业务延迟控制在毫秒级别,基于此数据中心秒级别的业务响应更是不在话下。


后 记

DTS 团队针对易仓复杂的单实例多租户场景优化:


1.单实例多租户数 10 万张表初始化,DTS 提供的 API 接口完全无法满足需求!

问题排查:表数量太大,接口参数体超过 nginx 网关限制,tomcat 限制。限制调整后仍然无法满足。


最后解决:开发新 Api 接口,经过与 dts 开发反复联调,最终采用 oss 文件中转方式绕过网关,实现 10 万级别表的初始化推送。


2.DTS 实例单次初始化表数据了限制以及 DTS 元数据策略优化

问题排查:单次初始化表达到一定限制,dts 对应的 reader 模块内存溢出。任务无限重启,延迟逐步增大。


最后解决:dts 开发日夜坚守,出现问题快速响应,临时调大 reader 模块内存并排查根本原因,最后专门针对 SaaS 这种多表的场景在元数据存储策略方面进行了大量优化并彻底解决问题。


注:

①Flink:一个批处理和流处理结合的统一计算框架

②Hologres:一站式实时数仓引擎(Real-Time Data Warehouse)

③Maxcompute:大数据计算服务,快速、完全托管的 PB 级数据仓库解决方案

用户头像

还未添加个人签名 2020.10.15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
易仓跨境Saas全球租户,如何做到数据秒级响应?_数据库_阿里云大数据AI技术_InfoQ写作社区