写点什么

TiSpark 3.0.0 新特性实践

  • 2022 年 7 月 11 日
  • 本文字数:3245 字

    阅读完需:约 11 分钟

作者: 数据小黑原文来源:https://tidb.net/blog/a84d0ac7

背景

TiSpark 3.0.0 于 6 月 15 号发布了,新的版本中提到了很多期望已久的功能,本文对几个新特性做了对比测试,验证新版本的特性是否符合线上要求。本文基础运行环境为 Spark On kubernetes,Spark 镜像打包时,已包含 TiSpark 必要的依赖。

阅读收益

TiSpark 3.0.0 兼容性更改与新特性解析

下面的兼容性更改与新特性摘自官方:

兼容性更改

  • TiSpark without catalog plugin is no more supported. You must configure catalog configs and use tidb_catalog now

  • 此特性简言之就是在 3.0.0 版本中,不再支持非 catalog plugin 的配置,更改说明如下:在 TiSpark 2.5.0 中,如下配置时是可以正常读取数据的:

  • 数据读取:

  • 注意 use sbtest 中的 sbtest 是 tidb 的数据库。在 TiSpark 3.0.0 中需按照如下配置:

  • 数据读取:

  • 此案例中 use tidb_catalog.sbtest 中 tidb_catalog 是 spark.sql.catalog.tidb_catalog 配置中指定的,如果配置时使用.set(“spark.sql.catalog.tidb_catalog2”, “org.apache.spark.sql.catalyst.catalog.TiCatalog”),那么 use 时,需使用 use tidb_catalog2.sbtest

  • TiSpark’s jar has a new naming rule like tispark-assembly-{spark_version}_{scala_version}-{$tispark_verison}

  • 此特性把 scala_version 版本号体现在版本命名中,版本包命名由 tispark-assembly-3.0-2.5.1.jar 变化成了 tispark-assembly-3.0_2.12-3.0.0.jar。

新功能

  • Support DELETE statement

  • 基于兼容性更改中 Datasource API 版本的替换,在 Spark 3.0.0 中可以支持 DELETE 特性,例如可以执行如下语句:spark.sql(“delete from tidb_catalog.db.table where xxx”)。

  • Support Spark 3.2

  • 支持 Spark 3.2 运行环境。

  • Support telemetry to collect information

  • 支持遥测信息收集

  • Support stale read to read historical versions of data

  • 支持过时读取特性( Stale Read ),使用此特性时需要在配置中指定毫秒级时间戳,指定时间戳后,数据读取时,程序按照指定时间戳读取一个 Snapshot,所有的 SELECT 语句都会从这个 Snapshot 中读取数据。如果每个 SQL 都需要读取不同的 Snapshot,需要在每个 SQL 之前配置不同的时间戳 (java 示例):

  • Support TLS with reload capability

  • 支持 TLS 并具备动态更新证书的能力

特性评测

我们有个 tidb 实验环境,是在 k8s 里面的,这个环境配套了一个 spark on k8s 环境。这次测试是在 spark on k8s+tidb on k8s 中测试的。

非 catalog plugin 的配置运行情况

基于 spark 3.0.3 on k8s 测试非 catalog plugin 的配置运行情况。因为以前的项目都是基于 maven 的,本次已经在 maven 项目下进行测试,首先修改项目依赖:


<properties>    <tispark.spark.version>3.0_2.12</tispark.spark.version>    <tispark.version>3.0.0</tispark.version></properties><dependency>    <groupId>com.pingcap.tispark</groupId>    <artifactId>tispark-assembly-${tispark.spark.version}</artifactId>    <version>${tispark.version}</version></dependency>
复制代码


本次依赖包的命名有变化,artifactId 由 tispark-assembly 改变成 tispark-assembly-3.0_2.12 也就是说对应每个版本的 spark,artifactId 不同,类似下表所列:



其次隐藏掉关于 catalog 的两行配置,修改后如下所示:


SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")        //.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")        //.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)        .set("spark.tispark.pd.addresses", pd_addr);
复制代码


运行 spark 程序,具体运行配置过程参考:TiSpark On Kubernetes 实践运行日志如下:


22/06/21 01:42:49 ERROR TiExtensions$: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.com.pingcap.tikv.exception.TiInternalException: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.  at org.apache.spark.sql.TiExtensions$.validateCatalog(TiExtensions.scala:79)
复制代码


代码中加了个检查,在没有配置 Catalog 的情况下,检查报错,提示 TiSpark must work with TiCatalog。

Spark 3.0.3 + stale read + delete 特性测试

实际运行时,建议最低使用 TiSpark 3.0.1 ,这个版本的已知问题比较少,各个 Spark 版本下也都测试通过。


Spark 3.2.1+TiSpark 3.0.1 stale read + delete 特性测试测试构建代码如下:


String pd_addr = "basic-pd.tidb-cluster:2379";String tidb_addr = "basic-tidb.tidb-cluster";
SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") .set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog") .set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr) .set("spark.tispark.pd.addresses", pd_addr);SparkSession spark = SparkSession .builder().appName("RdbToRdbProcess") .config(conf) .getOrCreate();// 通过 TiSpark 将 DataFrame 批量写入 TiDBMap<String, String> tiOptionMap = new HashMap<String, String>();tiOptionMap.put("tidb.addr", tidb_addr);tiOptionMap.put("tidb.port", "4000");tiOptionMap.put("tidb.user", username);tiOptionMap.put("tidb.password", password);tiOptionMap.put("replace", "true");tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);
spark.sql("use tidb_catalog.sbtest2");// 获取当前时间戳long ttl=System.currentTimeMillis();
System.out.println("删除前查询");spark.sql("select * from sbtest_t_t where id = 100").show();System.out.println("删除");spark.sql("delete from sbtest_t_t where id = 100").show();System.out.println("删除后查询");spark.sql("select * from sbtest_t_t where id = 100").show();System.out.println("stale read");spark.conf().set("spark.tispark.stale_read", ttl);spark.sql("select * from sbtest_t_t where id = 100").show();System.out.println("置空时间戳之后查询");spark.conf().set("spark.tispark.stale_read", "");spark.sql("select * from sbtest_t_t where id = 100").show();
复制代码


上述查询结果依次:


删除前查询:+---+------+--------------------+--------------------+| id|     k|                   c|                 pad|+---+------+--------------------+--------------------+|100|503013|72324218654-54342...|17648767791-53546...|+---+------+--------------------+--------------------+删除后查询:+---+---+---+---+| id|  k|  c|pad|+---+---+---+---++---+---+---+---+stale read:+---+------+--------------------+--------------------+| id|     k|                   c|                 pad|+---+------+--------------------+--------------------+|100|503013|72324218654-54342...|17648767791-53546...|+---+------+--------------------+--------------------+置空时间戳之后查询:+---+---+---+---+| id|  k|  c|pad|+---+---+---+---++---+---+---+---+

复制代码


由以上查询结果可知,数据能够执行删除,删除后正常查询是查询不到的,使用 stale read,利用删除之前的时间戳能够查询到数据,置空时间戳后,恢复普通查询,数据能够查询到。

总结

本次版本中 stale read + delete 让 tispark 具有更灵活的应用场景,经过验证 spark 3.2.1 跟 tidb 6.1.0 on k8s 通讯还有些问题仍需解决,另外一方面,也盼望着能兼容分区表的一些操作能发布出来,比如说导入数据之前能够 truncate 分区一类的操作。


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TiSpark 3.0.0 新特性实践_实践案例_TiDB 社区干货传送门_InfoQ写作社区