写点什么

Spark Driver CPU 占用异常问题排查

作者:观远数据
  • 2022 年 8 月 22 日
    浙江
  • 本文字数:2140 字

    阅读完需:约 7 分钟

本文作者:Jamie,观远数据 in-house Spark 之父,ABI 领域史诗级工程师。


年初我们接到了一个客户反馈,表示服务器 cpu 占用异常,于是我们远程连接到服务器上面排查,发现是 Spark driver 占用了大部分 cpu。对于 cpu 占用问题,用 jstack 能很快定位到 jvm 的执行逻辑。分析 jstack 结果发现,大部分占用 cpu 的线程都在执行一个叫做 transpose window 的优化规则,而且都和这个逻辑里的一段方法有关:


private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {  ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(    ps1.zip(_).forall {      case (l, r) => l.semanticEquals(r)    })  }
复制代码


这段逻辑看上去并不复杂,注意到这个方法里有一个 permutations 函数调用,这个函数会返回一个数组的所有排列。对于一个有 n 个不同元素的数组,他的排列数是 n!,也就是说这个算法时间复杂度会到 O(n!),那么我们遇到的问题很有可能和这个有关系了。但是我们还是需要找到是什么 sql 语句触发了这个问题。


从监控上来看,driver 的 cpu 是呈阶梯状上升的,那这些上升的时间点应该就是有问题任务提交的时间点,再结合 call stack 里面的逻辑是在做 window 相关的优化,我们重点去找这些时间点包含 window function 相关的任务。很快我们就定位到了一个 ETL,从监控上看,每运行一次这个 ETL,Spark driver 就会多占用一个 cpu,并且长时间不释放。


原始的 ETL 逻辑比较复杂,我们把他简化之后发现只和两个带窗口函数的计算字段有关系,特点就是 partition by 使用的字段比较多,为了 debug 方便,我们用 spark-shell 复现了一下:


val df = spark.range(10).selectExpr("id AS a1", "id AS a2", "id AS a3", "id AS a4", "id AS a5", "id AS a6", "id AS a7", "id AS a8", "id AS a9", "id AS a10", "id AS a11", "id AS a12", "id AS a13", "id AS a14", "id AS a15", "id AS a16") df.selectExpr(  "sum(`a16`) OVER(PARTITION BY `a1`,`a2`,`a3`,`a4`,`a5`,`a6`,`a7`,`a8`,`a9`,`a10`,`a11`,`a12`,`a13`,`a14`,`a15`) as p1",   "sum(`a16`) OVER(PARTITION BY `a14`,`a2`,`a3`,`a4`,`a5`,`a6`,`a7`,`a8`,`a9`,`a10`,`a11`,`a12`,`a13`,`a1`) as p2"  ).explain
复制代码


在 3.0 版本以上的 spark-shell 里面运行上面的代码就会发现 spark-shell 卡住了,而卡住的地方正是 compatiblePartitions 方法。


也就是说如果加了多个带有窗口函数的计算字段,而 partition by 的字段过多的话,很容易触发这个问题,比如上面这个例子,大家可以算算 14 个元素的数组全排列有多少种组合。那么这个问题我们有没有办法优化呢。


一般遇到 Spark 相关的问题,我们可以先去 JIRA 上面搜索一下有没有人提过类似的问题,因为 Spark 的使用非常广泛,一般遇到的问题很可能之前已经有人发现甚至修复了。但是我们用各种关键字搜索之后也找不到相关的问题,看来这个问题只能靠我们自己来解决了。


首先需要看一下相关逻辑是什么时候引入的,引入的原因是什么。查看提交历史可以发现,是为了解决这个问题:https://issues.apache.org/jira/browse/SPARK-20636,之所以想要 transpose window,是想要减少一次 shuffle 操作。而 compatiblePartitions 里面的逻辑,就是在判断是否需要 transpose 两个 window。


从现有代码的逻辑反推这个 compatible 的定义,应该是 window1 的 partition 字段是 window2 的 partition 字段前缀的一种排列。举几个例子就比较清楚了,比如 window2 是 partition by('a', 'b', 'c', 'd'),那么 window1 可以是 partition by('a'), partition by('a', 'b'), partition by('b', 'a'), partition by('c', 'a', 'b') 等等,但是不能是 partition by('b'), partition by('a', 'c') 等。


但是这个逻辑其实并不是很合理,一个是算排列代价太高,另一个是有些可以 transpose 的 case 却没有做,比如上面的 partition by('b'), partition by('a', 'c') 等。另外考虑一些重复字段的 case,比如 partition by('a', 'a'),这种原来的算法也是不行的,所以这个 compatible 可以定义成 window1 的 partition by 里面所有的字段在 window2 里面都能找到,那么我们就可以做 transpose 来减少 shuffle,用代码来表示就是:


private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {  ps1.length < ps2.length && ps1.forall { expr1 =>    ps2.exists(expr1.semanticEquals)  }}
复制代码


这样我们既避免了复杂的 permutation,也让这个优化的适用范围增加了。


经过一系列测试发现改动是有效果的,于是我们向社区提交了 issue 和相关的 PR,感兴趣的同学可以查看具体内容:


虽然一开始沉没在 PR 的海洋之中,不过时隔半年又被国际友人捞了出来,最终也顺利被社区采纳,这样后续我们只需要升级到官方的版本就可以解决这个问题了。观远数据的 Spark 贡献者名单中也又多了一位新同学,后续我们会持续关注实践中遇到的 Spark/Delta 等相关问题,为开源项目发展添砖加瓦。

发布于: 刚刚阅读数: 3
用户头像

观远数据

关注

让业务用起来 2022.07.14 加入

观远数据技术团队实践分享

评论

发布
暂无评论
Spark Driver CPU 占用异常问题排查_spark_观远数据_InfoQ写作社区