
Query Acceleration for a Metrics Platform Based on StarRocks

Author: 袋鼠云数栈 (InfoQ Writing Community)
  • 2025-07-01
    Zhejiang


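Project background


By query type, the metrics in the metrics management platform fall into two kinds: persisted metrics and ad hoc metrics.


  • Persisted metrics: you can choose different sets of dimensions to generate multiple result tables, and a daily task writes into each of them. When the metric is queried, the platform automatically matches the query conditions to the most suitable result table and queries that table.

  • Ad hoc metrics: no result table is produced. Every query dynamically generates SQL from the metric's calculation rule and the query conditions, and runs it against the metric's source table.


Example: take an order detail table, order_info, with the following schema: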


CREATE TABLE order_info (
    order_id varchar(64) NOT NULL COMMENT "order id",
    pt varchar(12) NOT NULL COMMENT "partition date (yyyyMMdd)",
    user_id varchar(64) NOT NULL COMMENT "user id",
    price double NULL COMMENT "order amount",
    project_id int(11) NOT NULL COMMENT "product id",
    channel varchar(64) NULL COMMENT "channel"
) ENGINE=OLAP
PRIMARY KEY(order_id, pt)
PARTITION BY (pt)
DISTRIBUTED BY HASH(order_id)
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "enable_persistent_index" = "true",
    "replicated_storage" = "true",
    "compression" = "LZ4"
);


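Building the metrics


(1) Create the model: the example has only one table, so no joins are needed. price is the measure column, and user_id, project_id and channel are the dimension columns.


(2) Create the atomic metric: sales amount. Its calculation logic is sum(price), and its dimensions are all of the model's dimensions.


(3) Build the persisted derived metric: daily sales amount. Its calculation logic is sum(price), and the persisted dimensions are channel (daily sales by channel) and project_id (daily sales by product). Each dimension produces its own result table: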


-- Daily sales by channel ('{pt}' is the task's date placeholder, filled in on each daily run)
CREATE TABLE sum_price_day_channel AS
SELECT sum(price) AS sum_price_day, channel, '{pt}' AS pt
FROM order_info
WHERE pt = '{pt}'
GROUP BY channel;

-- Daily sales by product
CREATE TABLE sum_price_day_project AS
SELECT sum(price) AS sum_price_day, project_id, '{pt}' AS pt
FROM order_info
WHERE pt = '{pt}'
GROUP BY project_id;


(4) Build the ad hoc derived metric: daily sales amount. Its calculation logic is sum(price), and it supports the dimensions channel and project_id.


Querying the metrics


(1) Query by dimension channel with 20250101 <= pt <= 20250105


a. Ad hoc query: the SQL is generated at query time:


select sum(price) as sum_price_day, channel, pt
from order_info
where pt >= '20250101' and pt <= '20250105'
group by channel, pt
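The source does not show how the platform generates this SQL. The Python sketch below illustrates the idea. It assumes a hypothetical helper, build_adhoc_sql, and a fixed set of model dimensions. From a metric expression, the requested dimensions and a yyyyMMdd date range, it renders a query of the same shape as the one above. Two checks keep arbitrary text out of the generated SQL: dimension names must belong to the model, and dates must match a fixed pattern. A real generator would need more than this.

```python
from typing import Sequence

# Hypothetical: the dimensions exposed by the example model.
MODEL_DIMS = frozenset({"user_id", "project_id", "channel"})


def build_adhoc_sql(expr: str, alias: str, source: str,
                    dims: Sequence[str], start: str, end: str) -> str:
    """Render ad hoc SQL for one metric query.

    expr/alias come from the metric definition (e.g. sum(price));
    dims and the yyyyMMdd date range come from the query conditions.
    """
    unknown = [d for d in dims if d not in MODEL_DIMS]
    if unknown:
        raise ValueError(f"dimensions not in model: {unknown}")
    for day in (start, end):
        if len(day) != 8 or not day.isdigit():
            raise ValueError(f"expected yyyyMMdd, got {day!r}")
    cols = ", ".join([*dims, "pt"])  # always group by the partition column too
    return (
        f"select {expr} as {alias}, {cols}\n"
        f"from {source}\n"
        f"where pt >= '{start}' and pt <= '{end}'\n"
        f"group by {cols}"
    )


print(build_adhoc_sql("sum(price)", "sum_price_day", "order_info",
                      ["channel", "project_id"], "20250101", "20250105"))
```

Called with ["channel", "project_id"], the same helper produces the two-dimension query that the platform falls back to when no result table fits.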


b. Persisted query: if sum_price_day_channel contains every requested date, query it directly. Otherwise, fall back to the SQL generated for the ad hoc query.


-- When sum_price_day_channel contains every requested date
select sum_price_day, channel, pt
from sum_price_day_channel
where pt >= '20250101' and pt <= '20250105'


(2) Query by dimensions channel and project_id with 20250101 <= pt <= 20250105


No persisted result table contains both channel and project_id, so the query falls back to the ad hoc path:


select sum(price) as sum_price_day, channel, project_id, pt
from order_info
where pt >= '20250101' and pt <= '20250105'
group by channel, project_id, pt
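The matching rule in these two cases can be sketched as a small routing function. This is an assumption about how "the most suitable result table" might be chosen, not the platform's actual algorithm. A result table qualifies only if it has every requested dimension and every requested day. Among the qualifying tables, the one with the fewest dimensions wins, and None means falling back to ad hoc SQL. The names ResultTable, days_between and route are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import FrozenSet, List, Optional, Sequence


@dataclass(frozen=True)
class ResultTable:
    name: str
    dims: FrozenSet[str]         # persisted dimensions of the table
    loaded_days: FrozenSet[str]  # yyyyMMdd partitions already written by the daily task


def days_between(start: str, end: str) -> List[str]:
    """All yyyyMMdd days in the closed interval [start, end]."""
    d0 = datetime.strptime(start, "%Y%m%d")
    d1 = datetime.strptime(end, "%Y%m%d")
    return [(d0 + timedelta(days=n)).strftime("%Y%m%d")
            for n in range((d1 - d0).days + 1)]


def route(tables: Sequence[ResultTable], dims: Sequence[str],
          start: str, end: str) -> Optional[str]:
    """Return the result table to query, or None to fall back to ad hoc SQL."""
    wanted = set(dims)
    needed = set(days_between(start, end))
    candidates = [t for t in tables
                  if wanted <= t.dims and needed <= t.loaded_days]
    if not candidates:
        return None
    # Fewest dimensions = least data to scan; name breaks ties deterministically.
    return min(candidates, key=lambda t: (len(t.dims), t.name)).name


january = frozenset(days_between("20250101", "20250131"))
tables = [
    ResultTable("sum_price_day_channel", frozenset({"channel"}), january),
    ResultTable("sum_price_day_project", frozenset({"project_id"}), january),
]
print(route(tables, ["channel"], "20250101", "20250105"))                # sum_price_day_channel
print(route(tables, ["channel", "project_id"], "20250101", "20250105"))  # None -> ad hoc
```

If the chosen table has more dimensions than the query asks for, the query has to aggregate again (for example sum(sum_price_day)). That is only valid for additive measures such as SUM.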


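StarRocks materialized views



Synchronous materialized views


Limitations


  • Only single-table queries are supported.
  • A synchronous materialized view is essentially an index on the base table rather than a separate physical table.


Syntax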


CREATE MATERIALIZED VIEW [IF NOT EXISTS] [database.]<mv_name>
[COMMENT ""]
[PROPERTIES ("key"="value", ...)]
AS
<query_statement>


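Asynchronous materialized views


Consider asynchronous materialized views built on base tables in default_catalog. For these, StarRocks excludes any materialized view whose data is inconsistent with its base tables. This guarantees that a rewritten query returns the same results as the original query (strong consistency). Materialized views built on External Catalog tables are refreshed asynchronously, so their query results may differ from those of querying the base tables.


Limitations


  • Asynchronous materialized views do not support the List partitioning strategy and cannot be created on base tables that use List partitioning.

  • Query rewrite only supports Cardinality Preservation Joins, where the result row count does not exceed that of either input table.

  • Query rewrite is not supported for GROUPING SETS, GROUPING SETS with ROLLUP, or GROUPING SETS with CUBE.

  • Partitioned materialized views only support Range partitioning.


Syntax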


CREATE MATERIALIZED VIEW [IF NOT EXISTS] [database.]<mv_name>
[COMMENT ""]
-- At least one of distribution_desc and refresh_scheme must be specified.
-- distribution_desc
[DISTRIBUTED BY HASH(<bucket_key>[,<bucket_key2> ...]) [BUCKETS <bucket_number>]]
-- refresh_desc
[REFRESH
-- refresh_moment
    [IMMEDIATE | DEFERRED]
-- refresh_scheme
    [ASYNC | ASYNC [START (<start_time>)] EVERY (INTERVAL <refresh_interval>) | MANUAL]
]
-- partition_expression
[PARTITION BY {<date_column> | date_trunc(fmt, <date_column>)}]
-- order_by_expression
[ORDER BY (<sort_key>)]
[PROPERTIES ("key"="value", ...)]
AS
<query_statement>


Refreshing a view manually


-- Trigger the refresh task asynchronously.
REFRESH MATERIALIZED VIEW <mv_name>;

-- Run the refresh task synchronously.
REFRESH MATERIALIZED VIEW <mv_name> WITH SYNC MODE;


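Query acceleration


Option 1: Accelerate ad hoc metrics with StarRocks materialized views


StarRocks query rewrite checks whether the precomputed results of an existing materialized view can be reused to answer a query. If they cannot, the query runs against the base table, which keeps the results consistent.


(1) Create an asynchronous materialized view based on the atomic metric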


CREATE MATERIALIZED VIEW sum_price_view
REFRESH ASYNC START('2025-05-01 09:00:00') EVERY (INTERVAL 1 DAY)
AS
SELECT sum(price) AS sum_price, user_id, project_id, channel, pt
FROM order_info
GROUP BY user_id, project_id, channel, pt;


(2) Query by dimension channel with 20250101 <= pt <= 20250105


SQL generated for the ad hoc query:


select sum(price) as sum_price_day, channel, pt
from order_info
where pt >= '20250101' and pt <= '20250105'
group by channel, pt


Because the materialized view sum_price_view exists, StarRocks rewrites the query to read the precomputed data instead. The rewritten query is roughly equivalent to:


select sum(sum_price) as sum_price_day, channel, pt
from sum_price_view
where pt >= '20250101' and pt <= '20250105'
group by channel, pt


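The query therefore reads pre-aggregated data instead of the order detail rows, and that is what accelerates it.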
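You can check on a toy dataset why the rewrite is safe. The sketch below uses Python's built-in sqlite3 module purely as a stand-in SQL engine. sqlite has no materialized views, so sum_price_view is an ordinary table here, and its measure column is assumed to be aliased sum_price. The sketch runs the original query and the rewritten one and compares the results. It does not model StarRocks' optimizer.

```python
import sqlite3


def compare_rewrite():
    """Run the original query and its view-based rewrite; return both results."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE order_info (order_id TEXT, pt TEXT, user_id TEXT,"
                " price REAL, project_id INTEGER, channel TEXT)")
    con.executemany("INSERT INTO order_info VALUES (?, ?, ?, ?, ?, ?)", [
        ("o1", "20250101", "u1", 10.0, 1, "app"),
        ("o2", "20250101", "u2", 20.0, 2, "app"),
        ("o3", "20250101", "u1", 5.0, 1, "web"),
        ("o4", "20250102", "u3", 7.5, 2, "web"),
        ("o5", "20250102", "u3", 2.5, 2, "web"),
    ])
    # Plain table standing in for sum_price_view (finest grain of the atomic metric).
    con.execute("CREATE TABLE sum_price_view AS"
                " SELECT sum(price) AS sum_price, user_id, project_id, channel, pt"
                " FROM order_info GROUP BY user_id, project_id, channel, pt")
    where = " WHERE pt >= '20250101' AND pt <= '20250105'"
    tail = " GROUP BY channel, pt ORDER BY channel, pt"
    original = con.execute("SELECT sum(price) AS sum_price_day, channel, pt"
                           " FROM order_info" + where + tail).fetchall()
    rewritten = con.execute("SELECT sum(sum_price) AS sum_price_day, channel, pt"
                            " FROM sum_price_view" + where + tail).fetchall()
    con.close()
    return original, rewritten


original, rewritten = compare_rewrite()
print(original == rewritten)  # True
print(original)  # [(30.0, 'app', '20250101'), (5.0, 'web', '20250101'), (10.0, 'web', '20250102')]
```

The two results match because a SUM over finer-grained partial sums equals a SUM over the detail rows. Measures that are not additive in this way, such as COUNT(DISTINCT user_id), cannot be recovered by summing partial results.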


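Option 2: Accelerate persisted metrics with StarRocks materialized views


For persisted metrics, only the result table with the most dimensions is generated. Asynchronous materialized views built on that most complete result table replace the other result tables.


(1) As in Option 1, create a materialized view based on the atomic metric.


(2) Create a result table with all selected dimensions, using range partitioning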


-- The result table's partition column is a date/time column (datetime here),
-- partitioned by a time-function expression.
-- The primary key is changed to an auto-increment BIGINT.
CREATE TABLE IF NOT EXISTS sum_price_day_channel_project_id (
    pk BIGINT NOT NULL AUTO_INCREMENT,
    pt datetime NOT NULL,
    sum_price_day DOUBLE,
    channel string,
    project_id int(11)
)
PRIMARY KEY (pk, pt)
PARTITION BY date_trunc('day', pt)
DISTRIBUTED BY HASH(pk)
PROPERTIES (
    "enable_persistent_index" = "true"
);

-- Create partitioned asynchronous materialized views on the all-dimension result table.
-- pt is selected and grouped on so that each view can be partitioned by it.
CREATE MATERIALIZED VIEW sum_price_day_channel_view
REFRESH ASYNC
PARTITION BY pt
AS
SELECT pt, sum(sum_price_day) AS sum_price_day, channel
FROM sum_price_day_channel_project_id
GROUP BY pt, channel;

CREATE MATERIALIZED VIEW sum_price_day_project_view
REFRESH ASYNC
PARTITION BY pt
AS
SELECT pt, sum(sum_price_day) AS sum_price_day, project_id
FROM sum_price_day_channel_project_id
GROUP BY pt, project_id;


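(3) The persisted-metric task SQL only needs to write into sum_price_day_channel_project_id. The materialized views refresh automatically, so queries on sum_price_day_channel_view and sum_price_day_project_view stay consistent with sum_price_day_channel_project_id. Query rewrite is also supported.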


-- Daily task for 20250501
INSERT OVERWRITE sum_price_day_channel_project_id PARTITION(pt='20250501')
    (pt, sum_price_day, channel, project_id)
SELECT str2date('20250501', '%Y%m%d'), idx.sum_price_day, idx.channel, idx.project_id
FROM (
    SELECT sum(price) AS sum_price_day, channel, project_id
    FROM order_info
    WHERE pt = '20250501'
    GROUP BY channel, project_id
) idx;


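These steps reduce the number of result-table loads, which speeds up the tasks, and they simplify the query SQL. Combined with StarRocks query rewrite, they also improve query performance.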
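A toy model of partition-level refresh illustrates the effect of loading only the finest-grained table. The model makes two assumptions. First, every INSERT OVERWRITE bumps a version number on the partition it writes. Second, a view recomputes only the partitions whose version changed since its last refresh. The classes FinestResultTable and RollupView are hypothetical and do not describe StarRocks' internal refresh bookkeeping.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int, float]  # (channel, project_id, sum_price_day)


class FinestResultTable:
    """Stand-in for sum_price_day_channel_project_id, partitioned by day."""

    def __init__(self) -> None:
        self.partitions: Dict[str, List[Row]] = {}
        self.versions: Dict[str, int] = {}

    def insert_overwrite(self, pt: str, rows: List[Row]) -> None:
        # The daily task replaces one partition and bumps its version.
        self.partitions[pt] = list(rows)
        self.versions[pt] = self.versions.get(pt, 0) + 1


class RollupView:
    """Toy partitioned view that sums sum_price_day over one dimension."""

    def __init__(self, base: FinestResultTable, dim_index: int) -> None:
        self.base = base
        self.dim_index = dim_index        # 0 = channel, 1 = project_id
        self.data: Dict[str, Dict[object, float]] = {}
        self.synced: Dict[str, int] = {}  # partition -> base version last seen

    def refresh(self) -> List[str]:
        """Recompute only partitions whose base version changed; return them."""
        refreshed = []
        for pt, version in self.base.versions.items():
            if self.synced.get(pt) == version:
                continue  # partition unchanged since the last refresh
            totals: Dict[object, float] = defaultdict(float)
            for row in self.base.partitions[pt]:
                totals[row[self.dim_index]] += row[2]
            self.data[pt] = dict(totals)
            self.synced[pt] = version
            refreshed.append(pt)
        return refreshed


base = FinestResultTable()
by_channel, by_project = RollupView(base, 0), RollupView(base, 1)
base.insert_overwrite("20250501", [("app", 1, 30.0), ("app", 2, 20.0), ("web", 1, 5.0)])
print(by_channel.refresh(), by_channel.data)  # ['20250501'] {'20250501': {'app': 50.0, 'web': 5.0}}
print(by_project.refresh(), by_project.data)  # ['20250501'] {'20250501': {1: 35.0, 2: 20.0}}
base.insert_overwrite("20250502", [("web", 2, 8.0)])
print(by_channel.refresh())  # ['20250502'] (20250501 is unchanged and skipped)
```

One daily load feeds both views, and the second day's refresh touches only the new partition.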


Option 3: Other optimizations


Use a dictionary to convert string columns into integer columns, which are more efficient to process.


Sorted streaming aggregate: use the table's sort key to improve GROUP BY performance.
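A few lines of Python show the principle behind a sorted streaming aggregate. When rows arrive already ordered by the grouping key, each group can be closed as soon as the key changes, so no hash table over all keys is needed. The function name is hypothetical. The source does not describe the conditions under which StarRocks chooses this strategy, so the sketch illustrates only the principle.

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator, Tuple


def sorted_streaming_sum(rows: Iterable[Tuple[str, float]]) -> Iterator[Tuple[str, float]]:
    """SUM per key over rows already sorted by key.

    Each group is emitted as soon as the key changes, so only one running
    total is held at a time; a hash aggregate keeps one entry per distinct
    key until the input ends.
    """
    prev = None
    for key, group in groupby(rows, key=itemgetter(0)):
        if prev is not None and key < prev:
            raise ValueError("input is not sorted by the grouping key")
        prev = key
        yield key, sum(value for _, value in group)


rows = [("app", 10.0), ("app", 20.0), ("mini", 1.5), ("web", 5.0)]  # sorted by channel
print(list(sorted_streaming_sum(rows)))  # [('app', 30.0), ('mini', 1.5), ('web', 5.0)]
```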


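Colocate Join: specify the "colocate_with" = "group_name" property. Rows with the same bucketing-key values are then kept on the same group of BE nodes, which reduces data transfer between nodes and improves join performance.


(1) Create the dictionary tables and load data.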
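A small simulation shows why colocation removes the shuffle. Suppose both tables hash the join key with the same function into the same number of buckets. Then matching rows always sit in the same bucket, and each bucket (standing in for the tablets on one BE node) can be joined locally. The crc32 hash and the bucket count of 4 are arbitrary choices for the sketch. The sketch also assumes that a colocation group is what keeps the distribution consistent across the tables.

```python
from collections import defaultdict
from typing import Dict, List, Sequence
from zlib import crc32

BUCKETS = 4  # both tables must use the same bucket count


def bucket_of(key: str) -> int:
    # crc32 is only a deterministic stand-in; StarRocks uses its own hash function.
    return crc32(key.encode("utf-8")) % BUCKETS


def distribute(rows: Sequence[tuple]) -> Dict[int, List[tuple]]:
    """Place each row in a bucket by hashing its first column (the join key)."""
    buckets: Dict[int, List[tuple]] = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[0])].append(row)
    return buckets


def colocated_join(left: Sequence[tuple], right: Sequence[tuple]) -> List[tuple]:
    """Inner join on column 0, matching bucket i of left only with bucket i of right."""
    left_buckets, right_buckets = distribute(left), distribute(right)
    out = []
    for b in range(BUCKETS):
        index: Dict[str, List[tuple]] = defaultdict(list)
        for rrow in right_buckets.get(b, []):
            index[rrow[0]].append(rrow)
        for lrow in left_buckets.get(b, []):
            for rrow in index.get(lrow[0], []):
                out.append(lrow + rrow[1:])
    return out


orders = [("u1", 10.0), ("u2", 20.0), ("u3", 5.0)]
users = [("u1", "Alice"), ("u3", "Carol"), ("u4", "Dan")]
print(sorted(colocated_join(orders, users)))  # [('u1', 10.0, 'Alice'), ('u3', 5.0, 'Carol')]
```

No row ever has to leave its bucket, which mirrors the reduced inter-node transfer described above.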


CREATE TABLE channel_dict (
    channel STRING,
    channel_int BIGINT AUTO_INCREMENT
)
PRIMARY KEY (channel)
DISTRIBUTED BY HASH(channel)
PROPERTIES("replicated_storage" = "true");

CREATE TABLE order_id_dict (
    order_id STRING,
    order_id_int BIGINT AUTO_INCREMENT
)
PRIMARY KEY (order_id)
DISTRIBUTED BY HASH(order_id)
PROPERTIES("replicated_storage" = "true");

CREATE TABLE user_id_dict (
    user_id STRING,
    user_id_int BIGINT AUTO_INCREMENT
)
PRIMARY KEY (user_id)
DISTRIBUTED BY HASH(user_id)
PROPERTIES("replicated_storage" = "true");

-- Load data (channel is nullable, and a primary key value cannot be NULL)
INSERT INTO channel_dict(channel) SELECT DISTINCT channel FROM order_info WHERE channel IS NOT NULL;
INSERT INTO order_id_dict(order_id) SELECT DISTINCT order_id FROM order_info;
INSERT INTO user_id_dict(user_id) SELECT DISTINCT user_id FROM order_info;


(2) Create a table that includes channel_int and load data.


CREATE TABLE order_info_integer (
    order_id varchar(64) NOT NULL COMMENT "order id",
    pt varchar(12) NOT NULL COMMENT "partition date (yyyyMMdd)",
    user_id varchar(64) NOT NULL COMMENT "user id",
    price double NULL COMMENT "order amount",
    project_id int(11) NOT NULL COMMENT "product id",
    channel varchar(64) NULL COMMENT "channel",
    -- The columns below are generated columns defined with dict_mapping. During loading,
    -- their values are looked up automatically in the corresponding dictionary table.
    -- Later deduplication and JOIN queries can use these columns directly.
    channel_int BIGINT AS dict_mapping('channel_dict', channel),
    order_id_int BIGINT AS dict_mapping('order_id_dict', order_id),
    user_id_int BIGINT AS dict_mapping('user_id_dict', user_id)
) ENGINE=OLAP
PRIMARY KEY(order_id, pt)
PARTITION BY (pt)
DISTRIBUTED BY HASH(order_id)
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "enable_persistent_index" = "true",
    "replicated_storage" = "true",
    "compression" = "LZ4"
);

INSERT INTO order_info_integer(order_id, pt, user_id, price, project_id, channel)
SELECT order_id, pt, user_id, price, project_id, channel FROM order_info;


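(3) The result tables store integer columns, and later joins also use them, which speeds up join queries.


This approach produces extra dictionary data. Queries must also look up the dictionary tables to translate ids back into the original values, which adds some overhead. The approach therefore suits scenarios with frequent joins.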
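The following sketch shows the dictionary round trip, including the decode step that causes the overhead mentioned above. The Dictionary class is a hypothetical simplification that stands in for a table such as channel_dict, and its encode method stands in for what dict_mapping does at load time. For readability, the toy assigns consecutive ids starting at 1. Don't assume that AUTO_INCREMENT ids follow that pattern.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


class Dictionary:
    """Toy counterpart of channel_dict: one integer id per distinct string."""

    def __init__(self) -> None:
        self.ids: Dict[str, int] = {}
        self.values: List[str] = []

    def load(self, values: Iterable[str]) -> None:
        # Like INSERT INTO channel_dict(channel) SELECT DISTINCT ...: new values get new ids.
        for v in values:
            if v not in self.ids:
                self.values.append(v)
                self.ids[v] = len(self.values)  # ids start at 1 in this toy

    def encode(self, value: str) -> int:  # done once, at load time
        return self.ids[value]

    def decode(self, code: int) -> str:  # the extra lookup paid at query time
        return self.values[code - 1]


def sum_by_code(rows: Iterable[Tuple[int, float]]) -> Dict[int, float]:
    """GROUP BY on the integer column only; no string comparisons."""
    totals: Dict[int, float] = defaultdict(float)
    for code, price in rows:
        totals[code] += price
    return dict(totals)


orders = [("app", 10.0), ("web", 5.0), ("app", 20.0)]
channel_dict = Dictionary()
channel_dict.load(channel for channel, _ in orders)
stored = [(channel_dict.encode(c), p) for c, p in orders]  # like channel_int
totals = sum_by_code(stored)
print({channel_dict.decode(k): v for k, v in totals.items()})  # {'app': 30.0, 'web': 5.0}
```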

