写点什么

ByConity ELT 测试——体验 BSP 模式带来的高效数据处理

作者:颜颜yan_
  • 2024-12-29
    湖北
  • 本文字数:10346 字

    阅读完需:约 34 分钟

前言

在实际业务场景中,实时数仓和离线数仓的构建对于满足用户多样化的数据分析需求至关重要。实时数仓注重数据的快速入库与即时分析,而离线数仓则强调复杂任务的稳定执行与高效的内存管理。ByConity 作为一款开源云原生数据仓库,不仅支持多种数据分析场景,还引入了BSP(Batch Service Processing)模式,旨在通过更细粒度的调度和资源感知的调度策略,将数据加工过程无缝集成到 ByConity 内部,实现一站式数据接入、加工和分析。



测试环境


测试步骤

登录 ESC

MacOS / Linux

MacOS / Linux 可以通过 Shell(终端)应用来完成 SSH 连接远程服务器。


打开终端,输入ssh -p 23 <用户名>@<ECS服务器IP地址> ,并回车确认。

如果系统提示你输入 yes 或者 no 来确认是否连接,输入yes并回车。

然后输入<登录密码>并回车。

为避免使用时超时自动断开连接,运行tmux new -s $user_id命令创建一个新的 tmux 会话,其中$user_id是可以自定义的会话名称。(后续重新登录时,使用 tmux a -t $user_id)。例如:


tmux new -s yan
复制代码


执行 clickhouse client --port 9010命令进入客户端。如果后续输入 SQL 会被截断,在此处可以执行clickhouse client --port 9010 -mn,此后 SQL 后需要加;作为结束。


Windows

Windows 系统在本地主机打开开始,打开命令提示符终端,输入ssh -p 23 <用户名>@<ECS服务器IP地址> ,并回车确认。如果系统提示你输入 yes 或者 no 来确认是否连接,输入yes并回车。然后输入<登录密码>并回车。

如果连接失败,可以使用开源软件 PuTTY 进行连接操作。

执行查询

连接数据库

使用测试用数据库 test_elt


use test_elt;
复制代码



接下来看看库表

show tables;
复制代码



设置方言类型

由于 TPC-DS 定义的查询语法为标准 SQL,设置数据库会话的方言类型为 ANSI


 set dialect_type = 'ANSI'
复制代码



查看数据量


查询操作

选择 q78 进行查询操作,代码如下。


with ws as        (select d_year AS ws_sold_year, ws_item_sk,        ws_bill_customer_sk ws_customer_sk,        sum(ws_quantity) ws_qty,        sum(ws_wholesale_cost) ws_wc,        sum(ws_sales_price) ws_sp        from web_sales        left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk        join date_dim on ws_sold_date_sk = d_date_sk        where wr_order_number is null        group by d_year, ws_item_sk, ws_bill_customer_sk        ),        cs as        (select d_year AS cs_sold_year, cs_item_sk,        cs_bill_customer_sk cs_customer_sk,        sum(cs_quantity) cs_qty,        sum(cs_wholesale_cost) cs_wc,        sum(cs_sales_price) cs_sp        from catalog_sales        left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk        join date_dim on cs_sold_date_sk = d_date_sk        where cr_order_number is null        group by d_year, cs_item_sk, cs_bill_customer_sk        ),        ss as        (select d_year AS ss_sold_year, ss_item_sk,        ss_customer_sk,        sum(ss_quantity) ss_qty,        sum(ss_wholesale_cost) ss_wc,        sum(ss_sales_price) ss_sp        from store_sales        left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk        join date_dim on ss_sold_date_sk = d_date_sk        where sr_ticket_number is null        group by d_year, ss_item_sk, ss_customer_sk        )        select        ss_sold_year, ss_item_sk, ss_customer_sk,        round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,        ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,        coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,        coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,        coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price        from ss        left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)        left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)        where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2000        order by        ss_sold_year, ss_item_sk, ss_customer_sk,        ss_qty desc, ss_wc desc, ss_sp desc,        other_chan_qty,        other_chan_wholesale_cost,        other_chan_sales_price,        ratio        LIMIT 100;
复制代码



Progress: 219.15 thousand rows, 577.28 KB (838.64 thousand rows/s., 2.21 MB/s.) 98%0 rows in set. Elapsed: 29.363 sec. Processed 219.15 thousand rows, 577.28 KB (7.46 thousand rows/s., 19.66 KB/s.)Received exception from server (version 21.8.7):Code: 241. DB::Exception: Received from localhost:9010. DB::Exception: Code: 241, e.displayText() = DB::Exception: Worker host:10.0.0.14:8124, exception:Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 60.79 GiB (attempt to allocate chunk of 0 bytes), maximum: 56.51 GiB: While executing AggregatingTransform SQLSTATE: 53000 (version 21.8.7.1) SQLSTATE: 53000 (version 21.8.7.1) SQLSTATE: 53000.


