写点什么

大数据 -156 Apache Druid+Kafka 实时分析实战:JSON 拉平摄取与 SQL 指标全流程

作者:武子康
  • 2025-11-18
    山东
  • 本文字数:3778 字

    阅读完需:约 12 分钟

大数据-156 Apache Druid+Kafka 实时分析实战:JSON 拉平摄取与 SQL 指标全流程

TL;DR

  • 场景:电商点击/订单流入 Kafka,Druid 进行实时分析与可视化。

  • 结论:嵌套 JSON 需预处理拉平;Kafka Indexing Service 设“从最早位点”与禁用 Rollup 更稳。

  • 产出:Scala Producer→Druid 摄取→SQL 指标(订单/用户/支付/TopN)跑通与常见坑位清单。



整体流程

  • Kafka 数据源: Kafka 是一个分布式流处理平台,负责接收、存储并传输数据。它支持从各类应用、日志、传感器等设备采集实时数据,将数据划分为多个主题(Topic),并将消息分发给消费者。在这个案例中,Kafka 是 Druid 的数据源。

  • Kafka Producer: 数据生产者(Producer)负责将数据发送到 Kafka 的主题中。例如,应用程序可以向 Kafka 写入日志、用户行为数据、传感器数据等。每条消息可以是 JSON、Avro 等格式的数据记录。

  • Druid Kafka Ingestion: Druid 提供了对 Kafka 的原生支持。通过 Kafka Indexing Service,Druid 可以持续从 Kafka 的某个主题中消费数据,实时地将这些数据摄取到 Druid 中。摄取过程中,Druid 会将数据拆解为小的段(Segment),并将这些段存储在 Druid 集群的深度存储中(如 HDFS、S3 等)。

  • 实时数据摄取和索引: Druid 的 Kafka 摄取任务会监听 Kafka 的分区,按照流数据的到达顺序消费数据,并在内部创建索引。这些索引结构化存储了数据,并通过分片和分区机制,保证了查询的高效性和水平扩展能力。

  • Druid 查询层: Druid 提供了非常强大的查询能力,可以通过 SQL 查询方式进行交互,也支持多维查询、聚合查询等。这些查询可以是低延迟的实时查询,也可以对历史数据进行复杂的分析。用户通过 Druid 查询接口或 BI 工具(如 Apache Superset、Tableau 等)向集群发送查询。

  • Kafka 消费者 Offset 管理: Druid 使用 Kafka 消费者模型,实时消费消息并管理 Offset(偏移量),确保数据不丢失或重复摄取。Offset 会被定期提交到 Kafka 中,保证即使任务重启,摄取进度也能从上一次的位置继续。

  • 持久化和数据存储: 数据在经过摄取和索引后,Druid 会定期将数据段(Segment)持久化到深度存储中,并对旧数据进行合并和压缩,减少存储空间的占用。Druid 的集群架构支持分布式存储和查询,并能根据数据规模进行自动扩展。

案例假设

假设我们在构建一个用户行为分析系统,通过 Kafka 采集用户点击日志,并通过 Druid 实时分析用户行为。


  • Kafka 数据生产: 电商平台的应用程序会将每次用户点击产生的日志记录(例如点击商品、页面浏览等)发送到 Kafka 中的 user-clicks 主题。每条记录都包含用户 ID、商品 ID、时间戳、页面信息等。

  • Druid 数据摄取: 配置 Druid 的 Kafka Indexing Service,从 user-clicks 主题消费数据。数据会实时流入 Druid 中,Druid 将数据按照时间范围切分为段,并存储到其深度存储中。

  • 实时数据查询与分析: 业务方可以通过 SQL 查询或多维查询接口,实时分析用户的点击行为。查询的例子可能是统计每个小时的页面浏览量、分析不同商品的受欢迎程度等。这些查询可以直接反映用户的当前行为,帮助业务方做出快速决策。

  • 可视化和报表: Druid 的查询结果可以通过 Apache Superset 等工具进行可视化展示,创建实时仪表盘,展示用户行为的各种关键指标。数据分析师和运营人员可以在可视化平台上直观地看到当前系统的运营状态。

需求分析

场景分析

  • 数据量大,需要在这些数据中根据业务需要灵活查询

  • 实时性要求高

  • 数据实时的推过来,要在秒级对数据进行分析并查询出结果

