大数据培训 Hive 到 Spark 离线计算实践
1. 背景介绍
2018 年 B 站基于 Hadoop 开始搭建离线计算服务,计算集群规模从最初的两百台到发展到目前近万台,从单机房发展到多机房。我们先后在生产上大规模的使用了 Hive、Spark、Presto 作为离线计算引擎,其中 Hive 和 Spark 部署在 Yarn 上,具体的架构如下,目前每天有约 20w 的离线批作业运行在 Spark 和 Hive 上,下面介绍下我们做了哪些工作来确保这些作业的高效与稳定。
编辑
2. 从 Hive 到 Spark
21 年初的时候 Hive 还是 B 站主要的离线计算引擎,80%以上的离线作业使用 Hive 执行,Spark2.4 作业占比接近 20%,集群资源的使用率长期维持在 80%以上。21 年 3 月 Spark3.1 发布,相较于 Spark2.4 性能有了较大的提升,我们开始推动 Spark3.1 在 B 站的落地,同时将 Hive-SQL 整体迁移至 Spark-SQL。
在 B 站,离线计算的调度已经完成了收口,80%以上的作业来自于自建的 BSK 调度平台,其余的作业基本也都是 airflow 提交上来的,只有少量的任务来自散落的开发机。在推动 Hive 升级 Spark 时只要将调度平台的作业完成迁移就可以覆盖 90%以上的作业。起步阶段我们进行了少量的人工迁移,对用户 SQL 进行了简单改写,修改了输入输出表后由两个引擎执行,开发了一个结果对比的工具,通过对双跑结果分析保障迁移效果。基于这个操作链路我们自研了一个自动迁移工具,减少人工失误和人力压力。
编辑
2.1 语句转换
我们重写了 SparkSqlParser,将从调度系统中收集到的 SQL 进行输入输出表的替换,避免对生产环境的影响。调度平台进行作业调度时以 DAG 为单位,一个调度任务里面可能存在多条 SQL,这些 SQL 的输入输出表间存在依赖关系,为了保证双跑尽可能的模拟生产表现,对一个 DAG 里面的多个调度作业进行输入输出表替换时进行整体替换,保证了相互间依赖。对于 Select 语句因为本身没有输出表,需要将 Select 语句转换为 CTAS 语句,这样就能将执行结果落地进行对比,需要注意的是转换过程中要将列名进行编码防止中文列导致的建表失败。当迁移工具识别出 SQL 语句为 DDL 语句,如果不是 CTAS 这种需要消耗计算资源的就直接跳过对比,同时对该语句进行标记,保证交由 Hive 执行,防止意外的元信息修改。
2.2 结果对比
双跑输出结果的对比是保证数据准确性的关键。首先对两个结果表的 Schema 进行对比,这个通过调用 DESC 语法返回结果对照就可以完成。对于 Schema 一致的两个表则进行下一步操作,两表全量数据对比,我们设计了一个 SQL 对数据按行进行整体对比,具体的对比思路如图:
编辑
第一步将两表按所有列(这里是 name 和 num 字段)进行 GROUP BY,
第二步 UNION ALL 两表数据,第三步再按所有列(这里是 name, num 和 cnt 字段) GROUP BY 一次产生最终表,在最终表中 cnts 值为 2 的行表示这行数据在两表中都有且重复值一致,对于值非 2 的数据就是差异行了。
从上图的例子来说差异行 Jack|1|2|1 表示 Jack|1 这行数据数据在一个表中存在两行,结合差异行 Jack|1|1|1 来看其实就是 Jack|1 这行数据一个表有一行另一个表有两行。通过这个方式就可以对双跑产出的结果表进行一个全量的对比_大数据培训。
通过这种结果对比方法可以完成大部分双跑任务的结果对比,但是对于结果表中存在 LIST、SET、MAP 这种容器类型的,因为在 toString 时顺序是无法保证的,所以会被识别为不一致,此外对于非稳定性的 SQL 如某列数据是 random 产生,因为每次执行产出的结果不一致,也会识别为对比失败,这两种情况下就需用人工的介入来分析了。
资源利用率的提升是做引擎升级的出发点,除了结果对比来保证数据准确性,我们还做了资源消耗对比来保证迁移的收益。对比系统收集了每个作业的执行时间以及消耗的资源,从执行时间、CPU 和内存的资源消耗进行两个引擎执行性能的对比,在执行最终迁移前依据收集的数据为用户提供了迁移的预期收益,提高了用户迁移任务的积极性。从迁移中收集的数据来看 hive 切到 spark 可以减少 40%以上的执行时间,同时整体资源消耗降低 30%以上。
编辑
2.3 迁移 &回滚
迁移系统对每个任务都执行了至少 3 次的双跑对比,但依然不能完全消除执行迁移的风险,在实际迁移过程中的几次问题都是迁移后稳定性不符合预期导致的,因此迁移系统对于迁移后的任务增加了监控,在一个任务迁移后,该任务的前 3 次调度执行消耗的时间、CPU 和内存资源将被用来和迁移前的七次平均执行数据对比,如果存在负优化的情况则会将这个任务执行引擎进行回滚并通知我们介入进行进一步分析。
3. Spark 在 B 站的实践
3.1 稳定性改进
3.1.1 小文件问题
随着 B 站业务高速发展,数据量和作业数增长越来越快,伴随而来的小文件数也快速增长,小文件太多会增加 HDFS 元数据的压力,在计算引擎读取时也大大增加了读请求的数量降低了读取效率。为了解决小文件的问题,在写表场景下对 Spark 做了如下两种改造。
兜底小文件合并:我们修改了数据的写出目录,引擎计算先写到一个中间目录,在 FileFormatWriter.write 结束后 refreshUpdatedPartitions 前,插入了一个文件合并逻辑,从中间目录中获取分区下文件的平均大小,对于不存在小文件情况的目录直接 MV 到最终目录,对于存在小文件的目录新增一个读 RDD coalesce 到一个合适值写出后 MV 到最终目录。
基于 reparation 的小文件合并:可以看到兜底小文件合并方式需要先将数据落地到 HDFS,重新读取后再写出,这样做放大了 HDFS 写操作(三副本),降低了计算引擎的执行性能。而 Spark3 的 AQE 特性可以在有 shuffle 的场景下有效解决小文件的问题,很多情况下对于没有 shuffle 的场景新增一个 reparation 操作就可以借助 AQE 的能力解决小文件的问题。社区 AQE 对于 reparation 这个 hint 是不会调整目标分区数的,我们新增了一个 rebalance hint,本质上和 reparation 一样只是将 AQE 的特性应用在了这个操作上,同时将 AQE 目标 size 相关的属性和 rebalance 设置属性做了隔离方便更好的设置文件大小而不影响计算的并行度。rebalance 操作会在最终写出前增加一个 shuffle stage,有些情况下没有这个 stage 上游输出就已经没有小文件了,为此作业是否增加 rebalance 操作依赖于我们对任务的画像通过 HBO 系统开启。
3.1.2 shuffle 稳定性问题
Shuffle 稳定性直接影响了 Spark 作业的 SLA,在 B 站推动 Spark 升级过程中成为用户顾虑的点。
shuffle 磁盘分级:B 站 Yarn 主集群采用 DataNode 和 NodeManage 混部模式,节点配置了多块 HDD 盘和少量 SSD 盘,NM 以 HDD 盘作为计算盘,由于和 DN 没有做到 IO 隔离,DN 和 shuffle service 经常互相影响,因此我们对 DiskBlockManager 进行了改造,优先使用 SSD 盘下的目录作为工作目录,当 SSD 盘存储空间或者 inode 紧张时则降级到 Yarn 配置的计算目录,借助 SSD 优异的随机 IO 能力,有效的提高的了 shuffle 稳定性。
remote shuffle service:push based shuffle 方案可以大量降低磁盘随机 IO 读请求,如下图:
编辑
通过中间服务将同属一个分区的数据进行归并,后续 reduce 操作就不需要从上游所有的 Map 节点拉取数据,在 shuffle 上下游 Task 数量多的情况下会对磁盘 IO 压力指数放大,生产上 shuffle heavy 的任务表现很不稳定,经常出现 FetchFailed Exception。
B 站在推动 RSS 落地时选择了社区 3.2 Push based shuffle 的方案,这个方案主要的优点是对 AQE 支持比较好,缺点是因为本地也要写一份数据放大了写。将数据先写本地后异步的发送到 driver 维护的 executor 节点的 external shuffle 节点上,后续生产实践中该方案有个问题,就是当作业启动时通常 driver 维护的 executor 数不足以满足远程节点的选择,而 SQL 作业参与计算的数据量通常是随着过滤条件层层递减的
通常 shuffle 数据量大的时候因为没有足够的节点会 fall back 到原先的 shuffle 方式,为了解决这个问题,我们新增了 shuffle service master 节点,具体调用流程如下图,所有的 external shuffle 节点启动时都会注册到 shuffle master 节点上,后续节点本身也会周期性的上报心跳和节点繁忙程度
DAGScheduler 后续请求远程节点都从 shuffle master 申请,这样不仅解决了冷启动节点不足的问题,在节点选择上也考虑了节点的健康程度。因为是先落盘后发送,在 stage 执行结束后会有一个等待时间,这里面会有个性能回退的问题,对小任务不友好,所以在生产应用中我们基于任务画像系统 HBO 自动决定任务是否启用 RSS 服务,目前生产大约 7%的大任务在使用 RSS 服务,这些任务平均执行时间缩短了 25%,稳定性有了显著提升。
编辑
目前 B 站生产中使用该方案基本解决了 shuffle 稳定性的问题,不过这套方案依旧需要计算节点配置本地 shuffle 盘,在本地落 shuffle 数据,无法支持存算分离的架构。后续我们在 k8s 上会大规模上线混部集群,需要尽量不依赖本地磁盘,避免对在线应用的影响,我们也关注到腾讯和阿里相继开源各自的 RSS 方案,我们也在尝试在生产中使用纯远程 shuffle 方案来满足 Spark on K8s 的技术需要。
3.1.3 大结果集溢写到磁盘
在 adhoc 场景中用户通常会拉取大量结果到 driver 中,造成了大量的内存消耗,driver 稳定性又直接影响着用户即席查询的体验,为此专门优化了 executor fetch result 的过程,在获取结果时会监测 driver 内存使用情况,在高内存使用下将拉取到的结果直接写出到文件中,返回给用户时则直接分批从文件中获取,增加 driver 的稳定性。
3.1.4 单 SQL task 并行度、task 数、执行时间限制
生产上我们按队列隔离了用户的 adhoc 查询,在实践过程中经常性的遇到单个大作业直接占用了全部并行度,有些短作业直接因为获取不到资源导致长时间的 pending 的情况,为了解决这种问题首先对单个 SQL 执行时间和总 task 数进行了限制,此外考虑到在 task 调度时有资源就会全部调度出去,后续 SQL 过来就面临着完全无资源可用的情况,我们修改了调度方法对单个 SQL 参与调度的 task 数进行了限制,具体的限制数随着可用资源进行一个动态变化,在 current executor 数接近于 max executor 的情况下进行严格限制 ,在 current executor 数明显少于 max executor 的情况下,提高单 SQL 并行的 task 总数限制。
3.1.5 危险 join condition 发现 & join 膨胀率检测
危险 join condition 发现
在选择 join 方式的时候如果是等值 join 则按照 BHJ,SHJ,SMJ 的顺序选择,如果还没有选择出则判断 Cartesian Join,如果 join 类型是 InnerType 的就使用 Cartesian Join,Cartesian Join 会产生笛卡尔积比较慢,如果不是 InnerType,则使用 BNLJ,在判断 BHJ 时,表的大小就超过了 broadcast 阈值,因此将表 broadcast 出去可能会对 driver 内存造成压力,性能比较差甚至可能会 OOM,因此将这两种 join 类型定义为危险 join。
如果不是等值 join 则只能使用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 时选不出 build side 说明两个表的大小都超过了 broadcast 阈值,则使用 Cartesian Join,如果 Join Type 不是 InnerType 则只能使用 BNLJ,因此 Join 策略选择 Cartesian Join 和第二次选择 BNLJ 时为危险 join。
join 膨胀率检测
ShareState 中的 statusScheduler 用于收集 Execution 的状态和指标,这其中的指标就是按照 nodes 汇总了各个 task 汇报上来的 metrics,我们启动了一个 join 检测的线程定时的监控 Join 节点的 "number of output rows"及 Join 的 2 个父节点的 "number of output rows" 算出该 Join 节点的膨胀率。
倾斜 Key 发现
数据倾斜是 ETL 任务比较常见的问题,以 shuffle 过程中的倾斜为例,通常有以下几个解决方法:增大 shuffle 的分区数量从而使数据分散到更多的分区中;修改逻辑,将 shuffle 时的 key 尽可能打散;单独找出产生了极大倾斜的 key,在逻辑中单独处理等等。但在进行这些处理之前,我们都需要先知道倾斜发生在 SQL 逻辑的哪个部分以及发生倾斜的是哪些 key。为了帮助用户自助高效的解决数据倾斜问题,我们实现了倾斜 key 发现的功能。以 SortMergeJoin 为例,在 shuffle fetch 阶段,首先根据 mapStatuses 计算出每个 partition size,并根据一定策略判断该 task 所处理的 partition 是否倾斜。如果倾斜,则在 join 阶段对数据进行采样,找到发生倾斜的 key,通过 TaskMetric 发送到 driver 端,driver 端消费 metric 后会记录倾斜信息。
上面这些 bad case 在运行时发现后会自动将信息发送到我们内部作业诊断平台,用户可以查看并对语句做优化和改进。
3.2 性能优化
3.2.1 DPP 和 AQE 兼容
spark3.1 的 DPP 和 AQE 存在兼容问题,在使用 AQE 后 DPP 的策略就无法生效,这个问题在 3.2 得到了修复,我们将 3.2 的相关代码 backport 回来,从 TPCDS 测试上看对 3.1 有很明显的提升。
3.2.2 AQE 支持 ShuffledHashJoin
AQE 通过对 map 阶段收集的指标数据来优化 Join 方式,对于存在小表的情况能将 SMJ 优化为 BHJ,这个操作可以显著的优化性能。Spark 的 shuffle 策略还有一个就是 ShuffledHashJoin,该策略性能相对较好,但内存压力大,在默认情况下为了保证任务的稳定性我们将其关闭,基于 AQE 的思想,在 map 完成后收集 partition size,当最大的 partition size 小于定义的值后,通过新增 DynamicJoin 优化策略将 SMJ 优化为 SHJ。
3.2.3 Runtime filter
DPP 通过对大表直接进行 partition 级别的裁剪,可以大大提高查询速度,但 DPP 的适用条件也相对严格,需要大表的分区列参与 join,但如果大表参与 join 的列为非分区列则无法应用。
我们知道 shuffle 是比较耗时的操作,shuffle 的数据量越大,耗时越久,而且对网络,机器 IO 都会产生比较大的压力。如果能在大表 shuffle 前根据非分区列的 join 列对其进行过滤,即使无法像 DPP 一样直接减少从存储中读取的数据量,但减小了其参与 shuffle 以及后续操作的数据量,也能获得比较不错的收益,这就是 runtime filter 的动机,即运行时预先扫描小表获取 join 列的值,构造 bloom filter 对大表进行过滤。
具体实现思路和 DPP 基本一致,首先在 SparkOptimizer 新增 DynamicBloomFilterPruning 规则,逻辑上类似 PartitionPruning,符合一系列判断条件后插入一个节点 DynamicBloomFilterPruningSubquery。与 DPP 不同的是,如果 join 可以被转化为 BroadcastHashJoin,则不会应用该规则,因为在 BroadcastHashJoin 的情况下对大表进行预先的过滤其实是多余的(非 pushdown 的情况下)。判断是否加入 filter 节点的主要逻辑如下,这里以裁剪左表(左右两侧都为 logicalPlan,为了方便表达,用左右表指代)为例进行说明,需要满足以下条件:
右表 rowCount 需要小于左表
Join 类型支持裁剪左表
右表 rowCount > 0
右表 rowCount 小于 spark.sql.optimizer.dynamicBloomFilterJoinPruning.maxBloomFilterEntries,默认值为 100000000,避免 bloom filter 占用内存过大
右表中没有 DynamicBloomFilterPruningSubquery
右表不是 stream 且存在 SelectivePredicate
左表(这里的左表是真正的左表或者包含左表的 Filter 节点)没有 SelectivePredicate,因为如果存在 SelectivePredicate,那么下一步便无法根据统计信息去计算过滤收益
在 prepare 阶段,PlanAdaptiveSubqueries 会把 DynamicBloomFilterPruningSubquery 节点替换为 DynamicPruningExpression(InBloomFilterSubqueryExec(_, _, _)),扩展了 PlanAdaptiveDynamicPruningFilters,支持对以上节点进行处理。新增了 BuildBloomFilter 和 InBloomFilter 两个 UDF。BuildBloomFilter 在 sparkPlan prepare 阶段提交任务构造 BloomFilter 并 broadcast 出去,具体的 evaluate 逻辑还是交给 InBloomFilter。另外在 AQE 的 reOptimize 阶段也新增了规则 OptimizeBloomFilterJoin,这个规则主要是用来根据执行过程的 metric 信息更新 BuildBloomFilter 的 expectedNumItems。
编辑
可以看到在开启了 runtime filter 后数据量在 join 前从 120 亿条降至 3W 条,收益还是相当明显的。
3.2.4 Data skipping
目前 B 站离线表存储主要使用 orc、parquet 格式,列式存储都支持一定程度的 data skipping,比如 orc 有三个级别的统计信息,file/stripe/row group,统计信息中会包含 count,对于原始类型的列,还会记录 min/max 值,对于数值类型的列,也会记录 sum 值。在查询时,就可以根据不同粒度的统计信息以及 index 决定该 file/stripe/row 是否符合条件,不符合条件的直接跳过。
对于统计信息及索引的细节见 orc format (https://orc.apache.org/specification/ORCv1/) 和 orc index (https://orc.apache.org/docs/indexes.html) 。Parquet 与 orc 类似,也有相应的设计,具体见 parquet format (https://github.com/apache/parquet-format) 和 parquet pageIndex (https://github.com/apache/parquet-format/blob/master/PageIndex.md) 。
虽然 orc/parquet 都有 data skipping 的能力,但这种能力非常依赖数据的分布。前面提到统计信息中会包含每一列的 min/max 值,理论上如果查询条件(比如> < =)不在这个范围内,那么这个 file/stripe/row group 就可以被跳过。但如果数据没有按照 filter 列排序,那最坏的情况下,可能每个 file/stripe/row group 的 min/max 值都一样,这样就造成任何粒度的数据都不能被跳过。为了增加列式存储 data skipping 效果,可以通过对数据增加额外的组织,如下:
select
count(1)
from
tpcds.archive_spl_cluster
where
log_date = '20211124'
and state = -16
表 archive_spl,不调整任何分布与排序
编辑
表 archive_spl_order,order by state,avid
编辑
通过对 state 进行 order 后 scan 阶段数据量直接从亿级别降至数十万级别。在生产中我们通过对 SQL 进行血缘分析找到那些热点表及高频 filter 列,将这些热列作为 table properties 存入 hms 中,在 Spark 执行时根据从 hms 中获取的列信息,通过相应的优化规则,物理计划自动增加 sort 算子,完成对数据组织。这个方案是基于列存优化数据组织来进行 data skipping,目前我们也在往索引方向上进一步探索。
3.3 功能性改进
3.3.1 对于 ZSTD 的支持
Spark 社区在 3.2 版本全面支持了 ZSTD 压缩,为了更好的使用 ZSTD,我们在 Spark3.1 的基础上引入了社区的相关 patch。其中也遇到了一些问题。在测试 ZSTD 的过程中偶然发现下推到 ORC 的过滤条件没有生效,经调查发现是 ORC 代码的 bug,在和社区讨论之后,我们修复了该 bug 并将 patch 提交给了社区:https://issues.apache.org/jira/browse/ORC-1121 。
离线平台的 Presto 也承接了很多 ETL 任务,由于 Presto 使用的是自己实现的 ORC reader/writer,所以在 Spark 升级 ORC 版本之后,对一些 Presto 写出的表,出现了查询结果错误的问题。
正常情况下,Apache ORC writer 在写文件时会记录每个 stripe/rowGroup 中每列的统计信息,如 min/max 等。Apache ORC reader 在读取文件时会根据这些统计信息结合下推的过滤条件进行 stripe/rowGroup 级别的过滤。
但 Presto ORC writer 在写文件时,如果 String 类型的列长度超过 64 bytes,则这一列不会记录 min/max 信息。虽然 Presto ORC reader 可以正常处理这类文件,但 Spark/Hive 使用的 Apache ORC reader 因为无法正常的反序列化 columnStatistics 得到正确的统计信息,导致做 stripe/rowGroup 级别的过滤时出现了错误的结果。我们也发现这个问题是由于 ORC 1.6 版本的一次代码重构导致,1.5 及之前是没有该问题的。我们已在内部分支修复了该问题,也已将问题反馈给社区。
3.3.2 多格式混合读兼容
历史上很多表使用了 text 存储,在资源上造成了很大的浪费,通过修改表的元信息可以保障新增分区切换到列存,这就造成了一个离线表可能存在多种 fileformat 的情况,为了兼容我们修改了 DataSourceScanExec 相关的逻辑,将 reader 的实例化从基于 table 元信息粒度细化到分区元信息粒度。
3.3.3 转表 &小文件合并语法
为了方便用户修改表的存储格式和文件压缩格式我们在引擎层提供了相关语法及具体实现。用户可以通过指定分区条件对特定分区进行转换。
CONVERT TABLE target=tableIdentifier
(convertFormat | compressType) partitionClause? #convertTable
MERGE TABLE target=tableIdentifier
partitionClause? #mergeTable
3.3.4 字段血缘
作业间的依赖关系分析、数据地图等业务都需要 SQL 血缘的支持,团队后续工作(z-order , analyze , index)也需要依赖血缘,我们通过注册一个 LineageQueryListener 继承 QueryExecutionListener,在 onSuccess 方法拿到当前执行的 QueryExecution,通过 analyzedLogicalPlan,利用 NamedExpression 的 exprId 映射关系,对其进行遍历和解析,构建出字段级血缘(PROJECTION/PREDICATE)和 levelRelation(层级关系)。
3.4 基于历史执行的自动参数优化(HBO)
Spark 提供了大量的参数设置,对于用户而言了解这些参数并使用好需要花费很大的代价,在很多情况下不同的参数调优对于 spark 的作业执行和资源消耗会有很大差异。为了尽可能的适配任务执行,我们预设了一组参数,这种统一配置存在很多问题,以内存而言为了适配尽可能多的任务,该值设置偏大,通过对执行的分析发现大量的任务存在资源浪费的问题,整体的内存利用率仅 20%左右。要求每个用户成为专家对作业进行细致的调优显然不可能,因此我们设计了 HBO 系统,具体的思路如下图:
编辑
首先对任务执行的 SQL 进行了指纹计算,通过指纹来标识该任务每天执行情况,将每次执行中采集到的 metrics 收集后用策略进行分析给出相应的参数优化建议,在下次执行的时候根据指纹来获取推荐的执行参数,对于使用默认参数的任务则进行覆盖,对于那些用户指定的参数则优先使用用户参数。
内存优化策略:通过收集每个 executor 的峰值内存,如果峰值内存占配置内存比值低于 30%,就推荐使用更少的内存来执行此次的计算,对于峰值内存占比过高的任务,则调大内存配置。通过这个策略生产上的内存使用率提升至 50%左右。
并行度优化策略:生产上开启了动态资源配置,在对数据分析时发现有些节点从分配后就没有 task 执行过,完全浪费了节点的资源,对于这些任务会在下次执行的时候降低 spark.dynamicAllocation.executorAllocationRatio 值来降低执行并行度,此外默认提供的 spark.sql.shuffle.partitions 值对于大任务来说执行并行度不够,后续也将进行自动的调整。
优化 shuffle 策略:如上文所讲 RSS 对小任务存在性能下降的问题,通过对 block size、shuffle 数据量的分析,HBO 系统只会对那些 shuffle heavy 任务开启使用 RSS 服务。
小文件合并策略:小文件合并会消耗额外的资源,对于不存在小文件情况的作业 HBO 系统会关闭小文件合并相关的配置。
此外平时工作中一些 feature 的上线也会依赖该系统进行一个灰度过程。
3.5 Smart Data Manager (SDM)
Smart Data Manager(SDM)是我们自研的一个对数据进行组织和分析的服务,通过对数据的额外处理将我们对 Spark 的一些技改真正落地。它的整体架构如图,目前提供了如下的几个数据组织和分析能力:
编辑
表存储和压缩方式的转换:将表从 Text 存储转换为 ORC 或 Parquet 存储,将压缩类型从 None 或 Snappy 转换为 ZSTD 可以带来不错的存储和性能收益,SDM 提供了按分区对表异步进行转换的能力。
数据重组织:在分区内部按列对数据进行 order/zorder 组织可以有效的提高 data skipping 的效果,新增分区通过查询 table properties 中的排序列 meta 来改写执行计划应用,存量分区就可以通过 SDM 重刷。
Statistics 的统计:开启 CBO 时需要依赖对表统计信息的收集,在对 hive 表的列进行索引时也依赖收集到的列基数和操作信息选择合适的索引类型,通过 sdm 监听 hms 的 partition 事件就可以在分区更新时异步完成信息采样。
小文件合并:对有小文件较多的分区和表异步进行小文件合并,减少 namenode 的压力
Hive 表索引:通过分析血缘信息得到热表热列上的高频操作(点查,范围查询),基于此在分区文件层面异步的建立索引来加速查询。
血缘解析:解析语句,分析字段血缘,吐出 UDF 血缘、算子(order by / sort by / group by...)影响关系等
对数据进行重组织时会涉及到对数据的读写,为了防止对生产作业的影响我们在进行操作时会修改相关表的 Table Properties 增加锁表标记,各个计算引擎适配实现了类 Hive 的锁管理机制,由 Hive metastore 统一作为 lock manager,在对表和分区并发操作场景下,做到对用户完全透明。
4. Hive Meta Store 上的优化
B 站使用 HMS(Hive MetaStore)管理所有的离线表元信息,整个的离线计算的可用性都依赖 HMS 的稳定性。业务方在使用分区表时存在不少 4 级及以上分区的情况,有多个表分区数超百万。分区元信息庞大单次分区获取代价高,原生 HMS 基于单个 MySQL 实例存在性能瓶颈。
4.1 MetaStore Federation
随着多机房业务的推进,独立业务的 HDFS 数据和计算资源已经迁移到新机房,但是 HIVE 元数据仍在原有机房的 Mysql 中,这时候如果发生机房间的网络分区,就会影响新机房的任务。
为了解决上述问题,我们进行了方案调研,有两种方案供我们选择:
WaggleDance
HMS Federation
4.1.1 WaggleDance
WaggleDance 是开源的一个项目(https://github.com/ExpediaGroup/waggle-dance),该项目主要是联合多个 HMS 的数据查询服务,实现了一个统一的路由接口解决多套 HMS 环境间的元数据共享问题。并且 WaggleDance 支持 HMS Client 的接口调用。主要是通过 DB,把请求路由到对应的 HMS。
编辑
4.1.2 HMS Federation
HMS Federation 是解决多机房场景下的 HIVE 元数据存储问题,HIVE 元数据和 HDFS 数据存储在同一个机房,并且允许跨机房访问 HIVE 元数据。比如主站业务的 HDFS 数据存放在 IDC1,那么主站业务 HDFS 数据对应的 HIVE 元数据就存在 IDC1 的 Mysql,同样直播业务的 HDFS 数据和 HIVE 元数据都存放在 IDC2。
同时 HMS Federation 也提供了 Mysql 的横向扩容能力,允许一个机房可以有多个 Mysql 来存放 HIVE 元数据,如果单个 Mysql 的压力过大,可以把单个 Mysql 的数据存放到多个 Mysql 里面,分担 Mysql 的压力。比如主站业务的 HIVE 库,zhu_zhan 和 zhu_zhan_tmp,可以分别放在 idc1-mysql1 和 idc1-mysql2。
我们在 HMS Federation 中加入了一个 StateStore 的角色,该角色可以理解为一个路由器,HMS 在查询 Hive 库/表/分区之前,先问 StateStore 所要访问的 HIVE 元信息存放在哪一个 Mysql 中,获取到了对应的 Mysql 后,构建相应的 ObjectStore,进行 SQL 拼接或者是利用 JDO 查询后端 Mysql。
编辑
4.1.3 HMS Federation 与 WaggleDance 的对比
数据迁移
我们的主要目的是实现 HIVE 元数据按业务划分到各自 IDC 的 Mysql
WaggleDance 并没有提供相应元数据迁移工具,要迁移需要停止整个 HIVE 库新建表/分区,才能够开始迁移过去,对业务影响较大。
HMS Federation 可以按表的粒度迁移,对业务影响较小,并且可以指定某个 HIVE 库下,新建表在新的 Mysql,旧的等待着锁表迁移。
运维复杂度
WaggleDance 方案需要不同的 HMS,配置不同的 Mysql 地址,增加了 HMS 配置的复杂度。WaggleDance 是一个独立的服务,为了保证可用性,运维复杂度会再一次提升。
HMS Fedration 是 HMS 的功能升级,在 HMS 代码上开发,并且使用统一的配置。
综合上述对比,我们最终选择了 HMS Federation 的方案。通过修改 HMS 的代码,实现元数据跨 Mysql 存储。
4.2 MetaStore 请求追踪和流量控制
HMS 在处理 getPartitions 相关请求的时候,如果拉取的分区数量非常多,会给 HMS 的堆内存,以及后端的 Mysql 带来很大的压力,导致 HMS 服务响应延迟。
为了能够快速的定位到有问题的任务,我们在 Driver 中将 Job 相关的信息保存到 Hadoop CallerContext 中,在调用 HMS 接口的时候将 CallerContext 中的相关属性设置到 EnvironmentContext 中透传到 HMS 端,同时扩展了所有 getPartitions 相关的接口支持传递 EnvironmentContext,EnvironmentContext 中的 properties 会在 HMS 的 audit log 中打印出来,方便问题任务的定位。
同时为了提高 HMS 服务的稳定性,我们在 HMS 端也做了接口的限流和主动关闭大查询。对于限流,我们新增了一个 TrafficControlListener,当接口被调用的时候会以 function 和 user 为单位记录 Counters 保存在该 Listener 中,同时在该 Listener 中启动采集 used memory 和 counters 的线程,当平均使用内存达到阈值时,检查接口的 QPS,如果 qps 达到阈值会让调用接口的线程 sleep 一段时间,下一次检查通过或者达到最大等待时间后放行。
HMS 也有可能因为 getPartitions 方法返回的分区数量太大导致内存被打满,一方面我们限制了 getPartitions 从 mysql 返回的分区数量,超过一定数量就直接拒绝该请求,另一方面我们在 TProcessor 中以 threadId 和 socket 为 key 和 value 保存当前的连接,在检查 partition 数量时我们也按照 threadId 和 num partitions 为 key 和 value 保存 partition 的 cost,当 HMS 平均使用内存达到阈值超过一定时间后,会选择 num partitions 最大的 threadId,再根据 threadId 获取对应的连接,主动 close 该连接,来缓解内存压力。
编辑
文章来源于数据仓库与 Python 大数据
评论