这段返回值表明查询执行失败了,主要原因是内存超限。


Progress: 219.15 thousand rows, 577.28 KB (838.64 thousand rows/s., 2.21 MB/s.) 98%


这行显示查询进度:已处理 219.15 千行数据,数据量为 577.28 KB,处理速度为每秒 838.64 千行,已完成 98%


Memory limit (total) exceeded: would use 60.79 GiB (attempt to allocate chunk of 0 bytes), maximum: 56.51 GiB


这行是错误信息的关键部分,表明查询需要使用 60.79 GB 内存,但系统限制最大只能使用 56.51 GB,因此内存不足导致查询失败。

distributed_max_parallel_size

查询失败后,在失败的 SQL 最后加上设置后再次执行:


SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;
复制代码


其中参数distributed_max_parallel_size可以设置为 4 的其他整数倍(因为 Worker 的数量为 4)。添加参数后执行成功


with ws as        (select d_year AS ws_sold_year, ws_item_sk,        ws_bill_customer_sk ws_customer_sk,        sum(ws_quantity) ws_qty,        sum(ws_wholesale_cost) ws_wc,        sum(ws_sales_price) ws_sp        from web_sales        left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk        join date_dim on ws_sold_date_sk = d_date_sk        where wr_order_number is null        group by d_year, ws_item_sk, ws_bill_customer_sk        ),        cs as        (select d_year AS cs_sold_year, cs_item_sk,        cs_bill_customer_sk cs_customer_sk,        sum(cs_quantity) cs_qty,        sum(cs_wholesale_cost) cs_wc,        sum(cs_sales_price) cs_sp        from catalog_sales        left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk        join date_dim on cs_sold_date_sk = d_date_sk        where cr_order_number is null        group by d_year, cs_item_sk, cs_bill_customer_sk        ),        ss as        (select d_year AS ss_sold_year, ss_item_sk,        ss_customer_sk,        sum(ss_quantity) ss_qty,        sum(ss_wholesale_cost) ss_wc,        sum(ss_sales_price) ss_sp        from store_sales        left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk        join date_dim on ss_sold_date_sk = d_date_sk        where sr_ticket_number is null        group by d_year, ss_item_sk, ss_customer_sk        )        select        ss_sold_year, ss_item_sk, ss_customer_sk,        round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,        ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,        coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,        coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,        coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price        from ss        left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)        left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)        where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2000        order by        ss_sold_year, ss_item_sk, ss_customer_sk,        ss_qty desc, ss_wc desc, ss_sp desc,        other_chan_qty,        other_chan_wholesale_cost,        other_chan_sales_price,        ratio        LIMIT 100        SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;
复制代码



触发 OOM

OOM (Out Of Memory) 是当进程申请的虚拟内存空间超过系统限制或物理内存+交换空间的总量时触发的错误。在 Linux 系统中,OOM Killer会被触发来终止某些进程以释放内存。


==触发 OOM 的常见原因:==

  1. 查询处理的数据量太大

  2. 内存限制设置太小

  3. 复杂的计算或排序操作

  4. 多表关联产生大量中间结果


查询 q64,代码如下:


with cs_ui as        (select cs_item_sk        ,sum(cs_ext_list_price) as sale,sum(cr_refunded_cash+cr_reversed_charge+cr_store_credit) as refund        from catalog_sales        ,catalog_returns        where cs_item_sk = cr_item_sk        and cs_order_number = cr_order_number        group by cs_item_sk        having sum(cs_ext_list_price)>2*sum(cr_refunded_cash+cr_reversed_charge+cr_store_credit)),        cross_sales as        (select i_product_name product_name        ,i_item_sk item_sk        ,s_store_name store_name        ,s_zip store_zip        ,ad1.ca_street_number b_street_number        ,ad1.ca_street_name b_street_name        ,ad1.ca_city b_city        ,ad1.ca_zip b_zip        ,ad2.ca_street_number c_street_number        ,ad2.ca_street_name c_street_name        ,ad2.ca_city c_city        ,ad2.ca_zip c_zip        ,d1.d_year as syear        ,d2.d_year as fsyear        ,d3.d_year s2year        ,count(*) cnt        ,sum(ss_wholesale_cost) s1        ,sum(ss_list_price) s2        ,sum(ss_coupon_amt) s3        FROM   store_sales        ,store_returns        ,cs_ui        ,date_dim d1        ,date_dim d2        ,date_dim d3        ,store        ,customer        ,customer_demographics cd1        ,customer_demographics cd2        ,promotion        ,household_demographics hd1        ,household_demographics hd2        ,customer_address ad1        ,customer_address ad2        ,income_band ib1        ,income_band ib2        ,item        WHERE  ss_store_sk = s_store_sk AND        ss_sold_date_sk = d1.d_date_sk AND        ss_customer_sk = c_customer_sk AND        ss_cdemo_sk= cd1.cd_demo_sk AND        ss_hdemo_sk = hd1.hd_demo_sk AND        ss_addr_sk = ad1.ca_address_sk and        ss_item_sk = i_item_sk and        ss_item_sk = sr_item_sk and        ss_ticket_number = sr_ticket_number and        ss_item_sk = cs_ui.cs_item_sk and        c_current_cdemo_sk = cd2.cd_demo_sk AND        c_current_hdemo_sk = hd2.hd_demo_sk AND        c_current_addr_sk = ad2.ca_address_sk and        c_first_sales_date_sk = d2.d_date_sk and        c_first_shipto_date_sk = d3.d_date_sk and        ss_promo_sk = p_promo_sk and        hd1.hd_income_band_sk = ib1.ib_income_band_sk and        hd2.hd_income_band_sk = ib2.ib_income_band_sk and        cd1.cd_marital_status <> cd2.cd_marital_status and        i_color in ('purple','burlywood','indian','spring','floral','medium') and        i_current_price between 64 and 64 + 10 and        i_current_price between 64 + 1 and 64 + 15        group by i_product_name        ,i_item_sk        ,s_store_name        ,s_zip        ,ad1.ca_street_number        ,ad1.ca_street_name        ,ad1.ca_city        ,ad1.ca_zip        ,ad2.ca_street_number        ,ad2.ca_street_name        ,ad2.ca_city        ,ad2.ca_zip        ,d1.d_year        ,d2.d_year        ,d3.d_year        )        select cs1.product_name        ,cs1.store_name        ,cs1.store_zip        ,cs1.b_street_number        ,cs1.b_street_name        ,cs1.b_city        ,cs1.b_zip        ,cs1.c_street_number        ,cs1.c_street_name        ,cs1.c_city        ,cs1.c_zip        ,cs1.syear        ,cs1.cnt        ,cs1.s1 as s11        ,cs1.s2 as s21        ,cs1.s3 as s31        ,cs2.s1 as s12        ,cs2.s2 as s22        ,cs2.s3 as s32        ,cs2.syear        ,cs2.cnt        from cross_sales cs1,cross_sales cs2        where cs1.item_sk=cs2.item_sk and        cs1.syear = 1999 and        cs2.syear = 1999 + 1 and        cs2.cnt <= cs1.cnt and        cs1.store_name = cs2.store_name and        cs1.store_zip = cs2.store_zip        order by cs1.product_name        ,cs1.store_name        ,cs2.cnt        ,cs1.s1        ,cs2.s1;
复制代码


查询成功。



在执行成功的查询中,添加参数限制查询的最大内存使用量,如:


SETTINGSmax_memory_usage=40000000000;
复制代码


单位为 B,当前约合 37.25 GB。将内存限制为合适的值,引发 oom。随后执行distributed_max_parallel_size,完成查询。内存不宜限制的过小,可以先用 40000000000 做第一次尝试,如果依然顺利执行,可依次将内存调整为上一次的 70%。