数据描述

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}
复制代码


  • ts 交易时间

  • orderId 订单编号

  • userId 用户 id

  • orderStatusId 订单状态 Id

  • orderStatus 订单状态 0-11:未支付,已支付,发货中,已发货,发货失败,已退款,已关单,订单过期,订单已失效,产品已失效,代付拒绝,支付中

  • payModelId 支付方式 id

  • payMode 支付方式:0-6:微信,支付宝,信用卡,银联,货到付款,现金,其他

  • payment:支付金额

  • products:购买商品 (一个订单可能包含多个商品,这里是嵌套结构)

  • productId 商品 Id

  • productName 商品名称

  • price 单价

  • productNum 购买数量

  • categoryid 商品分类 Id

  • catname1 商品一级分类名称

  • catname2 商品二级分类名称

  • catname3 商品三级分类名称


以上的嵌套的 json 数据格式,Druid 不好处理,需要对数据进行预处理,将数据拉平,处理后的数据格式:


{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}
复制代码

Kafka 生产者

好久没用 Scala 了,用 Scala 写一个:


package icu.wzk.kafka
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.kafka.common.serialization.StringSerializer
import java.util.Propertiesimport scala.io.BufferedSource
object KafkaProducerForDruid { def main(args: Array[String]): Unit = { val brokers = "h121.wzk.icu:9092" val topic = "druid2" val prop = new Properties() prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
val producer = new KafkaProducer[String, String](prop); val source: BufferedSource = scala.io.Source.fromFile("orders1.json") val iter: Iterator[String] = source.getLines(); iter.foreach { line => val msg = new ProducerRecord[String, String](topic, line); producer.send(msg) println(msg) Thread.sleep(10) } producer.close() source.close() }}
复制代码


运行结果如下图:


Druid 导入数据

这里就不详细描述了,之前入门阶段已经走过完整的流程了:


  • JSON 数据要拉平

  • 不定义 RollUp


加载数据源:



JSON 拉平:



时间戳:



不要进行 RollUp:



最终结果如下图所示:



计算结果如下图所示:



运行测试的 SQL,一切正常!


查询计算

订单总数

-- 查询订单总数SELECT COUNT(distinct orderId) as orderscountFROM druid2
复制代码


运行结果如下图所示:


用户总数

-- 查询用户总数SELECT COUNT(distinct userId) as usercountFROM druid2
复制代码


运行结果如下图:


统计结果状态订单数

-- 统计各种订单状态的订单数SELECT orderStatus, count(*)FROM (  SELECT orderId, orderStatus  FROM druid2  GROUP BY orderId, orderStatus)GROUP BY orderStatus
复制代码


执行结果如下图所示:


统计各种支付方式的订单数

-- 统计各种支付方式订单数SELECT payMode, count(1)FROM (  SELECT orderId, payMode  FROM druid2  GROUP BY orderId, payMode)GROUP BY payMode
复制代码


执行结果如下图所示:


订单金额最大的前 10 名

-- 订单金额最大的前10名SELECT orderId, payment, count(1) as productcount, sum("product.productNum") as productsFROM druid2GROUP BY orderId, payment
复制代码


执行结果如下图所示:


案例小节

  • 在配置摄入源时要设置为 True 从流的开始进行消费数据,否则在数据源中可能查不到数据

  • Druid 的 JOIN 能力非常有限,分组或者聚合多的场景推荐使用

  • SQL 支持能力非常受限

  • 数据的分区组织只有时间序列一种方式

错误速查

其他系列

🚀 AI 篇持续更新中(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI 研究-127 Qwen2.5-Omni 深解:Thinker-Talker 双核、TMRoPE 与流式语音🔗 AI模块直达链接

💻 Java 篇持续更新中(长期更新)

Java-174 FastFDS 从单机到分布式文件存储:实战与架构取舍 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务已完结,Dubbo 已完结,MySQL 已完结,MongoDB 已完结,Neo4j 已完结,FastDFS 正在更新,深入浅出助你打牢基础!🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接

发布于: 2025-11-18阅读数: 2
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-156 Apache Druid+Kafka 实时分析实战:JSON 拉平摄取与 SQL 指标全流程_Java_武子康_InfoQ写作社区