写点什么

TiSpark 如何扩展 Spark

  • 2022 年 8 月 12 日
    北京
  • 本文字数:2824 字

    阅读完需:约 9 分钟

作者: Billmay 原文来源:https://tidb.net/blog/a6ebee89

Spark API

Spark 主要提供以下四种拓展方式


Customized function or RDD

当使用 RDD 编程时,我们完全可以基于 Customized function or RDD 实现外部数据源的读写,完成数据源的拓展以及各种操作。但当 Spark SQL 出现,这样的方式就失效了。


Example


  • Create customized function for existing RDD

  • Create customized RDD

DataSource API

Define the way read and write from other datasource.


DataSource API 是 Spark 中非常活跃的一个 API,几乎每个 Spark 版本都会对其进行优化。其主要的版本对照如表


Before Spark 2.3:DataSource API v1

优点


  • It is simple,适用于大多数场景


缺点


  • Coupled with other APIs (RDD, DataFrame, SQLContext)

  • 和上层 API 耦合过重,就比如 SQLContext 被废弃,那意味着 DataSource API 也得废弃

  • Hard to push down

  • 下推通过组合 trait 的方式,那么就会有笛卡尔积的 trait。比如要增加 limit 能力, 就得增加 limitscan, limitprunedscan, limitfilterscan, limitprunedfilterscan 多个 trait

  • 可下推的算子也有限,比如不支持下推 Aggregate

  • Write API too simple

  • No streaming support

Spark 2.3 - Spark 3.0: DataSource API v2

优点


  • Friendly to Java for DS V2 APIs is Java interface

  • Does not coupled with other APIs

  • Easier to push down

  • 每种 pushdown 都有自己的 interface

  • Streaming support

After Spark 3.0 datasource api v2

摘自 Data Source V2 API Improvement design doc


升级版的 v2 优化了以下问题


  • Scan execution order is not obvious

  • Splitting and reading data partitions should be independent

  • Columnar Scan API should not be a mixin trait

  • Streaming API doesn’t play well with the batch API

  • Interface name is confusing


Catalyst extension

Spark SQL 最重要的部分就是 catalyst ,它负责 SQL 的解析分析优化等操作,其主要流程如下。



从 Spark 2.2 之后,Spark 支持拓展 catalyst。拓展点如下表



其中 Analyzer 有三个拓展点,分别的用处为


  • injectResolutionRule:可以在这鉴权,检查元信息

  • injectPostHocResolutionRule:可以在这检查 insert

  • injectCheckRule:check 是否还有 unresolved 的


在 Spark 3 之后,又额外提供了一些其他拓展点


  • e.injectColumnar

  • e.injectFunction

  • e.injectQueryStagePrepRule


Catalyst 拓展只能在 catalyst 框架下进行,具体表现为


  • 拓展点的位置被限制

  • 无法修改其他 rule


这其实会有一些问题:比如我们无法修改原有 rule 的行为,原有 rule 会 block 一些我们特殊需求,如特殊的类型转换等。

Catalog plugin

更好的支持多数据源


在 spark 3 之后,Spark 提供了 Catalog plugin,能够


  • Provide schema

  • DDL

  • Multiple catalog

TiSpark 2.5

TiKV 为数据源进行 Read + Write。

TiSpark Read

API: Customized RDD + Catalyst Extension + Catalog plugin


TiSpark Read 使用 Catalyst Extension ,拓展 injectPlannerStrategy拓展点。在该拓展点中,TiSpark 会截取可下推的 sub-plan 进行下推至 TiKV,并获取数据。无法下推的部分交给 Spark 完成。


TiSpark 2.5 之后,还使用了 Spark 3.0 提供的 catalog plugin ,侵入性更小。在没有 catalog plugin 时,我们需要 hacker catalog 以及更多的执行计划,还需要提供一些额外机制(dbprefix)用于判断属于哪一个 DataSource,非常不方便。


为什么不使用 DataSource API 呢?


  • DS API 无法精确下推

  • Spark 3.2 之前不支持下推 Aggration

  • 无法根据数据类型进行是否下推的判断

