写点什么

Spark SQL 字段血缘在 vivo 互联网的实践

  • 2022 年 4 月 25 日
  • 本文字数:4639 字

    阅读完需:约 15 分钟

作者:vivo 互联网服务器团队-Hao Guangshi


一、背景


字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?


有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。


Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。


平台计划将 Hive 任务迁移到 Spark SQL 上,同时也需要实现字段血缘的功能。


二、前期调研


开发前我们做了很多相关调研,从中得知 Spark 是支持扩展的:允许用户对 Spark SQL 的 SQL 解析、逻辑计划的分析和检查、逻辑计划的优化、物理计划的形成等进行扩展。


该方案可行,且对 Spark 的源码没有改动,代价也比较小,确定使用该方案。


三、Spark SQL 扩展

3.1 Spark 可扩展的内容


SparkSessionExtensions 是比较重要的一个类,其中定义了注入规则的方法,现在支持以下内容:


  • 【Analyzer Rules】逻辑计划分析规则

  • 【Check Analysis Rules】逻辑计划检查规则

  • 【Optimizer Rules.】 逻辑计划优化规则

  • 【Planning Strategies】形成物理计划的策略

  • 【Customized Parser】自定义的 sql 解析器

  • 【(External) Catalog listeners catalog】监听器


在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。


而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。


3.2 实现自己的扩展


class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {  override def apply(spark: SparkSessionExtensions): Unit = {
//字段血缘 spark.injectCheckRule(FieldLineageCheckRuleV3)
//sql解析器 spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }
}}
复制代码


上面按照这种方式实现扩展,并在 apply 方法中把自己需要的规则注入到 SparkSessionExtensions 即可,除了以上四种可以注入的以外还有其他的规则。要让 ExtralSparkExtension 起到作用的话我们需要在 spark-default.conf 下配置 spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension 在启动 Spark 任务的时候即可生效。


注意到我们也实现了一个自定义的 SQL 解析器,其实该解析器并没有做太多的事情。只是在判断如果该语句包含 insert 的时候就将 SQLText(SQL 语句)设置到一个为 FIELD_LINE_AGE_SQL,之所以将 SQLText 放到 FIELD_LINE_AGE_SQL 里面。因为在 DheckRule 里面是拿不到 SparkPlan 的我们需要对 SQL 再次解析拿到 SprkPlan,而 FieldLineageCheckRuleV3 的实现也特别简单,重要的在另一个线程实现里面。


这里我们只关注了 insert 语句,因为插入语句里面有从某些个表里面输入然后写入到某个表。


