业务背景
大家都知道,在现在这个数据量飞速增长的数据为王时代,任何一家企业只有对自身的企业数据有精准的分析和研判,那么才能掌握数据密码,从而掌握这个时代盈利的秘诀。因此,高效的数据处理和分析能力已经成为企业竞争力的关键。
在实际业务中,用户会基于不同的产品分别构建实时数仓和离线数仓。其中,实时数仓强调数据能够快速入库,且在入库的第一时间就可以进行分析,低时延的返回分析结果。而离线数仓强调复杂任务能够稳定的执行完,需要更好的内存管理。
什么是 ByConity
我们今天的主角是 ByConity,那么什么是 ByConity?
ByConity 是字节跳动开源的云原生数据仓库,可以满足用户的多种数据分析场景。并且,ByConity 增加了 bsp 模式:可以进行 task 级别的容错;更细粒度的调度;基于资源感知的调度。希望通过 bsp 能力,把数据加工(T)的过程转移到 ByConity 内部,能够一站式完成数据接入、加工和分析。
上手实测
在对当前数字驱动时代下的企业处境有了一个基础的了解后,我们又了解了开源的云原生数据仓库 ByConity。下面就正式开始今天的上手实测吧。
为了实际上手感受 bsp 模式带来的效果,官方为我们提供了使用 TPC-DS 1TB 的数据测试活动,让我们可以亲自验证 ByConity 在 ELT 方面的实际感受。
环境要求
由于要测试的是大数据量的数据加工和分析,因此对于环境的要求也相对比较高,官方为我们提供了一个测试集群,下面是集群规格
测试操作
远程登录 ECS 服务器
开始操作之前首先需要登录 ECS 服务器,我们可以通过本地 SSH 工具连接到 ECS 服务器,打开本地远程连接工具 Xshell,点击【新建会话】,输入 ECS 服务器 IP 地址,选择端口 23
继续点击【用户身份验证】选择密码登录方式,输入用户名、密码,点击【确定】
这里需要说明一下,我用我本地自带的 Xshell 5 远程连接服务器工具没有成功,提示如下【找不到匹配的 host key 算法。】,我理解应该是我本地的连接远程工具太老了,很久没更新导致的。那么下面我采用第二种链接方法,采用 windows10 自带的远程连接工具。
windows10 自带连接工具
这里在本地磁盘新建一个文件夹,在文件夹内点击 ctrl+shift+鼠标右键 选中【在此处打开 Powershell 窗口(S)】打开自带的命令行工具
在命令行工具输入以下命令,并回车确认
ssh -p 23 <提供的用户名>@<ECS服务器IP地址>
# ssh -p 23 root@14.103.145.182
复制代码
这时会出现一句提示,我们输入 yes 点击回车
Are you sure you want to continue connecting (yes/no/[fingerprint])? yes
复制代码
随后会提示让我们输入密码,我们输入密码后点击回车即可登录 ECS 服务器。这里需要注意的是密码一定要手输入,粘贴的密码这里不能识别
为了避免连接超时导致断开连接,频繁输入密码耽误时间,我们可以执行命令
tmux new -s $user_id
# tmux new -s $user_2024
复制代码
创建一个新的 tmux 会话,其中$user_id
是可以自定义的会话名称。(后续重新登录时,使用 tmux a -t $user_id
)。
继续输入命令执行,进入客户端
clickhouse client --port 9010
复制代码
执行查询
首先切换到测试用数据库 test_elt
TPC-DS 定义的查询语法为标准 SQL,因此设置数据库会话的方言类型为 ANSI
set dialect_type = 'ANSI'
复制代码
执行后结果如图
选择 TPC-DS 的 99 个查询中你希望的执行,因为 q78 查询会因为内存限制而执行失败,所以我们优先执行 q78 查询语句,打开 q78 sql 文件,复制 sql 执行
with ws as
(select d_year AS ws_sold_year, ws_item_sk,
ws_bill_customer_sk ws_customer_sk,
sum(ws_quantity) ws_qty,
sum(ws_wholesale_cost) ws_wc,
sum(ws_sales_price) ws_sp
from web_sales
left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
join date_dim on ws_sold_date_sk = d_date_sk
where wr_order_number is null
group by d_year, ws_item_sk, ws_bill_customer_sk
),
cs as
(select d_year AS cs_sold_year, cs_item_sk,
cs_bill_customer_sk cs_customer_sk,
sum(cs_quantity) cs_qty,
sum(cs_wholesale_cost) cs_wc,
sum(cs_sales_price) cs_sp
from catalog_sales
left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
join date_dim on cs_sold_date_sk = d_date_sk
where cr_order_number is null
group by d_year, cs_item_sk, cs_bill_customer_sk
),
ss as
(select d_year AS ss_sold_year, ss_item_sk,
ss_customer_sk,
sum(ss_quantity) ss_qty,
sum(ss_wholesale_cost) ss_wc,
sum(ss_sales_price) ss_sp
from store_sales
left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
join date_dim on ss_sold_date_sk = d_date_sk
where sr_ticket_number is null
group by d_year, ss_item_sk, ss_customer_sk
)
select
ss_sold_year, ss_item_sk, ss_customer_sk,
round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from ss
left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2000
order by
ss_sold_year, ss_item_sk, ss_customer_sk,
ss_qty desc, ss_wc desc, ss_sp desc,
other_chan_qty,
other_chan_wholesale_cost,
other_chan_sales_price,
ratio
LIMIT 100;
复制代码
复制 sql 到命令框后,点击回车确定,这里可以看到执行失败
这时在失败的 sql 后加上一下语句再执行尝试,其中参数distributed_max_parallel_size
可以设置为 4 的其他整数倍(因为 Worker 的数量为 4)
SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;
复制代码
更改后的 q78 sql 文件内容如下
with ws as
(select d_year AS ws_sold_year, ws_item_sk,
ws_bill_customer_sk ws_customer_sk,
sum(ws_quantity) ws_qty,
sum(ws_wholesale_cost) ws_wc,
sum(ws_sales_price) ws_sp
from web_sales
left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
join date_dim on ws_sold_date_sk = d_date_sk
where wr_order_number is null
group by d_year, ws_item_sk, ws_bill_customer_sk
),
cs as
(select d_year AS cs_sold_year, cs_item_sk,
cs_bill_customer_sk cs_customer_sk,
sum(cs_quantity) cs_qty,
sum(cs_wholesale_cost) cs_wc,
sum(cs_sales_price) cs_sp
from catalog_sales
left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
join date_dim on cs_sold_date_sk = d_date_sk
where cr_order_number is null
group by d_year, cs_item_sk, cs_bill_customer_sk
),
ss as
(select d_year AS ss_sold_year, ss_item_sk,
ss_customer_sk,
sum(ss_quantity) ss_qty,
sum(ss_wholesale_cost) ss_wc,
sum(ss_sales_price) ss_sp
from store_sales
left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
join date_dim on ss_sold_date_sk = d_date_sk
where sr_ticket_number is null
group by d_year, ss_item_sk, ss_customer_sk
)
select
ss_sold_year, ss_item_sk, ss_customer_sk,
round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from ss
left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2000
order by
ss_sold_year, ss_item_sk, ss_customer_sk,
ss_qty desc, ss_wc desc, ss_sp desc,
other_chan_qty,
other_chan_wholesale_cost,
other_chan_sales_price,
ratio
LIMIT 100
SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;
复制代码
本次执行成功了,返回 100 条数据,耗时 60.510 秒。处理了 5.0421 亿行,6.05 GB(833 万行/秒,99.96 MB/秒)
100 rows in set. Elapsed: 60.510 sec. Processed 504.21 million rows, 6.05 GB (8.33 million rows/s., 99.96 MB/s.)
复制代码
我再换一条 sql 执行操作,选择 q99 sql 命令复制并执行
select substr(w_warehouse_name,1,20)
,sm_type
,cc_name
,sum(case when (cs_ship_date_sk - cs_sold_date_sk <= 30 ) then 1 else 0 end) as "30 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 30) and
(cs_ship_date_sk - cs_sold_date_sk <= 60) then 1 else 0 end ) as "31-60 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 60) and
(cs_ship_date_sk - cs_sold_date_sk <= 90) then 1 else 0 end) as "61-90 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 90) and
(cs_ship_date_sk - cs_sold_date_sk <= 120) then 1 else 0 end) as "91-120 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 120) then 1 else 0 end) as ">120 days"
from
catalog_sales
,warehouse
,ship_mode
,call_center
,date_dim
where
d_month_seq between 1200 and 1200 + 11
and cs_ship_date_sk = d_date_sk
and cs_warehouse_sk = w_warehouse_sk
and cs_ship_mode_sk = sm_ship_mode_sk
and cs_call_center_sk = cc_call_center_sk
group by
substr(w_warehouse_name,1,20)
,sm_type
,cc_name
order by substr(w_warehouse_name,1,20)
,sm_type
,cc_name
limit 100;
复制代码
执行结果返回 100 条数据,耗时 9.086 秒。处理了 7313 千行,197.11 KB(8.05 千行/秒,21.69 KB/秒)
100 rows in set. Elapsed: 9.086 sec. Processed 73.13 thousand rows, 197.11 KB (8.05 thousand rows/s., 21.69 KB/s.)
复制代码
添加参数限制查询的最大内存使用量引发 oom,在 q99 sql 命令后增加如下命令
-- (单位为 B,当前约合 37.25 GB)
SETTINGS max_memory_usage=40000000000;
复制代码
更改后的 sql 命令依然执行成功,这个时候我不断调整,按照每次下调资源总数 70%的方式,不断缩小 max_memory_usage 的大小进行尝试,
SETTINGS max_memory_usage=93052205;
复制代码
经过多次的尝试,设置为 SETTINGS max_memory_usage=93052205; 后出现了内存错误信息
此时我们再为 q99 sql 增加参数 distributed_max_parallel_size 来进行尝试,在上述 sql 后继续增加以下命令信息
SETTINGS bsp_mode = 1,distributed_max_parallel_size = 12;
复制代码
增加参数 分布式最大并行大小 参数 distributed_max_parallel_size 后的 sql 从最大并行大小 4 调整到了 128,进行尝试,但是还是没有成功
select substr(w_warehouse_name,1,20)
,sm_type
,cc_name
,sum(case when (cs_ship_date_sk - cs_sold_date_sk <= 30 ) then 1 else 0 end) as "30 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 30) and
(cs_ship_date_sk - cs_sold_date_sk <= 60) then 1 else 0 end ) as "31-60 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 60) and
(cs_ship_date_sk - cs_sold_date_sk <= 90) then 1 else 0 end) as "61-90 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 90) and
(cs_ship_date_sk - cs_sold_date_sk <= 120) then 1 else 0 end) as "91-120 days"
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 120) then 1 else 0 end) as ">120 days"
from
catalog_sales
,warehouse
,ship_mode
,call_center
,date_dim
where
d_month_seq between 1200 and 1200 + 11
and cs_ship_date_sk = d_date_sk
and cs_warehouse_sk = w_warehouse_sk
and cs_ship_mode_sk = sm_ship_mode_sk
and cs_call_center_sk = cc_call_center_sk
group by
substr(w_warehouse_name,1,20)
,sm_type
,cc_name
order by substr(w_warehouse_name,1,20)
,sm_type
,cc_name
limit 100
SETTINGS max_memory_usage=93052205,bsp_mode = 1,distributed_max_parallel_size = 76;
复制代码
并且我发现随着分布式最大并行大小 参数 distributed_max_parallel_size 的 不断升高,执行 sql 等待的时间也越来越长,我这边调整参数 distributed_max_parallel_size 从 4 一直按照 4 的倍数尝试到 76 ,执行时间从最初的 10s 以内到现在的近 180s 还是没成功,
下面我准备再体验其他 sql 语句的查询效果。复制 q01 的 sql 到命令框,这时可以看到 sql 文件内容本身有问题并不能执行,而是被截成了两断,
其实上面的 sql 直接从 sql 文件复制出来执行也是会报错的,我这边都是经过根据错误提示修改后执行成功的,对于 q01 按照错误提示修改后 sql 如下
with customer_total_return as
(
select
sr_customer_sk as ctr_customer_sk,
sr_store_sk as ctr_store_sk
,sum(sr_return_amt) as ctr_total_return
from store_returns, date_dim
where sr_returned_date_sk = d_date_sk and d_year = 2000
group by sr_customer_sk,sr_store_sk)
select c_customer_id
from customer_total_return ctr1, store, customer
where ctr1.ctr_total_return > (
select avg(ctr_total_return) *1.2
from customer_total_return ctr2
where ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
and s_store_sk = ctr1.ctr_store_sk
and s_state = 'TN'
and ctr1.ctr_customer_sk = c_customer_sk
order by c_customer_id
limit 100;
复制代码
执行后可以看到返回信息,返回 100 条数据,执行耗时 6.275 秒。处理了 147.1 万行,394.73 KB(234.4 万行/秒,62.90 KB/秒)
100 rows in set. Elapsed: 6.275 sec. Processed 147.10 thousand rows, 394.73 KB (23.44 thousand rows/s., 62.90 KB/s.)
复制代码
执行结果过如图所示
当然,TPC-DS 测试集均为模拟实际场景的查询,如果你希望了解具体查询的含义,可以将 SQL 复制到大模型产品(如豆包)中让其进行解释,能帮助您更好的理解查询对应的实际场景。豆包大模型地址https://www.doubao.com/chat/365034912382466,这里我输入 q01 sql 文件内容进行解释,得到如下内容
以下是对这段 SQL 代码的分析:
代码整体功能概述
这段 SQL 代码的目的是从数据库中查询出满足特定条件的客户 ID(c_customer_id)。它首先通过一个公共表达式(CTE,即 customer_total_return)计算出每个客户在特定商店(基于 sr_customer_sk 和 sr_store_sk 分组)在 2000 年的总退货金额(ctr_total_return)。然后,在主查询中筛选出那些在所在商店的总退货金额大于该商店平均总退货金额 1.2 倍,并且对应的商店位于 'TN'(田纳西州,推测这里 s_state 代表商店所在州)的客户,最后按照客户 ID 排序并限制返回结果为 100 条记录。
具体各部分分析
公共表达式(CTE)部分:customer_total_return
逻辑说明:
通过关联 store_returns 表(可能存储商店退货相关信息)和 date_dim 表(可能存储日期维度相关数据),以 sr_returned_date_sk 和 d_date_sk 作为关联条件,筛选出退货日期在 2000 年(通过 d_year = 2000 条件)的数据。
然后按照客户编号(sr_customer_sk)和商店编号(sr_store_sk)进行分组,并对每个分组内的退货金额(sr_return_amt)求和,得到每个客户在每个商店在 2000 年的总退货金额,将其命名为 ctr_total_return,同时将 sr_customer_sk 重命名为 ctr_customer_sk,sr_store_sk 重命名为 ctr_store_sk。
代码示例:
select
sr_customer_sk as ctr_customer_sk,
sr_store_sk as ctr_store_sk
,sum(sr_return_amt) as ctr_total_return
from store_returns, date_dim
where sr_returned_date_sk = d_date_sk and d_year = 2000
group by sr_customer_sk,sr_store_sk
主查询部分
筛选条件分析:
首先在 FROM 子句中关联了 customer_total_return(通过别名 ctr1 使用)、store 表和 customer 表。关联条件分别是 s_store_sk = ctr1.ctr_store_sk(将商店表和前面计算总退货的 CTE 按商店编号关联)以及 ctr1.ctr_customer_sk = c_customer_sk(将客户表和 CTE 按客户编号关联)。
在 WHERE 子句中,有多个筛选条件:
ctr1.ctr_total_return > (select avg(ctr_total_return) * 1.2 from customer_total_return ctr2 where ctr1.ctr_store_sk = ctr2.ctr_store_sk):这是一个比较关键的筛选条件,它使用了子查询。对于每一个 ctr1 中的记录(即每个客户在每个商店的情况),会在子查询中计算出与当前记录所在商店相同(通过 ctr1.ctr_store_sk = ctr2.ctr_store_sk 关联条件)的所有客户的平均总退货金额,然后要求当前客户在该商店的总退货金额大于这个平均总退货金额的 1.2 倍。
s_state = 'TN':筛选出所在商店位于田纳西州的情况,这里是基于 store 表中的 s_state 字段进行筛选。
排序和限制结果:
最后使用 ORDER BY c_customer_id 按照客户 ID 进行排序,并通过 LIMIT 100 限制只返回前 100 条记录。
代码示例:
select c_customer_id
from customer_total_return ctr1, store, customer
where ctr1.ctr_total_return > (
select avg(ctr_total_return) *1.2
from customer_total_return ctr2
where ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
and s_store_sk = ctr1.ctr_store_sk
and s_state = 'TN'
and ctr1.ctr_customer_sk = c_customer_sk
order by c_customer_id
limit 100;
潜在优化点或注意事项
关联方式:
当前代码中在 CTE 和主查询里使用的表连接方式是隐式连接(使用逗号分隔表,在 WHERE 子句中写关联条件),建议可以改为显式的 JOIN 语法,这样代码的可读性会更好。例如在 CTE 中可以写成:
select
sr_customer_sk as ctr_customer_sk,
sr_store_sk as ctr_store_sk
,sum(sr_return_amt) as ctr_total_return
from store_returns
JOIN date_dim ON sr_returned_date_sk = d_date_sk
where d_year = 2000
group by sr_customer_sk,sr_store_sk
在主查询中类似地可以将连接改为显式的 JOIN 语句来提升可读性。
性能方面:
如果数据量较大,涉及到的子查询(计算同商店平均退货金额的子查询)可能会影响性能,因为对于每一条外部查询的记录都要执行一次子查询。可以考虑使用窗口函数等方式来预先计算平均退货金额并关联到原始数据上,以优化性能,不过这需要根据具体数据库的支持情况来调整代码实现。
索引使用:
考虑在关联字段(如 sr_customer_sk、sr_store_sk、d_date_sk、s_store_sk、c_customer_sk 等)以及用于筛选和排序的字段(如 d_year、s_state、c_customer_id)上根据数据库的查询特点创建合适的索引,有助于提升查询性能。
总的来说,这段代码实现了特定业务需求下筛选客户的功能,但在代码风格和性能方面有一定的优化空间,可以根据实际使用的数据库环境做进一步调整。
复制代码
豆包界面如图所示,简洁明了,唯一亮点是竟然不用登录就可以使用,这个就省去了每次登录的麻烦,很好
后面可以测试的 sql 还有很多,这里受限于文章篇幅,不再依依在这里描述了,大家可以在官方提供的测试服务器进行操作,不过根据我操作了这么多 sql 的经验,很少有粘贴出来 sql 就可以执行的,经常会遇到 sql 换行引起的 sql 查询断行的情况,就像这样
到这里关于实验操作操作的部分就已经告一段落,后续对于不同场景的业务 sql 的执行,这里也就不再一一列举截图了。那么相对于传统 ELT,ByConity 有哪些优化的地方。 下面再来聊一下 ByConity 相比于 ELT 优化的地方,这里需要声明一点哈,对于 ByConity 相比于 ELT 优化的地方 的内容摘自于其他文章内容,也是本次活动页地址:https://xie.infoq.cn/article/dd49679fc505dc3249692cf74
以下内容基本来自活动页内容搬过来的,为了方便大家在实验操作之后,回想实验中的操作体验,可以结合传统 ELT 能力做个对比,对比的内容我搬来放在下面,不用大家再去别的地方搜索增加学习负担了。
------------------------------------------------------------------------------------------------------------
ByConity 相对于 ELT 能力的优化
提升并行度
在传统结构中,为什么要分别建设离线数仓和实时数仓。是因为常见的 OLAP 产品不擅长处理大量的复杂查询,很容易把内容打满任务中断,甚至造成宕机。
ByteHouse 具备 BSP 模式,支持将查询切分为不同的 stage,每个 stage 独立运行。在此基础上,stage 内的数据也可以进行切分,并行化不再受节点数量限制,理论上可以无限扩展,从而大幅度降低峰值内存。正如我们在测试 q78 时,当遇到内存不足情况时,可以通过 bsp 模式来设置并行参数 distributed_max_parallel_size 来提升并行度,保障系统稳定性。
任务级重试
对于离线加工任务来说,离线加工任务的另外一个特点就是链路比较长,并且任务间有依赖关系。当某一个任务失败会导致整个链路失败,
比如,task4 依赖 task1、task2 的完成。如果 task1 失败发起重试,会显示为整个链路执行失败。ByteHouse 增加了任务级重试能力,在 ByteHouse 中只有运行失败的 task 需要重试。
并行写入
实时数仓存在频繁更新的特点,使用重叠窗口进行批量 ETL 操作时,会带来大量的数据更新。在这种场景下,ByteHouse 做了大量的优化。
经过持续优化,将最耗时的数据写入部分单独并行化,并且在写入 part 文件时标记是否需要进行后续的 dedup 作业。在所有数据写入完毕后,由 server 指定一个 worker 进行 dedup 和最后的事务提交(如上图最右)。
简化数据链路
ByteHouse 在传统的 MPP 链路基础上增加了对复杂查询的支持,这使得 join 等操作可以有效地得到执行。
BSP 模式使用 barrier 将各个 stage 进行隔离,每个 stage 独立运行,stage 之内的 task 也相互独立。即便机器环境发生变化,对查询的影响被限定在 task 级别。且每个 task 运行完毕后会及时释放计算资源,对资源的使用更加充分。
在这个基础上,BSP 的这种设计更利于重试的设计。任务失败后,只需要重新拉起时读取它所依赖的任务的 shuffle 数据即可,而无需考虑任务状态。
评论