with cs_ui as        (select cs_item_sk        ,sum(cs_ext_list_price) as sale,sum(cr_refunded_cash+cr_reversed_charge+cr_store_credit) as refund        from catalog_sales        ,catalog_returns        where cs_item_sk = cr_item_sk        and cs_order_number = cr_order_number        group by cs_item_sk        having sum(cs_ext_list_price)>2*sum(cr_refunded_cash+cr_reversed_charge+cr_store_credit)),        cross_sales as        (select i_product_name product_name        ,i_item_sk item_sk        ,s_store_name store_name        ,s_zip store_zip        ,ad1.ca_street_number b_street_number        ,ad1.ca_street_name b_street_name        ,ad1.ca_city b_city        ,ad1.ca_zip b_zip        ,ad2.ca_street_number c_street_number        ,ad2.ca_street_name c_street_name        ,ad2.ca_city c_city        ,ad2.ca_zip c_zip        ,d1.d_year as syear        ,d2.d_year as fsyear        ,d3.d_year s2year        ,count(*) cnt        ,sum(ss_wholesale_cost) s1        ,sum(ss_list_price) s2        ,sum(ss_coupon_amt) s3        FROM   store_sales        ,store_returns        ,cs_ui        ,date_dim d1        ,date_dim d2        ,date_dim d3        ,store        ,customer        ,customer_demographics cd1        ,customer_demographics cd2        ,promotion        ,household_demographics hd1        ,household_demographics hd2        ,customer_address ad1        ,customer_address ad2        ,income_band ib1        ,income_band ib2        ,item        WHERE  ss_store_sk = s_store_sk AND        ss_sold_date_sk = d1.d_date_sk AND        ss_customer_sk = c_customer_sk AND        ss_cdemo_sk= cd1.cd_demo_sk AND        ss_hdemo_sk = hd1.hd_demo_sk AND        ss_addr_sk = ad1.ca_address_sk and        ss_item_sk = i_item_sk and        ss_item_sk = sr_item_sk and        ss_ticket_number = sr_ticket_number and        ss_item_sk = cs_ui.cs_item_sk and        c_current_cdemo_sk = cd2.cd_demo_sk AND        c_current_hdemo_sk = hd2.hd_demo_sk AND        c_current_addr_sk = ad2.ca_address_sk and        c_first_sales_date_sk = d2.d_date_sk and        c_first_shipto_date_sk = d3.d_date_sk and        ss_promo_sk = p_promo_sk and        hd1.hd_income_band_sk = ib1.ib_income_band_sk and        hd2.hd_income_band_sk = ib2.ib_income_band_sk and        cd1.cd_marital_status <> cd2.cd_marital_status and        i_color in ('purple','burlywood','indian','spring','floral','medium') and        i_current_price between 64 and 64 + 10 and        i_current_price between 64 + 1 and 64 + 15        group by i_product_name        ,i_item_sk        ,s_store_name        ,s_zip        ,ad1.ca_street_number        ,ad1.ca_street_name        ,ad1.ca_city        ,ad1.ca_zip        ,ad2.ca_street_number        ,ad2.ca_street_name        ,ad2.ca_city        ,ad2.ca_zip        ,d1.d_year        ,d2.d_year        ,d3.d_year        )        select cs1.product_name        ,cs1.store_name        ,cs1.store_zip        ,cs1.b_street_number        ,cs1.b_street_name        ,cs1.b_city        ,cs1.b_zip        ,cs1.c_street_number        ,cs1.c_street_name        ,cs1.c_city        ,cs1.c_zip        ,cs1.syear        ,cs1.cnt        ,cs1.s1 as s11        ,cs1.s2 as s21        ,cs1.s3 as s31        ,cs2.s1 as s12        ,cs2.s2 as s22        ,cs2.s3 as s32        ,cs2.syear        ,cs2.cnt        from cross_sales cs1,cross_sales cs2        where cs1.item_sk=cs2.item_sk and        cs1.syear = 1999 and        cs2.syear = 1999 + 1 and        cs2.cnt <= cs1.cnt and        cs1.store_name = cs2.store_name and        cs1.store_zip = cs2.store_zip        order by cs1.product_name        ,cs1.store_name        ,cs2.cnt        ,cs1.s1        ,cs2.s1        LIMIT 100        SETTINGSmax_memory_usage=100000000;
复制代码


如下,成功触发 OOM。


在代码后添加 SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;

