作者: 数据小黑原文来源:https://tidb.net/blog/a84d0ac7
背景
TiSpark 3.0.0 于 6 月 15 号发布了,新的版本中提到了很多期望已久的功能,本文对几个新特性做了对比测试,验证新版本的特性是否符合线上要求。本文基础运行环境为 Spark On kubernetes,Spark 镜像打包时,已包含 TiSpark 必要的依赖。
阅读收益
TiSpark 3.0.0 兼容性更改与新特性解析
下面的兼容性更改与新特性摘自官方:
兼容性更改
TiSpark without catalog plugin is no more supported. You must configure catalog configs and use tidb_catalog now
此特性简言之就是在 3.0.0 版本中,不再支持非 catalog plugin 的配置,更改说明如下:在 TiSpark 2.5.0 中,如下配置时是可以正常读取数据的:
数据读取:
注意 use sbtest 中的 sbtest 是 tidb 的数据库。在 TiSpark 3.0.0 中需按照如下配置:
数据读取:
此案例中 use tidb_catalog.sbtest 中 tidb_catalog 是 spark.sql.catalog.tidb_catalog 配置中指定的,如果配置时使用.set(“spark.sql.catalog.tidb_catalog2”, “org.apache.spark.sql.catalyst.catalog.TiCatalog”),那么 use 时,需使用 use tidb_catalog2.sbtest
TiSpark’s jar has a new naming rule like tispark-assembly-{spark_version}_{scala_version}-{$tispark_verison}
此特性把 scala_version 版本号体现在版本命名中,版本包命名由 tispark-assembly-3.0-2.5.1.jar 变化成了 tispark-assembly-3.0_2.12-3.0.0.jar。
新功能
Support DELETE statement
基于兼容性更改中 Datasource API 版本的替换,在 Spark 3.0.0 中可以支持 DELETE 特性,例如可以执行如下语句:spark.sql(“delete from tidb_catalog.db.table where xxx”)。
Support Spark 3.2
支持 Spark 3.2 运行环境。
Support telemetry to collect information
支持遥测信息收集
Support stale read to read historical versions of data
支持过时读取特性( Stale Read ),使用此特性时需要在配置中指定毫秒级时间戳,指定时间戳后,数据读取时,程序按照指定时间戳读取一个 Snapshot,所有的 SELECT 语句都会从这个 Snapshot 中读取数据。如果每个 SQL 都需要读取不同的 Snapshot,需要在每个 SQL 之前配置不同的时间戳 (java 示例):
Support TLS with reload capability
支持 TLS 并具备动态更新证书的能力
特性评测
我们有个 tidb 实验环境,是在 k8s 里面的,这个环境配套了一个 spark on k8s 环境。这次测试是在 spark on k8s+tidb on k8s 中测试的。
非 catalog plugin 的配置运行情况
基于 spark 3.0.3 on k8s 测试非 catalog plugin 的配置运行情况。因为以前的项目都是基于 maven 的,本次已经在 maven 项目下进行测试,首先修改项目依赖:
<properties>
<tispark.spark.version>3.0_2.12</tispark.spark.version>
<tispark.version>3.0.0</tispark.version>
</properties>
<dependency>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-assembly-${tispark.spark.version}</artifactId>
<version>${tispark.version}</version>
</dependency>
复制代码
本次依赖包的命名有变化,artifactId 由 tispark-assembly 改变成 tispark-assembly-3.0_2.12 也就是说对应每个版本的 spark,artifactId 不同,类似下表所列:
其次隐藏掉关于 catalog 的两行配置,修改后如下所示:
SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
//.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
//.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
复制代码
运行 spark 程序,具体运行配置过程参考:TiSpark On Kubernetes 实践运行日志如下:
22/06/21 01:42:49 ERROR TiExtensions$: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
com.pingcap.tikv.exception.TiInternalException: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
at org.apache.spark.sql.TiExtensions$.validateCatalog(TiExtensions.scala:79)
复制代码
代码中加了个检查,在没有配置 Catalog 的情况下,检查报错,提示 TiSpark must work with TiCatalog。
Spark 3.0.3 + stale read + delete 特性测试
实际运行时,建议最低使用 TiSpark 3.0.1 ,这个版本的已知问题比较少,各个 Spark 版本下也都测试通过。
Spark 3.2.1+TiSpark 3.0.1 stale read + delete 特性测试测试构建代码如下:
String pd_addr = "basic-pd.tidb-cluster:2379";
String tidb_addr = "basic-tidb.tidb-cluster";
SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
SparkSession spark = SparkSession
.builder().appName("RdbToRdbProcess")
.config(conf)
.getOrCreate();
// 通过 TiSpark 将 DataFrame 批量写入 TiDB
Map<String, String> tiOptionMap = new HashMap<String, String>();
tiOptionMap.put("tidb.addr", tidb_addr);
tiOptionMap.put("tidb.port", "4000");
tiOptionMap.put("tidb.user", username);
tiOptionMap.put("tidb.password", password);
tiOptionMap.put("replace", "true");
tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);
spark.sql("use tidb_catalog.sbtest2");
// 获取当前时间戳
long ttl=System.currentTimeMillis();
System.out.println("删除前查询");
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("删除");
spark.sql("delete from sbtest_t_t where id = 100").show();
System.out.println("删除后查询");
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("stale read");
spark.conf().set("spark.tispark.stale_read", ttl);
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("置空时间戳之后查询");
spark.conf().set("spark.tispark.stale_read", "");
spark.sql("select * from sbtest_t_t where id = 100").show();
复制代码
上述查询结果依次:
删除前查询:
+---+------+--------------------+--------------------+
| id| k| c| pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
删除后查询:
+---+---+---+---+
| id| k| c|pad|
+---+---+---+---+
+---+---+---+---+
stale read:
+---+------+--------------------+--------------------+
| id| k| c| pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
置空时间戳之后查询:
+---+---+---+---+
| id| k| c|pad|
+---+---+---+---+
+---+---+---+---+
复制代码
由以上查询结果可知,数据能够执行删除,删除后正常查询是查询不到的,使用 stale read,利用删除之前的时间戳能够查询到数据,置空时间戳后,恢复普通查询,数据能够查询到。
总结
本次版本中 stale read + delete 让 tispark 具有更灵活的应用场景,经过验证 spark 3.2.1 跟 tidb 6.1.0 on k8s 通讯还有些问题仍需解决,另外一方面,也盼望着能兼容分区表的一些操作能发布出来,比如说导入数据之前能够 truncate 分区一类的操作。
评论