TiSpark Write

DataSource API V1 + RDD API


TiSpark Write 的实现基于 DataSource API V1 拓展。TiSpark 会从 Dataframe 出发,利用 RDD API 进行数据的各种处理,最后使用 TiKV-java-client 提供的 2PC 接口保证整体事务的原子性

TiSpark master

TiSpark 2.5 主要基于 DS V1 实现,其问题有


  • Write 拓展的粒度不够细

  • No streaming support

  • Can’t use v2 API in user view (如 writeto 无法使用)

  • Can’t apply new features of spark(很多新特性都是基于 DSV2,如 delete)

  • No Catalog support,比如需要通过 option 传入 db 与 table 信息

  • No Catalyst optimize,Write 逻辑节点在 catalyst 中不会有任何优化 / 检查


因此 TiSpark 在 master 分支进行了 DS V1 -> DS V2 的升级,主要做的事情如下

Support catalog plugin

Spark 3 之后,提供了 catalog plugin,使得 mutile catalog 成为可能。


使用 catalog plugin 可以带来以下优点


  1. Less hacker code


原来我们需要自定义混合的 catalog ( tidb schema + hive schema),逻辑混杂在一起。


  1. Need not dbprefix


为了防止同名库表,原来我们需要 dbprefix 区分不同的 datasoue


  1. Make read simple


由于原来我们需要在 catalyst 额外进行一些拓展,使用混合 catalog 去判断库表的存在性。难以维护开发

Read on DSV2

在 V1 API 中,Read 需要拓展 catalyst,将逻辑节点替换为 TiDBRelation,然后由 TiDBRelation 提供 schema(而不是 catalog)


在 V2 AP2 中,schema 由 TiDBTable 提供。更重要的是无需拓展 catalyst 并去替换为 TiDBRelation,Spark 会使用 catalog.loadTable 帮我们加载 TiDBTable。


因此,Read 可以进行如下优化


  • 不必在 Analyzer 中改写相关逻辑节点了为 TiDBRelation 了。

  • 不必在 Planner 下推时使用 TiDBRelation 的 schema 了,直接从 Spark 原生逻辑节点 DataSourceV2ScanRelation2(TiDBTable) 中获取即可

Write on DSV2

对开发者来说,Spark 提供了 v1,v2 API 用于拓展。


对于用户来说,Spark 也向用户暴露了两类 API。如 df.write 是早期的 api,df.writeto.append 为新版 api。


Spark 会判断开发者是拓展了 v1 还是 v2


  • df.writeto.append:开发者只能使用 v2 API 实现。

  • df.write:需要向前兼容,因此 v1 与 v2 API 都可以用。Spark 主要通过实现的接口判断开发者使用的 API 是 v1 还是 v2。不满足任一以下情况的都会被判断为 v1 API

  • Source extends SupportsCatalogOption/TableProvider

  • Table extends supportsWrite

  • Table has batchwrite capabilities

Problems

原来 write 由 DSV1 实现,在转向 V2 的过程中我们发现了一些问题


  1. 无法处理整体数据


  • V2 write framework can’t process global Data


  1. 和 Catalyst 的优化有冲突,如


  • Data convert:boolean -> long (unsafe)

  • Autorandom:mismatch

Temporary solution

由于 DSV2 存在一些问题,我们的处理方式是:先进行整体 v2 的切换,在 v2 框架中,write 仍先使 v1 API 实现

总结

DSV2 support 带来的好处


  • Less hacker code, easier to develop

  • The benefit of DSV2

  • Streaming support

  • New user API support(writeto)

  • Closer to catalyst

  • The new feature of Spark

  • Delete

  • More pushdown support

TiSpark prospect

  • Read

  • 随着 Spark 对下推的支持,我们可以逐步使用 DataSource API ,而不再是拓展 catalyst 的方式。

  • Write

  • 能够使用 DSV2 改写 write

  • Spark SQl

  • 支持更多的 SQL:Insert,delete,update,mergeinto


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TiSpark 如何扩展 Spark_TiDB 社区干货传送门_InfoQ写作社区