写点什么

Domino 流计算快速上手 - 2. 聚集

  • 2024-11-29
    北京
  • 本文字数:3647 字

    阅读完需:约 12 分钟

Domino 流计算快速上手 - 2. 聚集

根据经典的数据仓库建模理论,原始交易数据经过扩维操作存储至 DWD (明细层)后,下一步就是根据分析需求,针对明细数据进行各类聚集计算。

像"扩维"操作一样,我们通常采用“跑批”的方式进行处理。具体而言,在 ETL 和扩维操作后,我们会使用工具调度执行 SQL, 针对 DWD 层"大宽表"存储的数据进行聚集计算。

一方面前置 ETL 和扩维操作均为批处理形式, 导致聚集计算也不得不采取批处理方式,因而无法达到“实时”的效果;另一方面,由于 ODS 层通常存储的是不同分类方式的汇总数据,计算时会涉及大量的数据扫描,占用较多系统资源,不得不选择在夜间低负载时执行,进一步限制了分析业务的灵活性。

通过使用 Domino 流计算技术,用户可以在 YMatrix 数据库中,用简单的 SQL 描述聚集操作,结合第一篇文章 Domino 流计算快速上手 - 1.扩维 (点击链接)介绍的流式的扩维操作一起,就能实现交易数据到聚集结果实时流式更新,即 ODS -> DWD-> DWS 的实时链路。

下面,我们就通过实时计算月度和年度产量的例子来演示如何使用 Domino 流计算技术进行实时的聚集计算。

这个例子的特别之处是我们使用了流式级联架构,也就是在产量明细表之后将月度、年度汇总依次串联聚集。



表结构

dwd_production为产量信息表;

dws_stream_agg_month为产量的月度汇总信息;

dws_stream_agg_year为产量的年度汇总信息。



操作

1. 第一步,让我们创建一个用于存储产品产量信息的表dwd_production 并插入数据。


-- 创建产品生产信息表CREATE TABLE dwd_production (    id        bigserial,    category int,    value     bigint,    ts        timestamp) DISTRIBUTED BY (id);
-- 插入数据INSERT INTO dwd_production(category, value, ts) VALUES (1002, 59, '2023-12-12 03:44:05'), (1001, 15, '2024-01-02 11:22:33'), (1001, 20, '2024-01-03 22:33:44'), (1002, 34, '2024-01-04 01:02:03'), (1001, 27, '2024-02-11 02:03:04'), (1002, 57, '2024-02-12 03:04:05');
复制代码

2. 第二步,创建流dws_stream_agg_month,用于产品月度生产量的聚集操作。创建流dws_stream_agg_year ,用于产品年度生产量的聚集操作。当接收到新数据后,流 dws_stream_agg_month 和dws_stream_agg_year 自动执行聚集操作,更新流表中结果为最新。

--创建流 stream_agg_month,月度产品生产量CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (   SELECT     category,     extract(year FROM date_trunc('year', ts)::date),     extract(month FROM date_trunc('month', ts)::date),     date_trunc('month', ts)::date,     sum(value),     count(value)   FROM STREAMING ALL dwd_production   GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组 ) DISTRIBUTED BY (category, y, m);
--创建流 stream_agg_year,年度产品生产量CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS ( SELECT dwd_stream_agg_month.category, dwd_stream_agg_month.y, sum(dwd_stream_agg_month.month_sum), sum(dwd_stream_agg_month.month_cnt) FROM STREAMING ALL stream_agg_month GROUP BY 1, 2 --按照产品种类,年进行分组 ) DISTRIBUTED BY (category, year);
复制代码

3. 第三步,分析和查询数据结果

我们可以先查询当前表 dwd_production 中的产品生产数量信息。

-- 按照产品种类,时间进行排序SELECT * FROM dwd_production ORDER BY 2,4;
复制代码


然后查询表 dws_stream_agg_month 和表 dws_stream_agg_year,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。

