写点什么

大数据 -166 Apache Kylin 1.6 Streaming Cubing 实战:Kafka 到分钟级 OLAP

作者:武子康
  • 2025-11-28
    山东
  • 本文字数:3276 字

    阅读完需:约 11 分钟

大数据-166 Apache Kylin 1.6 Streaming Cubing 实战:Kafka 到分钟级 OLAP

TL;DR

  • 场景:电商/IoT/监控等需要分钟级指标刷新,同时保留历史明细做 OLAP。

  • 结论:TimedJsonStreamParser + 时间分区维度 + 微批构建,稳态 3–5 分钟刷新取决于资源与合并策略。

  • 产出:Kafka→Kylin Streaming Cube 全流程、REST/Crontab 调度与 SQL 示例,含常见坑位与修复。


版本矩阵


基本概念

实时数据更新在现代数据分析领域是一种普遍且日益增长的需求,特别是在金融交易监控、物联网设备管理和电商实时推荐等场景中。企业需要快速捕捉和分析数据趋势,才能在瞬息万变的市场环境中做出及时、正确的决策。


Apache Kylin V1.6 版本针对这一需求发布了重要的 StreamingCubing 功能扩展。该功能采用创新的架构设计,通过整合 Hadoop 生态系统的处理能力来实时消费 Kafka 消息队列中的数据。具体实现方式包括:


  1. 建立专用的 Kafka 消费者组

  2. 实现增量数据分区处理

  3. 采用微批次(mini-batch)计算模式


这种架构能够构建出支持分钟级(通常 3-5 分钟)更新的 Cube,相比传统的每日批处理模式有了质的飞跃。例如,在零售行业,商家可以实时监控各门店的销售数据变化,及时调整促销策略;在电信行业,运营商可以即时分析网络流量异常,快速定位故障点。


该功能不仅满足了企业对数据时效性的需求,还保持了 Kylin 原有的高性能 OLAP 查询能力,实现了实时分析与历史数据分析的无缝结合。

实现步骤

步骤:项目 => 定义数据源(Kafka)=> 定义 Model => 定义 Cube => Build Cube => 作业调度(频率高)

生成数据

从 Kafka 消费消息时,为确保数据分析的一致性和处理效率,每条消息需要遵循以下标准化数据结构要求:


  1. 维度信息

  2. 包含业务实体的分类属性,用于数据分组和筛选

  3. 示例:产品类别、地区代码、用户等级等

  4. 建议采用键值对形式,如:"product_category":"electronics"

  5. 度量信息

  6. 包含可量化的业务指标数值

  7. 示例:销售额、点击量、库存数量等

  8. 建议格式:"sales_amount":1250.50

  9. 业务时间戳

  10. 记录业务实际发生的时间点

  11. 必须采用 ISO 8601 标准格式:YYYY-MM-DDTHH:MM:SSZ

  12. 示例:"biz_timestamp":"2023-05-15T14:30:00Z"


消息体应采用统一 JSON 结构,例如:


{  "dimensions": {    "region": "APAC",    "product_line": "smartphone"  },  "metrics": {    "order_count": 42,    "revenue": 12500.00  },  "timestamp": "2023-05-15T09:15:30Z"}
复制代码


当前系统默认配置的分析器为org.apache.kylin.source.kafka.TimedJsonStreamParser,该解析器能够:


  1. 自动识别 JSON 消息体中的三个标准部分

  2. 将维度信息映射到 OLAP 模型的维度表

  3. 将度量信息聚合到事实表

  4. 根据业务时间戳进行时间分区处理


使用建议:


  • 生产环境应确保所有生产者使用相同的消息模板

  • 开发阶段可以使用 Schema Registry 进行消息结构验证

  • 对于特殊业务场景,可通过继承 TimedJsonStreamParser 实现自定义解析逻辑


典型应用场景:


  1. 电商交易分析:维度包含商品类目、支付方式;度量包含交易金额、优惠金额

  2. 用户行为分析:维度包含用户属性、访问渠道;度量包含停留时长、点击次数

  3. IoT 设备监控:维度包含设备类型、地理位置;度量包含温度读数、运行时长

创建数据

# 创建名为kylin_streaming_topic的topickafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic kylin_streaming_topic1
复制代码


执行结果如下图所示:


数据采样

设置采样器


kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer --topic kylin_streaming_topic1 --broker h121.wzk.icu:9092
复制代码


发了一大批数据,如下图所示:


