字节跳动数据平台技术揭秘:基于 ClickHouse 的复杂查询实现与优化
更多技术交流、求职机会、试用福利,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群
ClickHouse 作为目前业内主流的列式存储数据库(DBMS)之一,拥有着同类型 DBMS 难以企及的查询速度。作为该领域中的后起之秀,ClickHouse 已凭借其性能优势引领了业内新一轮分析型数据库的热潮。但随着企业业务数据量的不断扩大,在复杂 query 场景下,ClickHouse 容易存在查询异常问题,影响业务正常推进。
字节跳动作为国内最大规模的 ClickHouse 使用者,在对 ClickHouse 的应用与优化过程中积累了大量技术经验。本文将分享字节跳动解决 ClickHouse 复杂查询问题的优化思路与技术细节。
项目背景
ClickHouse 的执行模式与 Druid、ES 等大数据引擎类似,其基本的查询模式可分为两个阶段。第一阶段,Coordinator 在收到查询后,将请求发送给对应的 Worker 节点。第二阶段,Worker 节点完成计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和处理,并将处理后的结果返回。
两阶段的执行模式能够较为高效地支持目前许多常见的业务场景,例如各类大宽表单的查询,这也是 ClickHouse 最擅长的场景。ClickHouse 的优点是简单、高效,通常来说,简单就意味着高效。但随着企业业务的持续发展,愈加复杂的业务场景对 ClickHouse 提出了以下三类挑战。
第一类,当一阶段返回的数据较多,且二阶段计算较为复杂时,Coordinator 会承受较大压力,容易成为 Query 的瓶颈。例如一些重计算的 Agg 算子,如 Count Distinct,若采用哈希表的方式进行去重,第二阶段需在 Coordinator 单机上去合并各个 Worker 的哈希表。这个计算量会很重且无法并行。
第二类,由于目前 ClickHouse 模式并不支持 Shuffle,因此对于 Join 而言,右表必须为全量数据。无论是普通 Join 还是 Global Join,当右表的数据量较大时,若将数据都放到内存中,会比较容易 OOM。若将数据 spill 到磁盘,虽然可以解决内存问题,但由于有磁盘 IO 和数据序列化、反序列化的代价,因此查询的性能会受到影响。特别是当 Join 采用 Hash Join 时,如果右表是一张大表,构建也会比较慢。针对构建问题,近期社区也进行了一些右表并行构建的优化,数据按照 Join key 进行 Split 来并行地构建多个 Hash Table,但额外的代价是左右表都需要增加一次 Split 操作。
第三类,则是关于复杂查询(如多表 Join、嵌套多个子查询、window function 等),ClickHouse 对这类需求场景的支持并不是特别友好,由于 ClickHouse 并不能通过 Shuffle 来分散数据增加执行并行度,并且其生成的 Pipeline 在一些 case 下并不能充分并行。因此在某些场景下,难以发挥集群的全部资源。
随着企业业务复杂度的不断提升,复杂查询,特别是有多轮的分布式 Join,且有很多 agg 的计算的需求会越来越强烈。在这种情况下,业务并不希望所有的 Query 都按照 ClickHouse 擅长的模式进行,即通过上游数据 ETL 来产生大宽表。这样做对 ETL 的成本较大,并且可能会有一些数据冗余。
企业的集群资源是有限的,但整体的数据量会持续增长,因此在这种情况下,我们希望能够充分地去利用机器的资源,来应对这种越来越复杂的业务场景和 SQL。所以我们的目标是基于 ClickHouse 能够高效支持复杂查询。
技术方案
对于 ClickHouse 复杂查询的实现,我们采用了分 Stage 的执行方式,来替换掉目前 ClickHouse 的两阶段执行方式。类似于其他的分布式数据库引擎,例如 Presto 等,会将一个复杂的 Query 按数据交换情况切分成多个 Stage,各 Stage 之间则通过 Exchange 完成数据交换。Stage 之间的数据交换主要有以下三种形式。
按照单个或者多个 key 进行 Shuffle
将单个或者多个节点的数据汇聚到一个节点上,称为 Gather
将同一份数据复制到多个节点上,称为 Broadcast 或广播
对于单个 Stage 执行,继续复用 ClickHouse 目前底层的执行方式。开发上按照不同功能切分不同模块。各个模块预定接口,减少彼此的依赖与耦合。即使模块发生变动或内部逻辑调整,也不会影响其他模块。其次,对模块采用插件架构,允许模块按照灵活配置支持不同的策略。这样便能够根据不同业务场景实现不同的策略。
首先,当 Coordinator 接受复杂的查询以后,它会在当前的语法树的基础上,根据节点类型和数据分布情况,插入 Exchange 节点,并生成一个分布式 Plan。其次,Coordinator 节点会根据 ExchangeNode 类型切分 Plan,并生成每个 Stage 执行计划片段。
接着,Coordinator 节点会调用 SegmentScheduler 调度器,将各 Stage 的 PlanSegment 发送给 Worker 节点。当 Worker 接收到 PlanSegment 后,InterpreterPlanSegment 会完成数据的读取和执行,通过 ExchangeManager 完成数据的交互。最后,Coordinator 从最后一轮 Stage 所对应的 ExchangeManager 中去读取数据,并返回给 Client。
查询片段调度器 SegmentScheduler 负责调度查询不同的 PlanSegment,根据上下游依赖关系和数据分布,以及 Stage 并行度和 worker 分布和状态信息,按照一定的调度策略,将 PlanSemgent 发给不同的 Worker 节点。
目前而言,我们在进行计划下发和调度时,主要实现了两种策略。
第一种是依赖调度,根据 Stage 依赖关系定义拓扑结构,产生 DAG 图,并根据 DAG 图调度 Stage。依赖调度要等到依赖 Stage 启动以后,才会调度对应的 Stage。例如两表 Join,会先调度左右表读取 Stage,之后再调度 Join 这个 Stage,因为 Join 的 Stage 依赖于左右表的 Stage。
第二种是 AllAtOnce 策略,先计算每个 Stage 的相关信息,后一次性调度所有 Stage。
相比而言,这两种策略是在容错、资源使用和延时上去做取舍。第一种策略依赖调度,可以实现更好的容错。由于 ClickHouse 数据可以有多个副本,读数据时,如部分节点连接失败,可以尝试它的副本节点。对后续依赖的节点的 Stage 来说,并不需要感知到前面 Stage 的执行情况。非 Source Stage,本身没有对数据的依赖,所以容错能力会更强,只要保证 Stage 并行度的节点存活即可。甚至极端情况下,如需保证 Query 正常执行,也可以降低 Stage 的并行度。但调度存在依赖关系,并不能完全并行,会增加调度的时长。Stage 较多的情况下,调度延时可能会占据 SQL 整体不小的比例。针对上述问题的可做如下优化:对于一些没有依赖关系的,尽可能支持并行。例如同一个 Stage 的不同节点,可以并行。没有依赖关系的 Stage,也可以并行。
第二种调度策略是 AllAtOnce,通过并行可以极大降低调度延时。为防止出现大量网络 IO 线程,可以通过异步化手段控制线程数目。AllAtOnce 策略的缺点是容错性没有依赖调度好,每一个 Stage 的 Worker 在调度前就已经确定了,调度过程中有一个 Worker 出现连接异常,则整个 Query 都会失败。另一类情况,Stage 在上游数据还没有 ready,就被调度起来了,则需要较长时间等数据。例如 Final 的 agg Stage,要等 Partial agg 完成以后才能够拿到对应的数据。虽然我们也对此进行了一些优化,并不会长时间空跑,浪费 CPU 资源。但是其实也消耗了一部分资源,例如需要去创建这些执行的线程。
ClickHouse 的查询节点执行主要是以 SQL 形式在节点间互相交互。在切分 Stage 后,我们需要支持能够执行一个单独的 PlanSegment 的执行计划。因此,InterpreterPlanSegment 主要的作用就是接受一个序列化后的 PlanSegment,能够在 Worker 节点上去运行整个 PlanSegment 的逻辑。此外,我们也进行了功能和性能上的增强,例如支持一个 Stage 处理多个 Join,这样便可以减少 Stage 的数目和一些不必要的传输,用一个 Stage 就可以完成整个 Join 的过程。InterpreterPlanSegment 的执行会上报对应的状态信息,如出现执行异常,会将异常信息报告给查询片段调度器,调度器会取消 Query 其他的 Stage 的 Worker 执行。
ExchangeManager 是 PlanSegment 数据交换的媒介,能平衡数据上下游处理的能力。整体而言,我们的设计采用 Push 与队列的方式,当上游的数据 ready 时,主动推送给下游,并在这个基础上支持了反压的能力。
在整个流程中,上下游都会通过队列来优化发送和读取,上游与下游会有一个自己的队列。当队列饱和的时候,会通过类似反压的机制来控制上游这个执行速度,若上游计算快,下游处理能力比较慢,出现下游处理不过来的情况,则会通过反压的方式来控制上游执行的速度。
由于采用 push 和队列,因此要考虑一个相对比较特殊的场景,在某些 case 的情况下,下游的 Stage 并不需要读取全部的上游的数据。例如 Limit100,下游只需读取 100 条数据,而上游可能会产生非常大规模的数据。因此在这种情况下,当下游的 Stage 读取到足够的数据后,它需要能够主动取消上游 Stage 的执行,并且清空队列。
ExchangeManager 考虑的优化点较多,例如细粒度的内存控制,能够按照实例、Query、Segment 等多个层次进行内存控制,避免 OOM。更长期的考虑是在一些对延迟要求不高、数据量大的场景,通过将数据 Spill 到磁盘,降低内存的使用
第二,为了提升传输效率,小数据要做 Merge,大数据要做 Split。同时,在网络传输和处理某些场景的时候,需要做一种有序性的保证。例如在 Sort 的场景,Partial Sort 和 Merge Sort 的网络传输过程必须要保证是有序的,传输数据不能出现乱序的情况,否则进行 Merge Sort 时数据就会出问题,并影响最终结果。
第三,连接的复用和网络的优化,包括上下游在同一个节点,尽可能走内存交换,而不走网络。这样可以减少网络开销以及数据的序列化和反序列化的代价。此外,ClickHouse 在计算上做了非常充足的优化,因此其在某些场景中,内存带宽会成为瓶颈,在 ExchangeManager 的一些场景中,可以用一些零拷贝和其他优化,尽量减少内存的拷贝。
第四,异常处理和监控。相比于单机,分布式情况下异常情况会更加复杂,且更加难以感知。通过重试能够避免一些节点短时性的高负载或者异常对查询的影响。做好监控,在出问题的时候,能快速感知,并进行排查,也能够针对性地去做优化。
优化与诊断
首先是 Join 的多种实现和优化。根据数据的规模和分布,可以根据不同的场景去选择合适的 Join 的实现方式:
Shuffle Join,是目前使用方式最多,也是最常见的。
Broadcast Join,大表 Join 小表场景,将右表广播到左表的所有 Worker 节点上面,这样可以避免左表大表的数据传输。
Colocate Join,如果左右表都已按照 Join key 分布,并且它们是相通的分布的话,其实不需要去做数据的 exchange,可以将数据的传输减到最小。
网络连接的优化,核心本质是减少连接的建立和使用,特别是在数据需要 Shuffle 时,下一轮 Stage 中的每一个节点都要从上游的 Stage 中的每个节点去拉取数据。若集群整体的节点数较多,且存在很多较复杂的 Query,就会建立非常多的连接。
目前在字节内部,ClickHouse 集群的规模非常大,在当前 ClickHouse 二阶段执行的高并发情况下,单机最大可能会建立几万个连接。因此必须要进行网络连接的优化,特别是支持连接的复用,每个连接上可以跑多个 Stage 查询。通过尽可能去复用连接,在不同的节点之间,能够建立固定数目的连接,不同的 Query、Stage 都会复用这些连接,连接数并不会随着 Query 和 Stage 的规模的增长而增长。
网络传输优化,在数据中心内,远程的直接的内存访问,通常指 RDMA,是一种能够超过远程主机操作系统的内核,去访问内存里的数据的技术。由于这种技术不需要经过操作系统,所以不仅节省了大量的 CPU 资源,同样也提升了系统吞吐量,降低了系统的网络通信延迟,尤其适合大规模并行的计算机集群。由于 ClickHouse 在计算层面做了很多优化,而网络带宽相比于内存带宽要小不少,在一些数据量传输特别大的场景,网络传输会成为一定的瓶颈。为了提升网络传输的效率和提升数据 exchange 的吞吐,一方面可以引入压缩来降低传输数据量,另一方面可以引入 RDMA 来减少一定的开销。经过测试,在一些数据传输量大的场景,有不小的收益。
利用 Runtime Filter 的优化在不少数据库也有使用。Join 的算子通常是 OLAP 引擎里最耗时的算子,优化 Join 算子有两种思路。一种思路是可以提升 Join 算子的性能。比如对于 HashJoin,可以优化 HashTable 实现,也可以实现更好的哈希算法,包括做一些更好的并行的方式。
另一种思路是,如果本身算子耗时比较重,可以减少参与算子计算的数据。Runtime Filter 是在一些场景下特别是事实表 Join 多张维度表的星型模型场景有比较好的效果。在此类场景下,通常事实表的规模会非常大,而大部分的过滤条件都是在维度表上面。
Runtime Filter 的作用,是通过在 Join 的 Probe 端,提前过滤掉并不会命中 Join 条件的输入数据,从而大幅减少 Join 中的数据传输和计算。通过这种方式,能够减少整体的执行时间。因此我们在复杂查询上也支持了 Runtime Filter,目前主要支持 Min Max 和 Bloom Filter。
如果 runtime filter 的列(join column)构建了索引(主键、skip index…),是需要重新生成 pipeline 的。因为命中索引后,可能会减少数据的读取,pipeline 并行度和对应数据的处理 range 都可能发生变化。如果 runtime filter 的列跟索引无关,可以在计划生成的时候预先带上过滤条件,一开始为空,只是占位,runtime filter 下发的时候把占位信息改成真正的过滤条件即可。这样即使 runtime filter 下发超时了,查询片段已经开始执行,只要查询片段没有执行完,之后的数据仍然可以进行过滤。
但需要注意的是,Runtime Filter 是一种特殊场景下的优化,针对场景是右表数据量不大,并且构建的 Runtime Filter 对左表有比较好的过滤效果。若右表数据量较大,构建的 Runtime Filter 的时间比较久,或对左表的数据过滤没有效果。Runtime Filter 反而会增加查询的耗时和计算的开销。因此要根据数据的特征和规模来决定是否开启优化。
性能诊断和分析对复杂查询很关键,由于引入了复杂查询的多 Stage 模型,SQL 执行的模式会变得复杂。对此的优化首先是尽可能完善各类 Metrics,包括 Query 执行时间、不同 Stage 执行时间、起始时间、结束时间、处理的 IO 数据量、算子处理的数据、执行情况,以及各类的算子 Metrics 和一些 Profile Events(例如 Runtime Filter 会有构建时间、过滤数据量等 Metrics)。
其次,我们记录了反压信息与上下游的队列长度,以此推断 Stage 的执行情况和瓶颈。
通常可以有如下判断:
输入和输出队列数目同为低或同为高分别表明当前 stage 处理正常或处于被下游反压,此时可以通过反压信息来进一步判断
当输入和输出队列数目不一样,这可能是出于反压传导的中间状态或者该 stage 就是反压的根源
如果一个 stage 的输出队列数目很多,且经常被反压,通常是被下游 stage 所影响,所以可以排除它本身是反压根源的可能性,更多关注它的下游
如果一个 stage 的输出队列数目很少,但其输入队列的数目很高,则表明它有可能是反压的根源。优化目标是提升这个 stage 的处理能力。
总的来说,SQL 的场景包罗万象,非常复杂的场景有时还是需要对引擎有一定了解的同学去诊断和分析,给出优化建议。字节目前也在不断完善这些经验,希望能够通过不断完善 Metrics 和分析的路径,持续减轻 Oncall 的负担,在某些场景下能够更加准确地给出优化建议。
效果与展望
根据上述所提,目前执行模型存在三个缺点,我们进行了复杂查询的优化,因此需要验证这种新的模式是否能够解决发现的问题,测试场景如下:
第二阶段计算较复杂,且第一阶段数据较多
Hash Join 右表是大表
多表 Join,模拟复杂 Query
以 SSB 1T 数据作为数据集,环境则是构建了 8 个节点的集群。
Case1——二阶段计算复杂。我们看到有一个比较重的计算算子 UniqExact,就是 count distinct 的计算方式,通过 Hash 表做去重。count distinct 默认采用这种算法,当我们使用复杂查询后,Query 的执行时间从 8.5 秒减少到 2.198 秒。第二阶段 agg uniqExact 算子的合并原本由 coordinator 单点合并,现在通过按照 group by key shuffle 后可以由多个节点并行完成。因此通过 shuffle 减轻了 coordinator 的 merge agg 压力。
Case2——右表为大表。由于 ClickHouse 对多表的优化做的还不是很到位。这里采用子查询来下推过滤的条件。在这个 case 中,Lineorder 是一张大表,采用复杂查询的模式以后,Query 执行时间从 17 秒优化到了 1.7 秒。由于 Lineorder 是一张大表,通过 Shuffle 可以将数据按照 Join key Shuffle 到各 Worker 节点上,这样就减少了右表构建的压力。
Case3——多表 Join。开启复杂查询后,Query 的执行时间从 8.58 秒优化到 4.464 秒,所有的右表都可以同时开始数据的处理和构建。为了和现有模式做对比,复杂查询这里并没有开启 runtime filter,开启 runtime filter 后效果会更好。
事实上,优化器对复杂查询的性能提升也非常大,通过一些 RBO 的规则,例如常见的谓词下推、相关子查询的处理等,可以极大提升 SQL 的执行效率。在复杂查询的模式下,由于有优化器的存在,用户甚至不需要写得非常复杂,优化器自动去完成这些下推和 RBO 规则优化。
此外,选择用哪一种 Join 的实现,也会对 Join 的性能影响较大。若能够满足 Join Key 分布,使用 Colocate Join 可以减少左右表 Shuffle 的传输代价。在多表 Join 的情况下,Join 的顺序和 Join 的实现方式对执行的时长影响,会比两表 Join 更大。借助这种数据的统计信息,通过一些 CBO 的优化,可以得到一个比较好的执行模式。
有了优化器,业务同学可以按照业务逻辑来写任何的 SQL,引擎自动计算出相对最优的 SQL 计划并执行,加速查询的执行。
总结一下,ClickHouse 目前的执行模式在很多单表的场景下表现非常优异,我们主要针对复杂场景做优化,通过实现多 Stage 的模式,实现了 Stage 之间的数据的传输,从工程实践上做了较多尝试和优化,去提升执行和网络传输的性能。并希望通过完善 Metrics 和智能诊断来降低 SQL 分析和调优的门槛。目前已经实现了第一步,未来字节仍有很多努力的方向。
首先,是要继续去提升执行和 Exchange 的性能。这里不谈论引擎执行通用的优化,比如更好的索引或者算子的优化,主要是跟复杂查询模式有关。举一个例子,比如 Stage 复用,在 SQL 出现子查询结果被反复使用的场景,比如一些多表 join 和 CTE 场景可能有帮助。通过 Stage 复用可以减少相同数据的多次读取。Stage 复用我们之前就已经支持,但是用的场景比较少,未来准备更灵活和通用。其次,Metrics 和智能诊断加强。SQL 的灵活度很高,因此一些复杂查询如果没有 Metrics 其实几乎很难去做诊断和调优。以上都是字节跳动数据平台在未来会长期的持续去发力的方向。
立即跳转火山引擎ByteHouse官网了解详情!
评论