根据经典的数据仓库建模理论,原始交易数据经过扩维操作存储至 DWD (明细层)后,下一步就是根据分析需求,针对明细数据进行各类聚集计算。
像"扩维"操作一样,我们通常采用“跑批”的方式进行处理。具体而言,在 ETL 和扩维操作后,我们会使用工具调度执行 SQL, 针对 DWD 层"大宽表"存储的数据进行聚集计算。
一方面前置 ETL 和扩维操作均为批处理形式, 导致聚集计算也不得不采取批处理方式,因而无法达到“实时”的效果;另一方面,由于 ODS 层通常存储的是不同分类方式的汇总数据,计算时会涉及大量的数据扫描,占用较多系统资源,不得不选择在夜间低负载时执行,进一步限制了分析业务的灵活性。
通过使用 Domino 流计算技术,用户可以在 YMatrix 数据库中,用简单的 SQL 描述聚集操作,结合第一篇文章 Domino 流计算快速上手 - 1.扩维 (点击链接)介绍的流式的扩维操作一起,就能实现交易数据到聚集结果实时流式更新,即 ODS -> DWD-> DWS 的实时链路。
下面,我们就通过实时计算月度和年度产量的例子来演示如何使用 Domino 流计算技术进行实时的聚集计算。
这个例子的特别之处是我们使用了流式级联架构,也就是在产量明细表之后将月度、年度汇总依次串联聚集。
表结构
dwd_production
为产量信息表;
dws_stream_agg_month
为产量的月度汇总信息;
dws_stream_agg_year
为产量的年度汇总信息。
操作
1. 第一步,让我们创建一个用于存储产品产量信息的表dwd_production
并插入数据。
-- 创建产品生产信息表
CREATE TABLE dwd_production (
id bigserial,
category int,
value bigint,
ts timestamp
) DISTRIBUTED BY (id);
-- 插入数据
INSERT INTO dwd_production(category, value, ts) VALUES
(1002, 59, '2023-12-12 03:44:05'),
(1001, 15, '2024-01-02 11:22:33'),
(1001, 20, '2024-01-03 22:33:44'),
(1002, 34, '2024-01-04 01:02:03'),
(1001, 27, '2024-02-11 02:03:04'),
(1002, 57, '2024-02-12 03:04:05');
复制代码
2. 第二步,创建流dws_stream_agg_month
,用于产品月度生产量的聚集操作。创建流dws_stream_agg_year
,用于产品年度生产量的聚集操作。当接收到新数据后,流 dws_stream_agg_month
和dws_stream_agg_year
自动执行聚集操作,更新流表中结果为最新。
--创建流 stream_agg_month,月度产品生产量
CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
SELECT
category,
extract(year FROM date_trunc('year', ts)::date),
extract(month FROM date_trunc('month', ts)::date),
date_trunc('month', ts)::date,
sum(value),
count(value)
FROM STREAMING ALL dwd_production
GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组
)
DISTRIBUTED BY (category, y, m);
--创建流 stream_agg_year,年度产品生产量
CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
SELECT
dwd_stream_agg_month.category,
dwd_stream_agg_month.y,
sum(dwd_stream_agg_month.month_sum),
sum(dwd_stream_agg_month.month_cnt)
FROM STREAMING ALL stream_agg_month
GROUP BY 1, 2 --按照产品种类,年进行分组
)
DISTRIBUTED BY (category, year);
复制代码
3. 第三步,分析和查询数据结果
我们可以先查询当前表 dwd_production
中的产品生产数量信息。
-- 按照产品种类,时间进行排序
SELECT * FROM dwd_production ORDER BY 2,4;
复制代码
然后查询表 dws_stream_agg_month
和表 dws_stream_agg_year
,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。
--查询产品月度生产量信息,按种类,年,月排序
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
(5 rows)
--查询产品年度生产量信息,按种类,年排序
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
复制代码
让我们再向表 dwd_production
中新增一条产品生产量数据,流 dws_stream_agg_month
和 dws_stream_agg_year
会对新增的数据进行实时连续聚集。
INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
复制代码
再次查询表 dws_stream_agg_month
和表 dws_stream_agg_year
,表中显示最新的聚集数据结果。
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 30 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
(6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 92 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
复制代码
4. 若我们需要更新产品生产量信息表 dwd_production
中的数据时,则可使用 UPDATE
语句进行操作。
UPDATE dwd_production SET value = 100 WHERE id = 7;
复制代码
然后查询流表 dws_stream_agg_month
和表 dws_stream_agg_year
,聚集结果已更新。
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 100 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 162 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
复制代码
5. 若我们需要删除产品生产量数据表 dwd_production
的数据时,则可使用 DELETE
语句进行操作。
DELETE FROM dwd_production WHERE id = 7;
复制代码
然后查询流表 dws_stream_agg_month
和表 dws_stream_agg_year
,聚集结果已更新。
-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。
DELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 0 | 0
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
复制代码
本文为 YMatrix 原创内容,未经允许不得转载。
欲了解更多超融合时序数据库相关信息,请访问 “YMatrix 数据库”官方网站
评论