大数据 -166 Apache Kylin 1.6 Streaming Cubing 实战:Kafka 到分钟级 OLAP

TL;DR
场景:电商/IoT/监控等需要分钟级指标刷新,同时保留历史明细做 OLAP。
结论:TimedJsonStreamParser + 时间分区维度 + 微批构建,稳态 3–5 分钟刷新取决于资源与合并策略。
产出:Kafka→Kylin Streaming Cube 全流程、REST/Crontab 调度与 SQL 示例,含常见坑位与修复。
版本矩阵
基本概念
实时数据更新在现代数据分析领域是一种普遍且日益增长的需求,特别是在金融交易监控、物联网设备管理和电商实时推荐等场景中。企业需要快速捕捉和分析数据趋势,才能在瞬息万变的市场环境中做出及时、正确的决策。
Apache Kylin V1.6 版本针对这一需求发布了重要的 StreamingCubing 功能扩展。该功能采用创新的架构设计,通过整合 Hadoop 生态系统的处理能力来实时消费 Kafka 消息队列中的数据。具体实现方式包括:
建立专用的 Kafka 消费者组
实现增量数据分区处理
采用微批次(mini-batch)计算模式
这种架构能够构建出支持分钟级(通常 3-5 分钟)更新的 Cube,相比传统的每日批处理模式有了质的飞跃。例如,在零售行业,商家可以实时监控各门店的销售数据变化,及时调整促销策略;在电信行业,运营商可以即时分析网络流量异常,快速定位故障点。
该功能不仅满足了企业对数据时效性的需求,还保持了 Kylin 原有的高性能 OLAP 查询能力,实现了实时分析与历史数据分析的无缝结合。
实现步骤
步骤:项目 => 定义数据源(Kafka)=> 定义 Model => 定义 Cube => Build Cube => 作业调度(频率高)
生成数据
从 Kafka 消费消息时,为确保数据分析的一致性和处理效率,每条消息需要遵循以下标准化数据结构要求:
维度信息:
包含业务实体的分类属性,用于数据分组和筛选
示例:产品类别、地区代码、用户等级等
建议采用键值对形式,如:"product_category":"electronics"
度量信息:
包含可量化的业务指标数值
示例:销售额、点击量、库存数量等
建议格式:"sales_amount":1250.50
业务时间戳:
记录业务实际发生的时间点
必须采用 ISO 8601 标准格式:YYYY-MM-DDTHH:MM:SSZ
示例:"biz_timestamp":"2023-05-15T14:30:00Z"
消息体应采用统一 JSON 结构,例如:
当前系统默认配置的分析器为org.apache.kylin.source.kafka.TimedJsonStreamParser,该解析器能够:
自动识别 JSON 消息体中的三个标准部分
将维度信息映射到 OLAP 模型的维度表
将度量信息聚合到事实表
根据业务时间戳进行时间分区处理
使用建议:
生产环境应确保所有生产者使用相同的消息模板
开发阶段可以使用 Schema Registry 进行消息结构验证
对于特殊业务场景,可通过继承 TimedJsonStreamParser 实现自定义解析逻辑
典型应用场景:
电商交易分析:维度包含商品类目、支付方式;度量包含交易金额、优惠金额
用户行为分析:维度包含用户属性、访问渠道;度量包含停留时长、点击次数
IoT 设备监控:维度包含设备类型、地理位置;度量包含温度读数、运行时长
创建数据
执行结果如下图所示:
数据采样
设置采样器
发了一大批数据,如下图所示:
检查数据
检查数据是否发送成功:
数据样例如下所示:
定义数据源
数据源选择 Add Streaming Table:
点击之后,把刚才的 JSON 填写进去,就可以解析出来:
定义 Kafka 信息,填写对应的内容,如下图所示:
可以看到我们刚才添加的内容如下图所示:
定义 Model
新建 Model,如下图所示,名称随意:
原则 DataModel,如下图所示:
选择维度 Dimension 信息:
选择度量 Measures,如下图所示:
设置 Setting 中,设置对应的 PartitionDateColumn 信息,如下图:
定义 Cube
名字随意,自己能分清就可以,如下图:
设置 Dimensions 信息如下图所示:
设置度量 Measure 信息如下图所示:
RefreshSetting 设置信息如下图所示:
设置 Aggregation Groups 信息:
RowKeys 的设置如下图所示:
StreamingCube 和 普通的 Cube 大致上一样,以下几点需要注意:
分区时间列应该 Cube 的一个 Dimension,在 SteamingOLAP 中时间总是一个查询条件,Kylin 利用它来缩小扫描分区的范围
不要使用 order time 作为 dimmension 因为它非常精细,建议使用 minute_start、hour_start 或其他,取决于用户如何查询数据
定义 year_start、quarter_start、month_start、day_start、hour_start、minute_start 或其他,取决于用户如何查询数据
在 RefreshSetting 设置中,创建更多合并的范围,如 0.5 时、4 小时、1 天、7 天,这样设置有助于控制 CubeSegment 的数量
在 RowKeys 部分,拖拽 minute_start 到最上面的位置,对于 Streaming 查询,时间条件会一直显示,将其放到前面将会缩小扫描范围。
构建 Cube
可以通过 HTTP 的方式完成构建
也可以使用 WebUI,我比较喜欢用页面来构建:
执行查询
自动构建
用 crontab 来定时任务,让其定时执行:
错误速查
其他系列
🚀 AI 篇持续更新中(长期更新)
AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI 研究-127 Qwen2.5-Omni 深解:Thinker-Talker 双核、TMRoPE 与流式语音🔗 AI模块直达链接
💻 Java 篇持续更新中(长期更新)
Java-174 FastFDS 从单机到分布式文件存储:实战与架构取舍 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务已完结,Dubbo 已完结,MySQL 已完结,MongoDB 已完结,Neo4j 已完结,FastDFS 正在更新,深入浅出助你打牢基础!🔗 Java模块直达链接
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接
版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/584a933d597922ca8a78dced2】。文章转载请联系作者。







评论