好未来 x DorisDB:全新实时数仓实践,深入释放实时数据价值
好未来(NYSE:TAL)是一家以智慧教育和开放平台为主体,以素质教育和课外辅导为载体,在全球范围内服务公办教育,助力民办教育,探索未来教育新模式的科技教育公司。截至 2020 年 11 月底,好未来在 102 个城市建立起 990 个教学点,业务范围覆盖全国 331 个地级市以及海外 20 多个国家和地区。
随着业务的发展,实时数据的分析需求日益增多,尤其在营销推荐、归因分析、业务辅助决策等场景下,实时数据分析所带来的效益提升是离线数据所不能比拟的。在这些业务场景的驱动下,好未来选择了 DorisDB 来支撑实时数据的分析应用。实现了数据秒级查询响应能力,构建了一个统一 &快速 &高效 &灵活的实时数仓。
“ 作者:王岳
好未来数据科学组负责人,专注于数仓建设、数据分析、算法等领域研究。 ”
业务背景
业务场景分类
在教育场景下,根据数据时效性划分,数据分析处理可分为离线和实时两大部分:
离线
离线数据以 8 大数据域(日志、营销、交易、服务、教学、内容、学习、画像)建设为主,主要处理核心历史数据,解决“业务运营、分析师、算法”等海量数据多维度分析和挖掘等,采用批处理的方式定时计算。
实时
实时数据分析处理,主要包括由埋点产生的各种日志数据,数据量大,以结构化或半结构化类型为主;另外还包括由业务交易产生的业务数据,通常使用数据库的 Binlog 获取。
实时数据分析的需求越来越多,特别是在营销签单业务和在读学员是否续报等场景,需要实时数据来助力业务营销付费和续费目标达成。当目标没完成时,业务运营需要对数据进行多维度分析,找到原因,并快速做出决策调整等管理动作。
业务痛点
T+1 的离线数据分析已经无法满足业务对时效性的需求,我们希望建设实时数仓来支持业务实时数据分析场景,解决如下痛点:
市场:想通过广告页投放策略,洞悉 PV、UV 等流量数据,如果出现异常,可快速分析和优化。但之前因为各种因素我们无法提供实时数据,对于业务来说 T+1 数据时效性滞后,参考价值有限。
销售:通过分析意向用户跟进和签单数据,根据当日销售目标,及时发现还有哪些管理动作需要优化。但目前是提供滞后数据,每日签多少单都通过人来统计,分析也是通过历史数据,分析效果很差。
在读学员续报:实时观测哪些学员续报了,老师需要做哪些续报动作。
课堂行为分析:分析课堂实时互动行为、答题行为等,阶段评测报告、课堂质量等。
算法模型:实时更新模型需要的特征数据,更准时的预测模型效果。
实时数仓目标
数据团队要提供灵活 &丰富的分钟级的实时数据,并要保证数据的丰富性 &准确性 &及时性等。
丰富性
沿用离线数仓建模好的数据维度和指标,保证离线能用到的,实时也能用到。
准确性
实时指标的构建需要可以保证数据完整性和准确性。所有指标开发按照指标定义文档,线上使用 DQC 平台来监控数据准确性,实时发送异常数据等。
及时性
要保证数据的“新鲜”度,线上实时产生的业务数据和日志数据,要能及时地被用于数据分析,提升一线人员或业务的反馈速度。
实时数仓技术架构演进
实时数仓的探索过程中,我们先后经历了如下几个阶段:
2018 年~2019 年,基于 Hive 框架下的小时级任务方案;
2019 年,基于 Flink+Kudu 的实时计算方案;
2020 年至今,基于 DorisDB 的实时数仓技术架构。
基于 Hive
在原有天级延迟的离线数据处理任务基础上,开发小时级延迟的数据处理链路,将核心数据按小时同步到 Hive 数仓中,每小时调度一次 DAG 任务,实现小时级任务计算。任务 DAG 示意图如下所示:
优点:
离线和小时级任务各自独立
代码逻辑复用性高,减少开发成本
可以使用离线数据覆盖小时级数据,进行数据修复
缺点:
小时级数据的延迟性还是很高,已无法满足业务对数据时效性的要求
MapRecude 不适合分钟级频次的任务调度,主要是 MapReduce 任务启动慢,另外会过高的频次会产生很多小文件,影响 HDFS 的稳定性,以及 SQL on Hadoop 系统的查询速度
批量数据处理每次运行对资源要求高,尤其是当凌晨 Hadoop 资源紧张时,任务经常无法得到调度,延迟严重
基于 Flink+Kudu
为了解决上面基于 MapReduce 小时级任务的问题,我们采用了流式处理系统 Flink 和支持增量更新的存储系统 Kudu。
如上图所示,实时的日志数据通过 Flume 采集到 Kafka,实时的业务数据通过 canal 实时同步数据库的 binlog 再转发到 Kafka 中,Flink 再实时消费 Kafka 中的数据写入 Kudu 中。
在使用 Flink+Kudu 的实践中,我们遇到了如下几个问题:
Flink 基于 stream 语义,做复杂指标计算非常复杂,门槛高,开发效率不高,数据仓库更多使用批处理 SQL
Kudu+Impala 聚合查询效率不高,查询响应时间不能满足业务多维分析要求
使用 Kudu 需要依赖 Impala、Hive 等整个 Hadoop 组件,维护成本太高
Kudu 社区不活跃,遇到问题很难找到相关解决方案,使用过程中遇到过宕机等各类疑难问题
基于 DorisDB
基于上面方案的问题,我们开始对实时数仓进行调研,包括 DorisDB、ClickHouse、Kylin 等系统,考虑到查询性能、社区发展、运维成本等多种因素,我们最后选择 DorisDB 作为我们的实时数仓,各系统的对比总结如下:
我们也深入考虑过 ClickHouse,对于教育场景,一个学员要关联的数据维度多,包括课堂、服务、订单、教研等。在每个主题我们都会建设灵活且易用的星型数据模型。当业务想进行个性化自助分析时,仅需要关联相关表即可。但如果直接构建明细大宽表,随着业务不断调整,经常需要重构开发。这种情况下,ClickHouse 的 join 能力弱,无法满足需求,而 DorisDB 强悍的 Join 能力,就成了我们应对业务变化的利器。而且 DorisDB 支持 CBO(基于成本统计的优化器),具备复杂查询的优化能力,从而可以快速的进行复杂实时微批处理任务,可以帮助我们更好的进行实时指标构建。
最终选择 DorisDB 的原因:
使用 DorisDB 可以让我们像开发离线 Hive 任务一样进行实时数仓的开发,避免了复杂的 Flink stream 语义,同时也能在功能上对齐离线指标,保证指标丰富性的基础上完成指标定义口径的一致,并且可以保证分钟级的数据可见性。
大宽表和星型模型的查询性能都很好,可以灵活高效的满足各类业务分析要求。
DorisDB 简单易用,运维管理成本低
基于 DorisDB 的实时数仓架构
系统搭建
整个系统,除了 DorisDB 集群之外,我们还搭建了下面两个配套系统
调度:使用 Airflow,进行 DAG 任务调度
监控:使用 grafana+prometheus,采集 DorisDB 信息并进行实时监控
实时数仓总体架构
基于 DorisDB 的实时数仓总体架构,主要包括下面三个部分:
数据源:业务数据(使用 Flink 实时同步 mysql 的 binlog 日志,写入到 Kafka)、日志数据(包括 H5 小程序、APP、直播 ipad 客户端等埋点采集的各类日志数据,通过 Flume 写入到 Kafka 中)
数据存储:
采用 DorisDB 的 Routine Load 直接消费 Kafka 中的日志和业务数据
使用 DorisDB 的 Broker Load 将 Hadoop 中的 DWD、DWS、ADS 等数据导入到 DorisDB 中
对于 Flink 等流式处理下系统,使用 DorisDB 的 Stream Load 方式实时将数据导入 DorisDB
数据应用:
使用 DataX 可以将 DorisDB 数据导出到 MySQL 中
使用 DorisDB 的 Export 可以将 DorisDB 中的数据导出到 HDFS 中
DorisDB 完全兼容 Mysql 协议,BI 或业务系统可以使用 Mysql Connector 直接连接 DorisDB 进行使用
实时数仓数据处理流程
在实时数仓内部,也是按照传统离线数仓的方式,对数据处理进行分层处理:
ODS 层,设置 DorisDB 的 Routine Load 间隔 30 秒消费一次 Kafka 数,写入到 ODS 表中
DWD 层,按业务分析的需要建模 DWD 表,通过 Airflow 间隔 5 分钟,将 ODS 表中过去 5 分钟的增量数据写入到 DWD 表中
DWS 层,对 DWD 表中的维度进行轻度或中度汇总,可以加快上层查询速度
BI 层,通过自研的一个指标定义工具,分析人员可以快速的基于 DWS 构建报表,也可以衍生出一些复合指标进行二次加工。分析师也可以将取数口径中的 SQL 做临时修改,生成一个复杂跨主题查询 SQL,来应对一些 Adhoc 需求场景。
DorisDB 实时数仓具体应用
在好未来,为保证课堂上课数据、订单数据的实时分析要求,使用 DorisDB 支撑了课堂、订单等分析业务。下面以课堂、订单场景为例,从数据同步、数据加工等几个步骤拆解 DorisDB 在好未来应用场景的落地方案。
实时数据同步
在好未来,采用 flink 采集业务库的 binlog 数据,然后写入到 kafka 中,DorisDB 只需要消费 kafka 对应的 topic 数据即可,整体流程如下图:
实时数仓数据处理
DorisDB 内部的实时数据加工处理主要有如下操作:
缩短计算链路的长度,实时部分最多计算 2 层。dwd 或 dws 层
增量计算,采用 DorisDB 的 UNIQUE KEY 模型,相当于(insert + update),因此只计算增量部分即可
采用时间分区,多副本策略。既为了数据安全,又能避免锁表
离线表结构与实时表结构,保持一样,这样就可以用离线修复 T + 1 数据
DAG 任务调度
为了使 DorisDB 能在 airflow 上执行,我们封装了 airflow 调用 DorisDB 执行 sql 的算子,以便 DorisDB 的加工逻辑在 airflow 上被定时调度。
DorisDB 任务执行状态的检查,由于不像 T + 1,只需要判断昨天任务是否执行就行了,实时检查需要满足以下条件:
检查轮询间隔,需要根据不同的调度间隔,适当调整
检查轮询总时长,不能超过(调度间隔时长-10 秒)
检查的范围,最小需要大于调度间隔,最大小于 2 倍的调度间隔
根据以上的实时调度检查条件,我们封装了基于 DorisDB 的实时调度的任务检查 airflow 算子,方便使用。
实时数据生产预警
为了监控 DorisDB 的实时数据生产情况,我们设置了三种预警:
1、检查 DorisDB 消费 Kafka 的任务,是否停掉了,如果停掉自动重启,重启 3 次依然失败,再发通知,人为干预
2、检查常规任务的执行,如果执行报错,就发通知。
3、检查数据源与 DorisDB 实时数仓 ods 层表,schema 的对比,如果出现 schema 变更,就发通知人为干预。这样我们就能在白天实时了解 schema 的变更情况,不必要等到调度报错才发现,而且不影响线上数据产出。
DorisDB 使用效果
提升业务收益
DorisDB 在众多场景给业务带来了直接收益,尤其是 DorisDB 的实时数据与算法模型相结合的场景。比如教育的获客、转化、用户续报等业务,之前模型需要特征数据都是前一天的,所以模型也相对滞后。而我们通过大量数据分析得出结论:是当日行为和跟进数据,是最有价值的特征数据,这样模型效果较好。特别是意向用户识别模型,成为线索当天的历史积累数据的特征和前一天的历史积累数据的特征,分别训练模型后,线上实际预测效果相差 2-3 个百分点,AUC 0.752 和 AUC 0.721 的差别,所以,当天的特征模型效果特别明显。
降低使用成本
用简单的 SQL 语义替代 Stream 语义完成实时数仓的开发,大大降低了开发的复杂度和时间成本,同时能够保证和离线指标的一致性。
结合使用宽表模型和星型模型,宽表和物化视图可以保证报表性能和并发能力,星型模型可以保证系统的查询灵活性,在一套系统中满足不同场景的分析需求。另外,明细表查询我们通过 DorisDB 外表的方式暴露查询,提升了查询的速度,大大降低了业务方的成本。DorisDB 的分布式 Join 能力非常强,原来一些需要查询多个 Index 在从内存中计算的逻辑可以直接下推到 DorisDB 中,降低了原有方案的复杂度,提升了查询服务的稳定性,加快了响应时间。
BI 报表迁移成本低,我们前期 BI 可视化是基于 Mysql 构建的,某些看板不断优化和丰富需求后,加上多维度灵活条件筛选,每次加载超级慢,业务无法接受,当同样数据同步到 DorisDB 上后,我们仅需要修改数据源链接信息,SQL 逻辑不用修改(这个超级爽,迁移成本超级低),查询性能直接提升 10 倍以上。
运维成本低,相对其他大数据组件来说,DorisDB 只需要部署一种即可满足各类数据分析需求,不需要其他软件辅助,而且部署运维简单。
未来展望
DorisDB 作为新一代 MPP 数据库的引领者,当前在多种场景下性能都非常优秀,帮助我们非常好的重构了实时数仓。目前 DorisDB 高效的支持了实时指标的计算,以及业务方在实时场景下的数据灵活探查和多维分析需求。DorisDB 在集团内部各个业务线的应用越来越多,我们也将推动实时和离线数据分析进行统一,为业务分析提供更好的支撑。后继我们将分享更多 DorisDB 的成功实践。最后,感谢鼎石科技的大力支持!
版权声明: 本文为 InfoQ 作者【DorisDB】的原创文章。
原文链接:【http://xie.infoq.cn/article/fb7ab3d647bda58ff8a35d6d1】。文章转载请联系作者。
评论