TiSpark 如何扩展 Spark
作者: Billmay 原文来源:https://tidb.net/blog/a6ebee89
Spark API
Spark 主要提供以下四种拓展方式
Customized function or RDD
当使用 RDD 编程时,我们完全可以基于 Customized function or RDD 实现外部数据源的读写,完成数据源的拓展以及各种操作。但当 Spark SQL 出现,这样的方式就失效了。
Example
Create customized function for existing RDD
Create customized RDD
DataSource API
Define the way read and write from other datasource.
DataSource API 是 Spark 中非常活跃的一个 API,几乎每个 Spark 版本都会对其进行优化。其主要的版本对照如表
Before Spark 2.3:DataSource API v1
优点
It is simple,适用于大多数场景
缺点
Coupled with other APIs (RDD, DataFrame, SQLContext)
和上层 API 耦合过重,就比如 SQLContext 被废弃,那意味着 DataSource API 也得废弃
Hard to push down
下推通过组合 trait 的方式,那么就会有笛卡尔积的 trait。比如要增加 limit 能力, 就得增加 limitscan, limitprunedscan, limitfilterscan, limitprunedfilterscan 多个 trait
可下推的算子也有限,比如不支持下推 Aggregate
Write API too simple
No streaming support
Spark 2.3 - Spark 3.0: DataSource API v2
优点
Friendly to Java for DS V2 APIs is Java interface
Does not coupled with other APIs
Easier to push down
每种 pushdown 都有自己的 interface
Streaming support
After Spark 3.0 datasource api v2
摘自 Data Source V2 API Improvement design doc
升级版的 v2 优化了以下问题
Scan execution order is not obvious
Splitting and reading data partitions should be independent
Columnar Scan API should not be a mixin trait
Streaming API doesn’t play well with the batch API
Interface name is confusing
Catalyst extension
Spark SQL 最重要的部分就是 catalyst ,它负责 SQL 的解析分析优化等操作,其主要流程如下。
从 Spark 2.2 之后,Spark 支持拓展 catalyst。拓展点如下表
其中 Analyzer 有三个拓展点,分别的用处为
injectResolutionRule:可以在这鉴权,检查元信息
injectPostHocResolutionRule:可以在这检查 insert
injectCheckRule:check 是否还有 unresolved 的
在 Spark 3 之后,又额外提供了一些其他拓展点
e.injectColumnar
e.injectFunction
e.injectQueryStagePrepRule
Catalyst 拓展只能在 catalyst 框架下进行,具体表现为
拓展点的位置被限制
无法修改其他 rule
这其实会有一些问题:比如我们无法修改原有 rule 的行为,原有 rule 会 block 一些我们特殊需求,如特殊的类型转换等。
Catalog plugin
更好的支持多数据源
在 spark 3 之后,Spark 提供了 Catalog plugin,能够
Provide schema
DDL
Multiple catalog
TiSpark 2.5
TiKV 为数据源进行 Read + Write。
TiSpark Read
API: Customized RDD + Catalyst Extension + Catalog plugin
TiSpark Read 使用 Catalyst Extension ,拓展 injectPlannerStrategy
拓展点。在该拓展点中,TiSpark 会截取可下推的 sub-plan 进行下推至 TiKV,并获取数据。无法下推的部分交给 Spark 完成。
TiSpark 2.5 之后,还使用了 Spark 3.0 提供的 catalog plugin ,侵入性更小。在没有 catalog plugin 时,我们需要 hacker catalog 以及更多的执行计划,还需要提供一些额外机制(dbprefix)用于判断属于哪一个 DataSource,非常不方便。
为什么不使用 DataSource API 呢?
DS API 无法精确下推
Spark 3.2 之前不支持下推 Aggration
无法根据数据类型进行是否下推的判断
TiSpark Write
DataSource API V1 + RDD API
TiSpark Write 的实现基于 DataSource API V1 拓展。TiSpark 会从 Dataframe 出发,利用 RDD API 进行数据的各种处理,最后使用 TiKV-java-client 提供的 2PC 接口保证整体事务的原子性
TiSpark master
TiSpark 2.5 主要基于 DS V1 实现,其问题有
Write 拓展的粒度不够细
No streaming support
Can’t use v2 API in user view (如 writeto 无法使用)
Can’t apply new features of spark(很多新特性都是基于 DSV2,如 delete)
No Catalog support,比如需要通过 option 传入 db 与 table 信息
No Catalyst optimize,Write 逻辑节点在 catalyst 中不会有任何优化 / 检查
因此 TiSpark 在 master 分支进行了 DS V1 -> DS V2 的升级,主要做的事情如下
Support catalog plugin
Spark 3 之后,提供了 catalog plugin,使得 mutile catalog 成为可能。
使用 catalog plugin 可以带来以下优点
Less hacker code
原来我们需要自定义混合的 catalog ( tidb schema + hive schema),逻辑混杂在一起。
Need not dbprefix
为了防止同名库表,原来我们需要 dbprefix 区分不同的 datasoue
Make read simple
由于原来我们需要在 catalyst 额外进行一些拓展,使用混合 catalog 去判断库表的存在性。难以维护开发
Read on DSV2
在 V1 API 中,Read 需要拓展 catalyst,将逻辑节点替换为 TiDBRelation,然后由 TiDBRelation 提供 schema(而不是 catalog)
在 V2 AP2 中,schema 由 TiDBTable 提供。更重要的是无需拓展 catalyst 并去替换为 TiDBRelation,Spark 会使用 catalog.loadTable 帮我们加载 TiDBTable。
因此,Read 可以进行如下优化
不必在 Analyzer 中改写相关逻辑节点了为 TiDBRelation 了。
不必在 Planner 下推时使用 TiDBRelation 的 schema 了,直接从 Spark 原生逻辑节点 DataSourceV2ScanRelation2(TiDBTable) 中获取即可
Write on DSV2
对开发者来说,Spark 提供了 v1,v2 API 用于拓展。
对于用户来说,Spark 也向用户暴露了两类 API。如 df.write
是早期的 api,df.writeto.append
为新版 api。
Spark 会判断开发者是拓展了 v1 还是 v2
df.writeto.append:开发者只能使用 v2 API 实现。
df.write:需要向前兼容,因此 v1 与 v2 API 都可以用。Spark 主要通过实现的接口判断开发者使用的 API 是 v1 还是 v2。不满足任一以下情况的都会被判断为 v1 API
Source extends SupportsCatalogOption/TableProvider
Table extends supportsWrite
Table has batchwrite capabilities
Problems
原来 write 由 DSV1 实现,在转向 V2 的过程中我们发现了一些问题
无法处理整体数据
V2 write framework can’t process global Data
和 Catalyst 的优化有冲突,如
Data convert:boolean -> long (unsafe)
Autorandom:mismatch
Temporary solution
由于 DSV2 存在一些问题,我们的处理方式是:先进行整体 v2 的切换,在 v2 框架中,write 仍先使 v1 API 实现
总结
DSV2 support 带来的好处
Less hacker code, easier to develop
The benefit of DSV2
Streaming support
New user API support(writeto)
Closer to catalyst
The new feature of Spark
Delete
More pushdown support
TiSpark prospect
Read
随着 Spark 对下推的支持,我们可以逐步使用 DataSource API ,而不再是拓展 catalyst 的方式。
Write
能够使用 DSV2 改写 write
Spark SQl
支持更多的 SQL:Insert,delete,update,mergeinto
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/ffdd95eee086a15c470647698】。文章转载请联系作者。
评论