检查数据

检查数据是否发送成功:


kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic kylin_streaming_topic1 --from-beginning
复制代码


数据样例如下所示:


{"country":"INDIA","amount":44.47793969871658,"qty":3,"currency":"USD","order_time":1723358207350,"category":"TOY","device":"iOS","user":{"gender":"Female","id":"1c54f68e-f89a-b5d2-f802-45b60ffccf60","first_name":"unknown","age":15}}{"country":"AUSTRALIA","amount":64.86505054935878,"qty":9,"currency":"USD","order_time":1723358207361,"category":"TOY","device":"iOS","user":{"gender":"Female","id":"de11d872-e843-19c9-6b35-9263f1d1a2a1","first_name":"unknown","age":19}}{"country":"CANADA","amount":90.1591854077722,"qty":4,"currency":"USD","order_time":1723358207371,"category":"Other","device":"Andriod","user":{"gender":"Male","id":"4387ee8b-c8c1-1df4-f2ed-c4541cb97621","first_name":"unknown","age":26}}{"country":"INDIA","amount":59.17956535472526,"qty":2,"currency":"USD","order_time":1723358207381,"category":"TOY","device":"Andriod","user":{"gender":"Female","id":"d8ded433-8f1c-c6e7-99b2-854695935764","first_name":"unknown","age":11}}
复制代码

定义数据源

数据源选择 Add Streaming Table:



点击之后,把刚才的 JSON 填写进去,就可以解析出来:



定义 Kafka 信息,填写对应的内容,如下图所示:



可以看到我们刚才添加的内容如下图所示:


定义 Model

新建 Model,如下图所示,名称随意:



原则 DataModel,如下图所示:



选择维度 Dimension 信息:



选择度量 Measures,如下图所示:



设置 Setting 中,设置对应的 PartitionDateColumn 信息,如下图:


定义 Cube

名字随意,自己能分清就可以,如下图:



设置 Dimensions 信息如下图所示:



设置度量 Measure 信息如下图所示:



RefreshSetting 设置信息如下图所示:



设置 Aggregation Groups 信息:



RowKeys 的设置如下图所示:



StreamingCube 和 普通的 Cube 大致上一样,以下几点需要注意:


  • 分区时间列应该 Cube 的一个 Dimension,在 SteamingOLAP 中时间总是一个查询条件,Kylin 利用它来缩小扫描分区的范围

  • 不要使用 order time 作为 dimmension 因为它非常精细,建议使用 minute_start、hour_start 或其他,取决于用户如何查询数据

  • 定义 year_start、quarter_start、month_start、day_start、hour_start、minute_start 或其他,取决于用户如何查询数据

  • 在 RefreshSetting 设置中,创建更多合并的范围,如 0.5 时、4 小时、1 天、7 天,这样设置有助于控制 CubeSegment 的数量

  • 在 RowKeys 部分,拖拽 minute_start 到最上面的位置,对于 Streaming 查询,时间条件会一直显示,将其放到前面将会缩小扫描范围。

构建 Cube

可以通过 HTTP 的方式完成构建


curl -X PUT --user ADMIN:KYLIN -H "Content-Type:application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://h122.wzk.icu:7070/kylin/api/cubes/streaming_cube1/build2
复制代码


也可以使用 WebUI,我比较喜欢用页面来构建:


执行查询

select minute_start, count(*), sum(amount), sum(qty) from streamingds1group by minute_startorder by minute_start
复制代码

自动构建

用 crontab 来定时任务,让其定时执行:


crontab -e */20 * * * * curl -X PUT --user ADMIN:KYLIN -H "Content-Type:application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://h122.wzk.icu:7070/kylin/api/cubes/streaming_cube1/build2
复制代码

错误速查

其他系列

🚀 AI 篇持续更新中(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI 研究-127 Qwen2.5-Omni 深解:Thinker-Talker 双核、TMRoPE 与流式语音🔗 AI模块直达链接

💻 Java 篇持续更新中(长期更新)

Java-174 FastFDS 从单机到分布式文件存储:实战与架构取舍 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务已完结,Dubbo 已完结,MySQL 已完结,MongoDB 已完结,Neo4j 已完结,FastDFS 正在更新,深入浅出助你打牢基础!🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接

发布于: 刚刚阅读数: 6
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-166 Apache Kylin 1.6 Streaming Cubing 实战:Kafka 到分钟级 OLAP_Java_武子康_InfoQ写作社区