TiSpark 数据写入过程解析(源码解析)
【是否原创】是
【首发渠道】TiDB 社区,转载请注明出处
背景介绍
TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势,和 TiDB 一起为用户一站式解决 HTAP (Hybrid Transactional/Analytical Processing) 的需求。TiSpark 依赖于 TiKV 集群和 Placement Driver (PD),也需要搭建一个 Spark 集群。
其架构图如下:
作者前一段时间做过 TiSpark on k8s 的实践,请参考:https://asktug.com/t/topic/422775/3。
实践过程中,通过 Spark ui 可以观察到,TiSpark 写入过程中,做了很多的工作,如下所示:
结合执行过程,充分理解官方文档后,大约理解这个过程是把写数据,拆分 region 等动作,都用 Spark 进行了处理,然后直接写入 tikv。那么具体都干了哪些事情?顺序是怎样的?下面通过阅读源码来讲解。
阅读版本:TiSpark-2.4.1
源码分析
整体流程介绍
整体流程图如下:
从上述流程图可以看到,整个过程大体分了 5 个步骤:
初始化与检查:初始化批写类,检查来源表与目标表是否符合要求
数据预处理:根据目标表的要求,对来源数据进行检查和补充
预分区(Region)与打散分区(Region):对数据进行预分区与打散分区操作,此处分区是指的 region
两阶段提交:通过两阶段提交过程,提交数据
刷新统计信息:刷新表的统计信息
下面对前四个阶段的内容做详细分析。
初始化与检查
在 Spark 中执行 save 方法后,Spark 会调用 org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand 的 run 的方法,Spark 实例化 com.pingcap.tispark.TiDBDataSource 之后,会执行 TiDBDataSource 的 createRelation 方法, 然后执行整个写入过程。
主要的几个类的分工如下:
com.pingcap.tispark.write.TiBatchWrite:逻辑控制
com.pingcap.tispark.write.TiBatchWriteTable:每个写入过程实例化一个 TiBatchWriteTable,过程中操作的具体实现类
初始化与检查进行了如下工作:
图中,左边的是 TiBatchWrite,右边的是 TiBatchWriteTable。
在这个阶段,主要做了一些类的初始化;一些参数的检查 :是否开启了批写开关 spark.tispark.write.enable,
是否属于下列不支持的情况之一:clustered index,auto random column,partition table,generated column;表的检查:列的数据量是否一致,来源表是否是空表。
这个阶段最后,对目标表进行了加锁处理:lock tables databaseName.tableName write local
数据预处理
数据预处理阶段工作如下:
此阶段主要进行了三个动作:
自增列处理:如果自增列全部为空值,则直接删除,如果自增列有值,则删除原有自增列的值,重新填充
spark row 到 tikv row 的转化:转化 spark 的 row 为 tikv 的 row,转化后的 row 对象组成的 rdd 用于后续数据写入
构建最终写入数据集:根据主键或者唯一索引的状况,构建最终数据集,包含无主键冲突的 insert 数据和有主键冲突的 replace 数据
预分区(Region)与打散分区(Region)
预分区(Region)与打散分区(Region)阶段动作如下:
此阶段的主要动作是调用 com.pingcap.tikv.TiSession,根据当前数据情况,对数据进行 split region 和 scatter region,此阶段执行完之后,数据对应的 region 已经在 tikv 上创建,并在节点直接打散。
两阶段提交
两阶段提交的工作如下:
分了几个过程:
prewritePrimaryKey
prewriteSecondaryKeys
commitPrimaryKey
unlock table
commitSecondaryKeys
整个过程的理解可以参考:https://pingcap.com/zh/blog/percolator-and-txn,作者水平有限,怕写不好引起误解,在此不再赘述。
其他资源
文末附上时序图,因作者水平有限,如果有理解不对的地方,请指正。
评论