with cs_ui as        (select cs_item_sk        ,sum(cs_ext_list_price) as sale,sum(cr_refunded_cash+cr_reversed_charge+cr_store_credit) as refund        from catalog_sales        ,catalog_returns        where cs_item_sk = cr_item_sk        and cs_order_number = cr_order_number        group by cs_item_sk        having sum(cs_ext_list_price)>2*sum(cr_refunded_cash+cr_reversed_charge+cr_store_credit)),        cross_sales as        (select i_product_name product_name        ,i_item_sk item_sk        ,s_store_name store_name        ,s_zip store_zip        ,ad1.ca_street_number b_street_number        ,ad1.ca_street_name b_street_name        ,ad1.ca_city b_city        ,ad1.ca_zip b_zip        ,ad2.ca_street_number c_street_number        ,ad2.ca_street_name c_street_name        ,ad2.ca_city c_city        ,ad2.ca_zip c_zip        ,d1.d_year as syear        ,d2.d_year as fsyear        ,d3.d_year s2year        ,count(*) cnt        ,sum(ss_wholesale_cost) s1        ,sum(ss_list_price) s2        ,sum(ss_coupon_amt) s3        FROM   store_sales        ,store_returns        ,cs_ui        ,date_dim d1        ,date_dim d2        ,date_dim d3        ,store        ,customer        ,customer_demographics cd1        ,customer_demographics cd2        ,promotion        ,household_demographics hd1        ,household_demographics hd2        ,customer_address ad1        ,customer_address ad2        ,income_band ib1        ,income_band ib2        ,item        WHERE  ss_store_sk = s_store_sk AND        ss_sold_date_sk = d1.d_date_sk AND        ss_customer_sk = c_customer_sk AND        ss_cdemo_sk= cd1.cd_demo_sk AND        ss_hdemo_sk = hd1.hd_demo_sk AND        ss_addr_sk = ad1.ca_address_sk and        ss_item_sk = i_item_sk and        ss_item_sk = sr_item_sk and        ss_ticket_number = sr_ticket_number and        ss_item_sk = cs_ui.cs_item_sk and        c_current_cdemo_sk = cd2.cd_demo_sk AND        c_current_hdemo_sk = hd2.hd_demo_sk AND        c_current_addr_sk = ad2.ca_address_sk and        c_first_sales_date_sk = d2.d_date_sk and        c_first_shipto_date_sk = d3.d_date_sk and        ss_promo_sk = p_promo_sk and        hd1.hd_income_band_sk = ib1.ib_income_band_sk and        hd2.hd_income_band_sk = ib2.ib_income_band_sk and        cd1.cd_marital_status <> cd2.cd_marital_status and        i_color in ('purple','burlywood','indian','spring','floral','medium') and        i_current_price between 64 and 64 + 10 and        i_current_price between 64 + 1 and 64 + 15        group by i_product_name        ,i_item_sk        ,s_store_name        ,s_zip        ,ad1.ca_street_number        ,ad1.ca_street_name        ,ad1.ca_city        ,ad1.ca_zip        ,ad2.ca_street_number        ,ad2.ca_street_name        ,ad2.ca_city        ,ad2.ca_zip        ,d1.d_year        ,d2.d_year        ,d3.d_year        )        select cs1.product_name        ,cs1.store_name        ,cs1.store_zip        ,cs1.b_street_number        ,cs1.b_street_name        ,cs1.b_city        ,cs1.b_zip        ,cs1.c_street_number        ,cs1.c_street_name        ,cs1.c_city        ,cs1.c_zip        ,cs1.syear        ,cs1.cnt        ,cs1.s1 as s11        ,cs1.s2 as s21        ,cs1.s3 as s31        ,cs2.s1 as s12        ,cs2.s2 as s22        ,cs2.s3 as s32        ,cs2.syear        ,cs2.cnt        from cross_sales cs1,cross_sales cs2        where cs1.item_sk=cs2.item_sk and        cs1.syear = 1999 and        cs2.syear = 1999 + 1 and        cs2.cnt <= cs1.cnt and        cs1.store_name = cs2.store_name and        cs1.store_zip = cs2.store_zip        order by cs1.product_name        ,cs1.store_name        ,cs2.cnt        ,cs1.s1        ,cs2.s1        LIMIT 100        SETTINGSmax_memory_usage=100000000        SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;
复制代码




测试反馈

ByConity 引入的 BSP(Bulk Synchronous Parallel)模式是一项关键的功能升级,distributed_max_parallel_size 参数负责调控分布式查询中表扫描的并行级别。用户可根据集群资源状况和查询的具体需求,灵活调整此参数以优化查询性能。max_memory_usage 参数则用于设定单个查询在执行期间所能使用的最大内存量。通过合理配置该参数,能有效防止单个查询过度占用内存资源,确保系统稳定性,避免对其他查询造成干扰。通过合理调整distributed_max_parallel_sizemax_memory_usage的值,用户可以在保证查询性能的同时,避免资源过度消耗和查询失败的风险,从而实现资源的优化配置和系统的稳定运行。

在实际操作中,想要找到一个既能充分利用资源,又能避免 OOM 并维持 BSP 模式稳定运行的参数配置并不容易。建议系统能够具备自适应的能力,根据查询的资源需求和集群的当前状态,智能地推荐 BSP 模式的开启与否,并给出合理的并行度设置建议。

发布于: 刚刚阅读数: 7
用户头像

颜颜yan_

关注

还未添加个人签名 2022-10-01 加入

还未添加个人简介

评论

发布
暂无评论
ByConity ELT测试——体验BSP模式带来的高效数据处理_OOM_颜颜yan__InfoQ写作社区