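I. Project Background
ByConity is an advanced data integration and processing platform focused on efficient, flexible ELT (Extract, Load, Transform). It aims to help enterprises handle complex big data workloads by simplifying data pipeline design and optimizing processing workflows. The goals are faster data transfer, higher processing throughput, and lower operating costs.
ByConity is designed to simplify data engineers' workflows. It frees them from tedious low-level technical details so they can focus on cleansing, transforming, and analyzing data. With flexible configuration and strong extensibility, ByConity performs well across many complex business scenarios and fits a wide range of data pipeline needs.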
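Faster data cleansing
Traditional ELT tools handle dirty data inefficiently. ByConity includes several built-in cleansing tools that automatically handle outliers, missing values, and inconsistencies.
High-performance transformation engine
ByConity's highly optimized transformation engine significantly accelerates data processing. It handles both simple format conversions and complex aggregations and calculations quickly.
Flexible data pipeline management
ByConity offers modular pipeline management and supports ingestion from many kinds of data sources. An intuitive configuration interface and a programmable API reduce the complexity of managing pipelines.
Real-time data processing
Traditional batch processing runs on a schedule. ByConity adds real-time processing, so data can be extracted, loaded, and transformed as it arrives, which supports fast business decisions.
Extensibility and compatibility
ByConity integrates seamlessly with mainstream big data platforms, cloud services, and database systems such as Hadoop, Spark, AWS, and MySQL. This compatibility lets users deploy quickly on existing infrastructure and scale smoothly as business needs grow.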
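II. ByConity's Core Features and Advantages
ByConity covers the whole data lifecycle, from collection and cleansing to transformation and loading, to help enterprises integrate data efficiently. Its key features are described below.
1. Data Collection and Transfer
ByConity can collect data from many kinds of sources, such as databases, log files, data streams, and APIs. Data source plugins let users connect to these sources quickly and collect data in real time.
Example: suppose you need to extract data from a MySQL database and load it into a Hadoop cluster for processing.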
```yaml
# Configure the data source
datasource:
  mysql:
    type: "jdbc"
    url: "jdbc:mysql://localhost:3306/mydb"
    username: "user"
    password: "password"

# Configure the data stream
stream:
  - source: "mysql"
    extract:
      table: "source_table"
    transform:
      - filter:
          conditions:
            column: "age"
            operator: ">"
            value: 30
    load:
      target:
        type: "hadoop"
        outputPath: "/path/to/hadoop/output"
```
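This example configuration connects to a MySQL database named `mydb`. The `datasource` block specifies the connection URL, username, and password. The data stream then extracts `source_table` from the `mysql` data source. It applies a filter that keeps only rows whose `age` column is greater than 30 and loads the result into the Hadoop cluster.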
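To make the extract, filter, and load steps concrete, here is a minimal Python sketch of the same flow. It is not ByConity's API. Two in-memory SQLite databases stand in for MySQL and Hadoop, and the `id`/`name` columns and the `output` target table are hypothetical.

```python
import sqlite3


def run_pipeline(src: sqlite3.Connection, dst: sqlite3.Connection, min_age: int = 30) -> int:
    """Extract source_table, keep rows with age > min_age, and load them into dst.

    Hypothetical stand-in for the YAML stream above; returns the number of rows loaded.
    """
    # Extract: read every row from the source table.
    rows = src.execute("SELECT id, name, age FROM source_table").fetchall()

    # Transform: equivalent of `column: "age"`, `operator: ">"`, `value: 30`.
    kept = [row for row in rows if row[2] > min_age]

    # Load: write the surviving rows into a hypothetical target table.
    dst.execute("CREATE TABLE IF NOT EXISTS output (id INTEGER, name TEXT, age INTEGER)")
    dst.executemany("INSERT INTO output VALUES (?, ?, ?)", kept)
    dst.commit()
    return len(kept)


if __name__ == "__main__":
    source = sqlite3.connect(":memory:")
    source.execute("CREATE TABLE source_table (id INTEGER, name TEXT, age INTEGER)")
    source.executemany(
        "INSERT INTO source_table VALUES (?, ?, ?)",
        [(1, "alice", 25), (2, "bob", 31), (3, "carol", 42)],
    )
    target = sqlite3.connect(":memory:")
    print(run_pipeline(source, target))  # prints 2
```

In a real pipeline the `age > 30` predicate would ideally be pushed down into the source query so that fewer rows cross the network. The sketch filters after extraction only to mirror the separate `extract` and `transform` steps in the configuration.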
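2. Data Cleansing and Transformation
ByConity provides rich data cleansing and transformation features, supporting operations such as cleansing, filtering, sorting, aggregation, and grouping. Users can define the processing steps of a data flow directly in a drag-and-drop interface.
Example: suppose log data needs to be cleansed and transformed. Records with invalid IP addresses and abnormal content are removed, and a simple aggregate statistic is computed.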
```yaml
transform:
  - filter:
      field: "ip_address"
      operation: "is_valid"
  - replace:
      field: "message"
      old_value: "error"
      new_value: "warning"
  - aggregate:
      fields: ["count"]
      operation: "sum"
```
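In this example, the `filter` operation first drops records whose `ip_address` fails the `is_valid` check. Next, the `replace` operation replaces `error` with `warning` in the log `message`. Finally, the `aggregate` operation sums the `count` field across all remaining log records.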
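The three steps can be sketched in plain Python to show what each one does to the records. This is an illustrative assumption, not ByConity's implementation. "Valid IP" is taken to mean "parses as IPv4 or IPv6" via the standard `ipaddress` module, and the sample records are made up.

```python
import ipaddress


def is_valid_ip(value: str) -> bool:
    """Return True if value parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(value)
        return True
    except ValueError:
        return False


def transform_logs(records: list[dict]) -> tuple[list[dict], int]:
    """Apply filter -> replace -> aggregate, mirroring the YAML steps above."""
    # filter: drop records whose ip_address is not a valid IP address.
    kept = [r for r in records if is_valid_ip(r["ip_address"])]
    # replace: substitute "error" with "warning" in message (builds new dicts, input untouched).
    replaced = [{**r, "message": r["message"].replace("error", "warning")} for r in kept]
    # aggregate: sum the count field over all remaining records.
    total = sum(r["count"] for r in replaced)
    return replaced, total


if __name__ == "__main__":
    logs = [
        {"ip_address": "10.0.0.1", "message": "disk error", "count": 3},
        {"ip_address": "999.1.1.1", "message": "error", "count": 5},
        {"ip_address": "::1", "message": "ok", "count": 2},
    ]
    cleaned, total = transform_logs(logs)
    print(total)                  # 5
    print(cleaned[0]["message"])  # disk warning
```

The aggregate produces a single total over the whole filtered set, not a per-record value. The record with the invalid address `999.1.1.1` is excluded before summing.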
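3. Data Loading and Storage
ByConity can load data into many kinds of storage targets, including relational databases, Hadoop, cloud storage, and data warehouses. By configuring a load target, users can easily write processed data to its final storage location.
Example: load cleansed data from the Hadoop cluster into an Oracle database.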
```yaml
load:
  - target:
      type: "oracle"
      url: "jdbc:oracle:thin:@localhost:1521:orcl"
      username: "db_user"
      password: "db_password"
      table: "final_data"
```
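This configuration connects to an Oracle database and sets the target table to `final_data`. Given the JDBC URL, username, and password, the processed data can be loaded from the Hadoop cluster into that table.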
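Large result sets are usually loaded in batches rather than row by row, which limits round trips and keeps each transaction small. The sketch below shows that pattern under clearly stated assumptions: SQLite from the Python standard library stands in for Oracle, and the `id`/`value` columns of `final_data` are hypothetical. A real Oracle load would go through an Oracle driver instead.

```python
import sqlite3
from typing import Iterable, Sequence


def load_in_batches(conn: sqlite3.Connection, rows: Iterable[Sequence], batch_size: int = 1000) -> int:
    """Insert rows into final_data in fixed-size batches, committing after each batch.

    Returns the number of batches written. SQLite is a stand-in for Oracle here.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    conn.execute("CREATE TABLE IF NOT EXISTS final_data (id INTEGER, value TEXT)")
    batches = 0
    batch: list[Sequence] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            conn.executemany("INSERT INTO final_data VALUES (?, ?)", batch)
            conn.commit()  # each full batch is its own transaction
            batches += 1
            batch = []
    if batch:  # flush the final partial batch
        conn.executemany("INSERT INTO final_data VALUES (?, ?)", batch)
        conn.commit()
        batches += 1
    return batches


if __name__ == "__main__":
    db = sqlite3.connect(":memory:")
    print(load_in_batches(db, ((i, f"v{i}") for i in range(2500)), batch_size=1000))  # 3
    print(db.execute("SELECT COUNT(*) FROM final_data").fetchone()[0])                # 2500
```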
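III. ByConity's ELT Optimizations
ByConity builds on traditional ETL (Extract, Transform, Load) and adds further optimization techniques. These let ELT (Extract, Load, Transform) handle large-scale data integration more efficiently. Its core ELT optimizations are described below.
1. Parallel Processing and Distributed Computing
ByConity splits data into chunks and processes them in parallel, which significantly speeds up processing. For large-scale data integration in particular, this makes full use of cluster resources and reduces the load on any single node.
Example: use ByConity's distributed computing to process a dataset with millions of rows.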
```yaml
parallel:
  partitions: 8
  strategy: "hash"
source:
  type: "hadoop"
  path: "/path/to/input_data"
transform:
  - filter:
      field: "status"
      operation: "="
      value: "active"
load:
  target:
    type: "oracle"
    url: "jdbc:oracle:thin:@localhost:1521:orcl"
    username: "db_user"
    password: "db_password"
    table: "active_status_data"
```
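This example processes the data in 8 parallel partitions. With `strategy` set to `hash`, each row is assigned to a partition by hashing its key. This spreads rows roughly evenly across the cluster nodes, provided the key values are not heavily skewed. Data is extracted from the Hadoop cluster and filtered, and rows whose `status` is `active` are loaded into the Oracle database.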
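The sketch below shows hash partitioning itself and why the evenness depends on the key. The configuration above does not name the hash key, so `user_id` is a hypothetical choice, and BLAKE2b is an illustrative hash rather than ByConity's. A stable hash is used instead of Python's built-in `hash()` because string hashing is randomized per process, and partition assignment must be the same on every node.

```python
import hashlib


def partition_of(key: str, num_partitions: int = 8) -> int:
    """Map a key to a partition index using a stable hash (BLAKE2b) modulo num_partitions."""
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big") % num_partitions


def hash_partition(rows: list[dict], key_field: str, num_partitions: int = 8) -> list[list[dict]]:
    """Split rows into num_partitions buckets by hashing the value of row[key_field]."""
    partitions: list[list[dict]] = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[partition_of(str(row[key_field]), num_partitions)].append(row)
    return partitions


if __name__ == "__main__":
    # Distinct keys: bucket sizes should be close to 10_000 / 8.
    rows = [{"user_id": i, "status": "active"} for i in range(10_000)]
    print([len(p) for p in hash_partition(rows, "user_id")])

    # Skewed keys: every row shares one key, so a single bucket receives all of them.
    skewed = [{"user_id": 42, "status": "active"} for _ in range(10_000)]
    print([len(p) for p in hash_partition(skewed, "user_id")])
```

The second case shows the limit of hash partitioning. Equal keys always land in the same partition, so a hot key produces exactly the load imbalance that Section V lists as a risk.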
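2. Data Caching
ByConity uses caching to cut the time spent on repeated transformations and reloads, which improves processing efficiency. Users can choose a cache eviction strategy, such as LRU (Least Recently Used), to manage how long entries stay in the cache.
Example: configure ByConity's cache to speed up log processing.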
```yaml
cache:
  enabled: true
  strategy: "LRU"
  maxSize: 10000
  evictionPolicy: "LRU"
source:
  type: "hadoop"
  path: "/path/to/logs"
transform:
  - filter:
      field: "error_level"
      operation: "="
      value: "critical"
load:
  target:
    type: "oracle"
    url: "jdbc:oracle:thin:@localhost:1521:orcl"
    username: "db_user"
    password: "db_password"
    table: "critical_errors"
```
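Explanation: this configuration enables the cache with an LRU strategy and caps it at 10,000 records. When the cache is full, the least recently used entries are evicted to make room. Data is extracted from Hadoop, logs whose error level is not `critical` are filtered out, and the matching records are stored in the Oracle database.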
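The eviction rule is easy to misread as "least used" (that would be LFU). LRU evicts by recency of access. Below is a minimal Python LRU cache built on `collections.OrderedDict` that illustrates the behavior. The class name and `max_size` parameter are hypothetical and only mirror the `maxSize` setting above.

```python
from collections import OrderedDict
from typing import Any, Hashable


class LRUCache:
    """Fixed-capacity cache that evicts the least recently used entry when full."""

    def __init__(self, max_size: int = 10_000):
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self.max_size = max_size
        self._data: OrderedDict[Hashable, Any] = OrderedDict()  # oldest access first

    def get(self, key: Hashable, default: Any = None) -> Any:
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: Hashable, value: Any) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __contains__(self, key: Hashable) -> bool:
        return key in self._data

    def __len__(self) -> int:
        return len(self._data)


if __name__ == "__main__":
    cache = LRUCache(max_size=2)
    cache.put("a", 1)
    cache.put("b", 2)
    cache.get("a")       # touch "a", so "b" becomes least recently used
    cache.put("c", 3)    # exceeds max_size and evicts "b"
    print("b" in cache)  # False
```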
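3. Automatic Data Recovery and Failover
ByConity provides automatic failure recovery. If data loading is interrupted unexpectedly, processing resumes from the most recent successful state, which avoids data loss and avoids restarting the job from scratch.
Example: configure automatic failure recovery so that data moved from Hadoop to the target database stays accurate.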
```yaml
recovery:
  enabled: true
  strategy: "checkpoint"
source:
  type: "hadoop"
  path: "/path/to/input_data"
transform:
  - filter:
      field: "status"
      operation: "="
      value: "active"
load:
  target:
    type: "oracle"
    url: "jdbc:oracle:thin:@localhost:1521:orcl"
    username: "db_user"
    password: "db_password"
    table: "active_status_data"
```
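This configuration enables automatic recovery and uses the `checkpoint` strategy to track progress. After each step completes, a checkpoint file records the current processing state. If the system is interrupted, ByConity resumes from the most recent checkpoint instead of starting over. Work done after the last checkpoint may be repeated on resume, so duplicates are avoided only if writes to the target are idempotent or transactional.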
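IV. Hands-On Example
SSH login
On your local machine, open the Start menu, launch the Command Prompt terminal, enter `ssh -p 23 <provided username>@<ECS server IP address>`, and press Enter.
If the system asks you to confirm the connection by typing yes or no, type `yes` and press Enter.
Then enter `<provided login password>` and press Enter.
To keep the connection from timing out and dropping, run `tmux new -s $user_id` to create a new tmux session, for example `tmux new -s user0001`. Replace `$user_id` with a session name of your choice. When you log in again later, reattach with `tmux a -t $user_id`.
Run `clickhouse client --port 9010` to enter the client. If SQL you type later gets truncated, start the client with `clickhouse client --port 9010 -mn` instead. Here `-m` enables multiline input and `-n` allows multiple queries, and every SQL statement must end with `;`.
Test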
```sql
select
    ss_sold_year, ss_item_sk, ss_customer_sk,
    round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
    ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
    coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
    coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
    coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from
(
    select d_year AS ss_sold_year, ss_item_sk, ss_customer_sk,
        sum(ss_quantity) ss_qty,
        sum(ss_wholesale_cost) ss_wc,
        sum(ss_sales_price) ss_sp
    from store_sales
    left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
    join date_dim on ss_sold_date_sk = d_date_sk
    where sr_ticket_number is null and ss_sold_date_sk = d_date_sk and d_year = 2000
    group by d_year, ss_item_sk, ss_customer_sk
) ss
left join
(
    select d_year AS ws_sold_year, ws_item_sk, ws_bill_customer_sk ws_customer_sk,
        sum(ws_quantity) ws_qty,
        sum(ws_wholesale_cost) ws_wc,
        sum(ws_sales_price) ws_sp
    from web_sales
    left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
    join date_dim on ws_sold_date_sk = d_date_sk
    where wr_order_number is null and ws_sold_date_sk = d_date_sk and d_year = 2000
    group by d_year, ws_item_sk, ws_bill_customer_sk
) ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join
(
    select d_year AS cs_sold_year, cs_item_sk, cs_bill_customer_sk cs_customer_sk,
        sum(cs_quantity) cs_qty,
        sum(cs_wholesale_cost) cs_wc,
        sum(cs_sales_price) cs_sp
    from catalog_sales
    left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
    join date_dim on cs_sold_date_sk = d_date_sk
    where cr_order_number is null and cs_sold_date_sk = d_date_sk and d_year = 2000
    group by d_year, cs_item_sk, cs_bill_customer_sk
) cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0)
order by
    ss_sold_year, ss_item_sk, ss_customer_sk,
    ss_qty desc, ss_wc desc, ss_sp desc,
    other_chan_qty,
    other_chan_wholesale_cost,
    other_chan_sales_price,
    ratio
LIMIT 100;
```
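Without BSP mode enabled, this query immediately exceeded the memory limit. The same logic was then rewritten with CTEs and run with BSP mode enabled (`bsp_mode = 1`). In this version the `d_year = 2000` filter moves to the outer `WHERE` clause.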
```sql
with ws as
(
    select d_year AS ws_sold_year, ws_item_sk,
        ws_bill_customer_sk ws_customer_sk,
        sum(ws_quantity) ws_qty,
        sum(ws_wholesale_cost) ws_wc,
        sum(ws_sales_price) ws_sp
    from web_sales
    left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
    join date_dim on ws_sold_date_sk = d_date_sk
    where wr_order_number is null
    group by d_year, ws_item_sk, ws_bill_customer_sk
),
cs as
(
    select d_year AS cs_sold_year, cs_item_sk,
        cs_bill_customer_sk cs_customer_sk,
        sum(cs_quantity) cs_qty,
        sum(cs_wholesale_cost) cs_wc,
        sum(cs_sales_price) cs_sp
    from catalog_sales
    left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
    join date_dim on cs_sold_date_sk = d_date_sk
    where cr_order_number is null
    group by d_year, cs_item_sk, cs_bill_customer_sk
),
ss as
(
    select d_year AS ss_sold_year, ss_item_sk,
        ss_customer_sk,
        sum(ss_quantity) ss_qty,
        sum(ss_wholesale_cost) ss_wc,
        sum(ss_sales_price) ss_sp
    from store_sales
    left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
    join date_dim on ss_sold_date_sk = d_date_sk
    where sr_ticket_number is null
    group by d_year, ss_item_sk, ss_customer_sk
)
select
    ss_sold_year, ss_item_sk, ss_customer_sk,
    round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
    ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
    coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
    coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
    coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from ss
left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2000
order by
    ss_sold_year, ss_item_sk, ss_customer_sk,
    ss_qty desc, ss_wc desc, ss_sp desc,
    other_chan_qty,
    other_chan_wholesale_cost,
    other_chan_sales_price,
    ratio
LIMIT 100
SETTINGS bsp_mode = 1, distributed_max_parallel_size = 12;
```
Query speed with and without BSP mode
`bsp_mode = 0`:
```sql
WITH sales_detail AS (
    SELECT
        d_year,
        i_brand_id,
        i_class_id,
        i_category_id,
        i_manufact_id,
        cs_quantity - COALESCE(cr_return_quantity, 0) AS sales_cnt,
        cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS sales_amt
    FROM catalog_sales
    JOIN item ON i_item_sk = cs_item_sk
    JOIN date_dim ON d_date_sk = cs_sold_date_sk
    LEFT JOIN catalog_returns ON cs_order_number = cr_order_number AND cs_item_sk = cr_item_sk
    WHERE i_category = 'Books'
    UNION ALL
    SELECT
        d_year, i_brand_id, i_class_id, i_category_id, i_manufact_id,
        ss_quantity - COALESCE(sr_return_quantity, 0) AS sales_cnt,
        ss_ext_sales_price - COALESCE(sr_return_amt, 0.0) AS sales_amt
    FROM store_sales
    JOIN item ON i_item_sk = ss_item_sk
    JOIN date_dim ON d_date_sk = ss_sold_date_sk
    LEFT JOIN store_returns ON ss_ticket_number = sr_ticket_number AND ss_item_sk = sr_item_sk
    WHERE i_category = 'Books'
    UNION ALL
    SELECT
        d_year, i_brand_id, i_class_id, i_category_id, i_manufact_id,
        ws_quantity - COALESCE(wr_return_quantity, 0) AS sales_cnt,
        ws_ext_sales_price - COALESCE(wr_return_amt, 0.0) AS sales_amt
    FROM web_sales
    JOIN item ON i_item_sk = ws_item_sk
    JOIN date_dim ON d_date_sk = ws_sold_date_sk
    LEFT JOIN web_returns ON ws_order_number = wr_order_number AND ws_item_sk = wr_item_sk
    WHERE i_category = 'Books'
),
annual_sales AS (
    SELECT
        d_year, i_brand_id, i_class_id, i_category_id, i_manufact_id,
        SUM(sales_cnt) AS sales_cnt,
        SUM(sales_amt) AS sales_amt
    FROM sales_detail
    GROUP BY d_year, i_brand_id, i_class_id, i_category_id, i_manufact_id
)
SELECT
    prev_yr.d_year AS prev_year,
    curr_yr.d_year AS current_year,
    curr_yr.i_brand_id,
    curr_yr.i_class_id,
    curr_yr.i_category_id,
    curr_yr.i_manufact_id,
    prev_yr.sales_cnt AS prev_year_sales_cnt,
    curr_yr.sales_cnt AS current_year_sales_cnt,
    curr_yr.sales_cnt - prev_yr.sales_cnt AS sales_cnt_diff,
    curr_yr.sales_amt - prev_yr.sales_amt AS sales_amt_diff
FROM annual_sales curr_yr
JOIN annual_sales prev_yr
    ON curr_yr.i_brand_id = prev_yr.i_brand_id
    AND curr_yr.i_class_id = prev_yr.i_class_id
    AND curr_yr.i_category_id = prev_yr.i_category_id
    AND curr_yr.i_manufact_id = prev_yr.i_manufact_id
    AND curr_yr.d_year = 2002
    AND prev_yr.d_year = 2001
WHERE CAST(curr_yr.sales_cnt AS DECIMAL(17,2)) / CAST(prev_yr.sales_cnt AS DECIMAL(17,2)) < 0.9
ORDER BY sales_cnt_diff DESC, sales_amt_diff DESC
LIMIT 100
SETTINGS max_memory_usage = 20000000000, bsp_mode = 0, distributed_max_parallel_size = 12;
```
`bsp_mode = 1`: the query is identical to the one above. Only the trailing SETTINGS clause changes:
```sql
SETTINGS max_memory_usage = 20000000000, bsp_mode = 1, distributed_max_parallel_size = 12;
```
With BSP mode still enabled, the memory limit was then raised from `max_memory_usage = 20000000000` (20 × 10^9 bytes, about 20 GB) to `max_memory_usage = 40000000000` (about 40 GB). The query is again unchanged; only the SETTINGS clause differs:
```sql
SETTINGS max_memory_usage = 40000000000, bsp_mode = 1, distributed_max_parallel_size = 12;
```
Compared with `bsp_mode = 0`, the query ran almost 4 times faster with BSP mode enabled.
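V. Conclusion
In these tests, BSP mode greatly improved query efficiency.
Problems:
Resource bottlenecks in parallel processing: in a distributed environment, uneven use of node resources can cause load imbalance.
Misuse of caching strategies: a poorly configured cache strategy can cause data redundancy or degrade performance.
Complexity of failure recovery: automatic failure recovery must be tested extensively in complex environments, so that misconfiguration does not introduce new problems.
Recommendations:
Optimize resource scheduling: use intelligent scheduling algorithms to allocate resources dynamically and balance load across distributed compute nodes.
Introduce traffic management: in real-time processing scenarios, apply flow-control policies to keep the system stable under high load.
Review caching strategies regularly: tune cache size, strategy, and entry lifetime to the workload to avoid wasting resources.
Test recovery thoroughly: before production deployment, simulate a wide range of failure scenarios to make sure the recovery mechanism is reliable.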
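The flow-control recommendation can be made concrete with a token bucket, one common rate-limiting technique. It is chosen here purely as an example, since the article does not name a specific algorithm. A request is admitted only when a token is available, and tokens refill at `rate` per second up to `capacity`. The class and parameter names are hypothetical, and the injectable clock exists only to make the behavior testable.

```python
import time
from typing import Callable


class TokenBucket:
    """Token-bucket rate limiter: bursts up to `capacity`, sustained `rate` requests per second."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n: float = 1.0) -> bool:
        """Take n tokens if available; on False the caller should delay or shed the request."""
        now = self.clock()
        # Refill in proportion to elapsed time, never exceeding capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


if __name__ == "__main__":
    fake_now = [0.0]
    bucket = TokenBucket(rate=2, capacity=3, clock=lambda: fake_now[0])
    print([bucket.try_acquire() for _ in range(4)])  # [True, True, True, False]
    fake_now[0] = 1.0  # one second later: 2 tokens refilled
    print([bucket.try_acquire() for _ in range(3)])  # [True, True, False]
```

Under sustained overload, the bucket caps throughput at `rate` while still absorbing short bursts up to `capacity`. The bucket itself does not decide what to do with a rejected request, such as queueing, retrying with backoff, or dropping it.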