--查询产品月度生产量信息,按种类,年,月排序SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category |  y   | m  |     ym     | month_sum | month_cnt ----------+------+----+------------+-----------+-----------     1001 | 2024 |  1 | 2024-01-01 |        35 |         2      1001 | 2024 |  2 | 2024-02-01 |        27 |         1      1002 | 2023 | 12 | 2023-12-01 |        59 |         1      1002 | 2024 |  1 | 2024-01-01 |        34 |         1      1002 | 2024 |  2 | 2024-02-01 |        57 |         1 (5 rows)
--查询产品年度生产量信息,按种类,年排序SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt----------+------+----------+---------- 1001 | 2024 | 62 | 3 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2(3 rows)
复制代码

让我们再向表 dwd_production 中新增一条产品生产量数据,流 dws_stream_agg_month 和 dws_stream_agg_year 会对新增的数据进行实时连续聚集。

INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
复制代码

再次查询表 dws_stream_agg_month 和表 dws_stream_agg_year,表中显示最新的聚集数据结果。

SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category |  y   | m  |     ym     | month_sum | month_cnt ----------+------+----+------------+-----------+-----------     1001 | 2024 |  1 | 2024-01-01 |        35 |        2      1001 | 2024 |  2 | 2024-02-01 |        27 |        1      1001 | 2024 |  4 | 2024-04-01 |        30 |        1      1002 | 2023 | 12 | 2023-12-01 |        59 |        1      1002 | 2024 |  1 | 2024-01-01 |        34 |        1      1002 | 2024 |  2 | 2024-02-01 |        57 |        1 (6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt----------+------+----------+---------- 1001 | 2024 | 92 | 4 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2(3 rows)
复制代码

4. 若我们需要更新产品生产量信息表 dwd_production 中的数据时,则可使用 UPDATE语句进行操作。

UPDATE dwd_production SET value = 100 WHERE id = 7;
复制代码

然后查询流表 dws_stream_agg_month 和表 dws_stream_agg_year,聚集结果已更新。

SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category |  y   | m  |     ym     | month_sum | month_cnt ----------+------+----+------------+-----------+-----------     1001 | 2024 |  1 | 2024-01-01 |        35 |         2      1001 | 2024 |  2 | 2024-02-01 |        27 |         1      1001 | 2024 |  4 | 2024-04-01 |       100 |         1      1002 | 2023 | 12 | 2023-12-01 |        59 |         1      1002 | 2024 |  1 | 2024-01-01 |        34 |         1      1002 | 2024 |  2 | 2024-02-01 |        57 |         1 
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt----------+------+----------+---------- 1001 | 2024 | 162 | 4 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2(3 rows)
复制代码

5. 若我们需要删除产品生产量数据表 dwd_production的数据时,则可使用 DELETE 语句进行操作。

DELETE FROM dwd_production WHERE id = 7;
复制代码

然后查询流表 dws_stream_agg_month和表 dws_stream_agg_year,聚集结果已更新。

-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。DELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category |  y   | m  |     ym     | month_sum | month_cnt      ----------+------+----+------------+-----------+-----------     1001 | 2024 |  1 | 2024-01-01 |        35 |         2      1001 | 2024 |  2 | 2024-02-01 |        27 |         1      1001 | 2024 |  4 | 2024-04-01 |         0 |         0      1002 | 2023 | 12 | 2023-12-01 |        59 |         1      1002 | 2024 |  1 | 2024-01-01 |        34 |         1      1002 | 2024 |  2 | 2024-02-01 |        57 |         1 
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt----------+------+----------+---------- 1001 | 2024 | 62 | 3 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2(3 rows)
复制代码

本文为 YMatrix 原创内容,未经允许不得转载。

欲了解更多超融合时序数据库相关信息,请访问 “YMatrix 数据库官方网站

发布于: 刚刚阅读数: 3
用户头像

MatrixDB 超融合时序数据库 2021-10-28 加入

全球超融合时序数据库开创者,专为物联网、车联网、工业互联网和智慧城市提供一站式数据平台。

评论

发布
暂无评论
Domino 流计算快速上手 - 2. 聚集_数据库_YMatrix 超融合数据库_InfoQ写作社区