写点什么

TiSpark 2.4.1(Spark 2.4.5) 到 TiSpark 2.5.0(Spark 3.0.X/3.1.X) 迁移实践

  • 2022 年 7 月 11 日
  • 本文字数:2021 字

    阅读完需:约 7 分钟

作者: 数据小黑原文来源:https://tidb.net/blog/b8f902a9


【是否原创】是


【首发渠道】TiDB 社区


【首发渠道链接】其他平台首发请附上对应链接

背景介绍


喜大普奔,TiSpark 2.5.0 发布了,其中最重要的特性是支持了 Spark 3.0.X 和 Spark 3.1.X。因为在 k8s 上跑 TiSpark 始终使用的 Spark 3.0.3 的环境(原因见:TiSpark On Kubernetes 实践 关键解释中的说明), 也因为统一技术栈的需要,想要统一 TiSpark 的运行环境到 3.0.X, 对 TiSpark 2.5.0 进行了初步测试。


本文描述了从 TiSpar k2.4.1(Spark 2.4.5) 到 TiSpark 2.5.0(Spark 3.0.X/3.1.X) 所需做出的修改,运行时状态对比,出现的问题及解决方案。

基础环境版本

  • TiDB 版本

  • TiDB 5.4.0

  • 测试 1:

  • TiSpark 2.4.1+Spark 2.4.5

  • submit 环境 Spark 3.0.3

  • 测试 2:

  • TiSpark 2.5.0+Spark 3.0.3

  • submit 环境 Spark 3.0.3

  • 测试 3:

  • TiSpark 2.5.0+Spark 3.1.2

  • submit 环境 Spark 3.1.2

  • 特别说明

  • 测试数据量为 10 万行。

  • spark 的 executor 运行参数如下:


测试 2 和测试 3 的结果基本一直,出现的问题也类同,不做单独分析。

迁移时需要的修改

连接参数修改

TiSpark 2.4.1 迁移到 TiSpark 2.5.0,并结合 Spark 3.0.33.1.2 使用时,需要在 Spark 的 conf 中增加如下配置:



.set("spark.sql.catalog.tidb_catalog","org.apache.spark.sql.catalyst.catalog.TiCatalog").set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
复制代码


此配置可以在 Spark 代码中 SparkConf() 构建的时候 set,也可以在 Spark submit 或者 Spark 的 spark-defaults.conf 文件中配置。

读取代码的修改

上一小节增加的两行表示在 Spark 中增加一个名字是 tidb_catalog 的 catalog,此处的 catalog 类似 database,对后续代码的影响就是:


use tidb_catalog.sbtest
复制代码


或者


select * from tidb_catalog.sbtest.sbtest_o
复制代码


切换数据库需要按照 database+schema 的方式,读取表时需要按照 database+schema+table 的方式。


此处需要注意,写回 TiDB 的时候,不需要指定 database,完整的代码如下:


//通过 TiSpark 将 DataFrame 批量写入 TiDBMap<String, String> tiOptionMap = new HashMap<String, String>();tiOptionMap.put("tidb.addr", tidb_addr);tiOptionMap.put("tidb.port", "4000");tiOptionMap.put("tidb.user", username);tiOptionMap.put("tidb.password", password);tiOptionMap.put("replace", "true");tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);
String source_db_name = "tidb_catalog.sbtest";String source_table_name = "sbtest_o";String target_db_name = "sbtest2";String target_table_name = "sbtest_t";
spark.sql("use "+source_db_name);String source_sql = "select * from "+source_table_name;spark.sql(source_sql) .write() .format("tidb") .options(tiOptionMap) .option("database", target_db_name) .option("table", target_table_name) .mode(SaveMode.Append) .save();
复制代码

运行情况

测试 1 运行状态

总体运行状态:



shuffle 数据量情况:



核心运行时间在 3.3min:



运行时发现有溢出磁盘的情况:


测试 2/3 运行状态

总体运行情况:



shuffle 数据情况:



核心运行时间在 3.2min:



运行时发现有溢出磁盘的情况:


测试对比

2.5.0 版本运行时简化了一次判空动作,整体运行时间缩短了 0.1min,磁盘溢出比 2.4.1 版本增多,偶尔会出现问题,对出现的问题总结如下。

常见错误与处理

ConcurrentModificationException

com.pingcap.tikv.exception.TiKVException: Execution exception met.......Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException......Caused by: java.util.ConcurrentModificationException......
复制代码


出现上述错误时,需要修改 TiSpark 源码,修改 com.pingcap.tikv.util.ConcreteBackOffer 这个类的 backOffFunctionMap 的定义和初始化代码:


private final ConcurrentHashMap<BackOffFunction.BackOffFuncType,BackOffFunction> backOffFunctionMap;this.backOffFunctionMap = new ConcurrentHashMap<BackOffFunction.BackOffFuncType, BackOffFunction>();
复制代码


重新编译 TiSpark


mvn clean install -Dmaven.test.skip=true
复制代码


重新编译 Spark,重新运行即可解决。

Missing an output location for shuffle 3

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3......
复制代码


出现上述错误是因为 Spark 的运行内存不足,导出 task 失败重试,有很多种调优方式,列举两种:


  1. 对数据进行 repartition 操作,例如:

  2. 有条件的增加 executor 资源,例如:



--conf spark.executor.memory=1G \"
复制代码

迁移总结

整体迁移较容易,需要修改的也不多。2.5.0 减少了一个步骤,对 shuffle 的使用有所增加,容易引起一些 shuffle 的问题,也能通过调整规避。下一步准备阅读源码对 2.5.0 的优化作进一步的理解和解读。


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TiSpark 2.4.1(Spark 2.4.5)到TiSpark 2.5.0(Spark 3.0.X/3.1.X)迁移实践_实践案例_TiDB 社区干货传送门_InfoQ写作社区