Databend 在 1.0 中支持了对查询结果集的缓存,大大提高了多次相同查询返回结果的效率。
Query result cache 主要用于处理数据更新频率不高的查询,它通过缓存第一次查询返回的结果集,以便在之后对相同数据执行相同查询时能够立即返回结果,从而提高查询效率。
比如我们有个需求是每隔十秒获取一次销量前 5 的产品,通过以下 sql 执行查询:
SELECT product, count(product) AS sales_count
FROM sales_log
GROUP BY product
ORDER BY sales_count DESC
LIMIT 5;
复制代码
在没有 cache 的情况下,每次都需要执行完整的 sql 查询流程,而整个流程可能耗时比较久,但结果仅仅返回 5 条数据。如果 sales_count 表中的数据更新频率不高,那么通过 cache 可以立即返回之后查询的结果,大大降低了等待时间和 Server 的负载。
整体设计
Query Result Cache 的生命周期
每个被缓存的结果集都会设置一个缓存失效时间(TTL),每次对相同缓存结果集的访问都会刷新失效时间,缓存的默认失效时间为 300 秒,可以通过设置 query_result_cache_ttl_secs
来修改。当失效时间到达后,缓存的结果集将不再可用。
除了 TTL 之外,如果底层数据(如 snapshot id、segment id、partition location)发生变化,缓存就会变得不准确。但是,这种底层数据的修改不会影响缓存的效果。如果仍然希望快速返回结果集,可以通过设置 SET query_result_cache_allow_inconsistent=1
来允许返回不一致的结果。如果您对 Databend 底层存储结构感兴趣,可以参考 Databend 存储概览
缓存结果存储
Databend 使用键值对来存储查询结果集,对于每一次查询, Databend 根据 query 信息构造一个对应的 key,然后将查询结果集的一些元信息构造成 value 存入到 meta service 中。
其中 Key 的生成规则为:
// 将 ast 序列化为 string,然后通过 hash 函数拿到对应的 hash 值
let ast_hash = sha256(formatted_ast);
// 将 result cache 的前缀,当前租户和上面生成的 hash 值拼接,得到最终 key
let key = format!("{RESULT_CACHE_PREFIX}/{tenant}/{ast_hash}");
复制代码
Value 的结构如下(注意:value 中只存储对应结果集的元信息,真正的结果集会写到当前使用的 storage 中,比如 local fs, s3...):
struct ResultCacheValue {
/// 原始查询 SQL.
pub sql: String,
/// 该次查询的 query_id
pub query_id: String,
/// 查询持续时间.
pub query_time: u64,
/// 缓存失效时间
pub ttl: u64,
/// 结果集大小,单位:字节
pub result_size: usize,
/// 结果集一共包含多少行数据
pub num_rows: usize,
/// 查询命中的 partitions 的 hash 值,每个表一个 hash 值
pub partitions_shas: Vec<String>,
/// 结果集缓存文件在底层存储中的地址
pub location: String,
}
复制代码
读取 cache
读 cache 流程比较简单,通过以下伪代码说明:
// 通过格式化之后的 ast 来生成查询语句对应的 key
let key = gen_result_cache_key(formatted_ast);
// 构建 cache reader
let cache_reader = ResultCacheReader::create(ctx, key, meta_client, allow_inconsistent);
// cache reader 首先从 meta service 中通过 key 得到对应的 ResultCacheValue
// ResultCacheValue 的结构见之前的代码段
let value = cache_reader.get(key)
// 如果可以容忍不一致,或者查询覆盖的 partitions 的 hash 值相同
// 就会通过 location 去底层存储读取缓存结果集,然后返回。
if allow_inconsistent || value.partitions_shas == ctx.partitions_shas {
read_result_from_cache(&value.location)
}
复制代码
写入 cache
┌─────────┐ 1 ┌─────────┐ 1
│ ├───►│ ├───►Dummy───►Downstream
Upstream──►│Duplicate│ 2 │ │ 3
│ ├───►│ ├───►Dummy───►Downstream
└─────────┘ │ │
│ Shuffle │
┌─────────┐ 3 │ │ 2 ┌─────────┐
│ ├───►│ ├───►│ Write │
Upstream──►│Duplicate│ 4 │ │ 4 │ Result │
│ ├───►│ ├───►│ Cache │
└─────────┘ └─────────┘ └─────────┘
复制代码
写 cache 的主要流程如上图所示,当一个查询执行没有命中 cache 时,就会触发写 cache 流程。
Databend 使用 pipeline 方式调度和处理读写任务,通常的 pipeline 流程是 source -> transform -> transform .. -> sink
, 写 cache 会增加一个 sink 出口,因此需要首先并行的加一条管道来复制上游数据 (图中 duplicate 部分)
而由于 pipeline 中前置节点的 output port
和后置节点的 input port
是一一对应的,所以这里我们通过 shuffle 来重排序,以此来衔接前后处理节点。
注意事项
如果 query 中使用了不确定性的函数,比如 now()
, rand()
, uuid()
,那么结果集将不会被 cache,另外 system 下的表也不会被 cache。
另外目前结果集最大缓存 1MiB 的数据,可以通过设置 query_result_cache_max_bytes
来调整允许 cache 的大小。
使用方式
相关设置
// 进行如下设置开启 query result cache,
// 后续 databend 将会默认打开这个设置
SET enable_query_result_cache=1;
// 进行如下设置来容忍不准确的结果
SET query_result_cache_allow_inconsistent=1;
复制代码
测试 cache 是否生效
SET enable_query_result_cache=1;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+---------------------+-------------+------+----------------+----------------------+
| watchid | clientip | c | sum(isrefresh) | avg(resolutionwidth) |
+---------------------+-------------+------+----------------+----------------------+
| 6655575552203051303 | 1611957945 | 2 | 0 | 1638.0 |
| 8566928176839891583 | -1402644643 | 2 | 0 | 1368.0 |
| 7904046282518428963 | 1509330109 | 2 | 0 | 1368.0 |
| 7224410078130478461 | -776509581 | 2 | 0 | 1368.0 |
| 5957995970499767542 | 1311505962 | 1 | 0 | 1368.0 |
| 5295730445754781367 | 1398621605 | 1 | 0 | 1917.0 |
| 8635802783983293129 | 900266514 | 1 | 1 | 1638.0 |
| 5650467702003458413 | 1358200733 | 1 | 0 | 1368.0 |
| 6470882100682188891 | -1911689457 | 1 | 0 | 1996.0 |
| 6475474889432602205 | 1501294204 | 1 | 0 | 1368.0 |
+---------------------+-------------+------+----------------+----------------------+
10 rows in set (3.255 sec)
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+---------------------+-------------+------+----------------+----------------------+
| watchid | clientip | c | sum(isrefresh) | avg(resolutionwidth) |
+---------------------+-------------+------+----------------+----------------------+
| 6655575552203051303 | 1611957945 | 2 | 0 | 1638.0 |
| 8566928176839891583 | -1402644643 | 2 | 0 | 1368.0 |
| 7904046282518428963 | 1509330109 | 2 | 0 | 1368.0 |
| 7224410078130478461 | -776509581 | 2 | 0 | 1368.0 |
| 5957995970499767542 | 1311505962 | 1 | 0 | 1368.0 |
| 5295730445754781367 | 1398621605 | 1 | 0 | 1917.0 |
| 8635802783983293129 | 900266514 | 1 | 1 | 1638.0 |
| 5650467702003458413 | 1358200733 | 1 | 0 | 1368.0 |
| 6470882100682188891 | -1911689457 | 1 | 0 | 1996.0 |
| 6475474889432602205 | 1501294204 | 1 | 0 | 1368.0 |
+---------------------+-------------+------+----------------+----------------------+
10 rows in set (0.066 sec)
复制代码
可以看到,相同的查询,第二次的结果是立即返回的。
RESULT_SCAN
Query result cache 同时提供了 RESULT_SCAN
的 table function,在同一个 session 中,可以快速根据 query_id 来拿到之前查询的结果,使用方式可以参考文档。
另外用户可以通过 SELECT * from system.query_cache
来获取当前租户下被 cache 所有结果集的元信息,包括
未来规划
对以上改进感兴趣的同学欢迎为 Databend 添砖加瓦。
致谢
Databend 结果集缓存的设计参考了 ClickHouse 和 Snowflake,如果想进一步跟进 query result cache 的细节,请参考以下链接:
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
Databend Cloud:https://databend.cn
Databend 文档:https://databend.rs/
Wechat:Databend
GitHub:https://github.com/datafuselabs/databend
评论