class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{
override def parsePlan(sqlText: String): LogicalPlan = { val lineAgeEnabled = SparkSession.getActiveSession .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean logDebug(s"SqlText: $sqlText") if(sqlText.toLowerCase().contains("insert")){ if(lineAgeEnabled){ if(FIELD_LINE_AGE_SQL_COULD_SET.get()){ //线程本地变量在这里 FIELD_LINE_AGE_SQL.set(sqlText) } FIELD_LINE_AGE_SQL_COULD_SET.remove() } } delegate.parsePlan(sqlText) } //调用原始的sqlparser override def parseExpression(sqlText: String): Expression = { delegate.parseExpression(sqlText) } //调用原始的sqlparser override def parseTableIdentifier(sqlText: String): TableIdentifier = { delegate.parseTableIdentifier(sqlText) } //调用原始的sqlparser override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { delegate.parseFunctionIdentifier(sqlText) } //调用原始的sqlparser override def parseTableSchema(sqlText: String): StructType = { delegate.parseTableSchema(sqlText) } //调用原始的sqlparser override def parseDataType(sqlText: String): DataType = { delegate.parseDataType(sqlText) }}
复制代码

3.3 扩展的规则类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {
val executor: ThreadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
override def apply(plan: LogicalPlan): Unit = { val sql = FIELD_LINE_AGE_SQL.get FIELD_LINE_AGE_SQL.remove() if(sql != null){ //这里我们拿到sql然后启动一个线程做剩余的解析任务 val task = new FieldLineageRunnableV3(sparkSession,sql) executor.execute(task) }
}}
复制代码


很简单,我们只是拿到了 SQL 然后便启动了一个线程去得到 SparkPlan,实际逻辑在 FieldLineageRunnableV3。

3.4 具体的实现方法

3.4.1 得到 SparkPlan


我们在 run 方法中得到 SparkPlan:

override def run(): Unit = {  val parser = sparkSession.sessionState.sqlParser  val analyzer = sparkSession.sessionState.analyzer  val optimizer = sparkSession.sessionState.optimizer  val planner = sparkSession.sessionState.planner      ............  val newPlan = parser.parsePlan(sql)  PASS_TABLE_AUTH.set(true)  val analyzedPlan = analyzer.executeAndCheck(newPlan)
val optimizerPlan = optimizer.execute(analyzedPlan) //得到sparkPlan val sparkPlan = planner.plan(optimizerPlan).next() ...............if(targetTable != null){ val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]() val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]() //projection projectionLineAge(levelProject, sparkPlan.child) //predication predicationLineAge(predicates, sparkPlan.child) ...............
复制代码


为什么要使用 SparkPlan 呢?当初我们考虑的时候,物理计划拿取字段关系的时候是比较准的,且链路比较短也更直接。


在这里补充一下 Spark SQL 解析的过程如下:



经过 SqlParser 后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过 Analyzer 会分析一些绑定信息,例如表验证、字段信息、函数信息;经过 Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是 RBO,当然 Spark 还支持 CBO 的优化;经过 SparkPlanner 后就成了可执行的物理计划。


我们看一个逻辑计划与物理计划对比的例子:


一个 SQL 语句:

select item_id,TYPE,v_value,imei from t1union allselect item_id,TYPE,v_value,imei from t2union allselect item_id,TYPE,v_value,imei from t3
复制代码


逻辑计划是这样的:



物理计划是这样的:



显然简化了很多。


得到 SparkPlan 后,我们就可以根据不同的 SparkPlan 节点做迭代处理。


我们将字段血缘分为两种类型:projection(select 查询字段)、predication(wehre 查询条件)。


这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。


想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:



那么我们迭代查询的结果应该为


id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。


注意到有该变量 val levelProject = new ArrayBuffer ArrayBuffer[NameExpressionHolder],通过 projecti-onLineAge 迭代后 levelProject 存储了顶层 id,name,age 对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。


当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec 等都需要特殊考虑。


例子及效果:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,C as (select id,name,max(age) from A group by A.id,A.name) ,B as (select id,name,age from tabb2 where age > 28)insert into tab3   select C.id,concat(C.name,B.name) as name, B.age from     B,C where C.id = B.id
复制代码


效果:

{  "edges": [    {      "sources": [        3      ],      "targets": [        0      ],      "expression": "id",      "edgeType": "PROJECTION"    },    {      "sources": [        4,        7      ],      "targets": [        1      ],      "expression": "name",      "edgeType": "PROJECTION"    },    {      "sources": [        5      ],      "targets": [        2      ],      "expression": "age",      "edgeType": "PROJECTION"    },    {      "sources": [        6,        3      ],      "targets": [        0,        1,        2      ],      "expression": "INNER",      "edgeType": "PREDICATE"    },    {      "sources": [        6,        5      ],      "targets": [        0,        1,        2      ],      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",      "edgeType": "PREDICATE"    },    {      "sources": [        3      ],      "targets": [        0,        1,        2      ],      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",      "edgeType": "PREDICATE"    }  ],  "vertices": [    {      "id": 0,      "vertexType": "COLUMN",      "vertexId": "default.tab3.id"    },    {      "id": 1,      "vertexType": "COLUMN",      "vertexId": "default.tab3.name"    },    {      "id": 2,      "vertexType": "COLUMN",      "vertexId": "default.tab3.age"    },    {      "id": 3,      "vertexType": "COLUMN",      "vertexId": "default.tab1.id"    },    {      "id": 4,      "vertexType": "COLUMN",      "vertexId": "default.tab1.name"    },    {      "id": 5,      "vertexType": "COLUMN",      "vertexId": "default.tabb2.age"    },    {      "id": 6,      "vertexType": "COLUMN",      "vertexId": "default.tabb2.id"    },    {      "id": 7,      "vertexType": "COLUMN",      "vertexId": "default.tabb2.name"    }  ]}
复制代码

四、总结


在 Spark SQL 的字段血缘实现中,我们通过其自扩展,首先拿到了 insert 语句,在我们自己的检查规则中拿到 SQL 语句,通过 SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终得到了物理计划。


我们通过迭代物理计划,根据不同执行计划做对应的转换,然后就得到了字段之间的对应关系。当前的实现是比较简单的,字段之间是直线的对应关系,中间过程被忽略,如果想实现字段的转换的整个过程也是没有问题的。

发布于: 刚刚阅读数: 2
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020.07.10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
Spark SQL 字段血缘在 vivo 互联网的实践_大数据_vivo互联网技术_InfoQ写作社区