Flink 流批一体在小米的实践
摘要:本文整理自小米软件开发工程师金风在 Flink Forward Asia 2021 流批一体专场的演讲。本篇内容主要分为三个部分:
小米的大数据发展演变
流批一体的平台建设
流批一体应用场景
未来规划
一、小米的大数据发展演变
2019 年之前,小米的实时计算主要以 SparkStreaming 为主,少部分 Storm,离线计算以 Spark 为主。
2019 年,开始接入 Flink,并广泛应用于信息流搜索推荐、广告实时样本、实时 ETL 等场景,逐步替换了原来的 SparkStreaming 作业,得益于 Flink 框架的各种优秀特性,我们在作业的正确性,实时性,资源使用效率方面都有较大提升。
2020 年,开始接入使用 FlinkSQL,并广泛用于实时数仓的建设和实时 ETL 作业的开发。FlinkSQL 的实时数仓将数据链路由 T+1 降低到了秒级。
2021 年,开始接入数据湖 Iceberg,基于 Flink 和 Iceberg 来构建流批一体的实时数仓解决方案,并在小米内部的部分业务进行了落地,证明流批一体在赋能业务、提升作业开发效率、简化链路节省资源的方面是可行的。
上图是小米当前的实时和离线框架,目前是多种框架并存的状态。业务开发人员无论是写 SQL 作业还是写 Jar 包作业,都至少要维护两套代码。公司内部的计算引擎团队也需要花两拨人力分别去维护不同的计算框架,同时平台层也需要对不同的计算引擎去做不同的适配。
基于流批一体的改造,无论是实时还是离线都只需要维护一套计算框架,为业务开发人员、平台提供方和计算引擎的支持方节省了一半的人力资源。
二、流批一体的平台建设
为了探索流批一体,我们也做了很多相关的探索和实践。
对于流批一体的平台化建设,主要分为 4 个方面,分别是元数据管理、权限管理、作业调度以及 Flink 的生态建设。
2.1 元数据管理
我们基于 Metacat 做了统一的元数据管理,由 Metacat 统一对接下游不同的存储系统和上游的计算引擎。
基于 Metacat,内部的所有系统都被统一划分成三级结构,与 FlinkSQL 的三级结构相对应。
第一级 Catalog,主要由服务名和集群名拼接而成。
第二级 Database,它与大部分系统的 Database 保持一致。没有 Database 的系统默认使用 default 来代替。
第三级 Table,也与系统的 Table 保持一致,比如消息队列的 topic 名, Elasticsearch 的索引名。
在构建好统一的元数据管理之后,只需要写一条 DML 语句即可完成一个实时将消息队列数据入湖作业的开发。
2.2 权限管理
在实现了统一的元数据管理后,在开发 Flink SQL 作业时, 所有系统都被抽象为一个三级的结构表, 业务可以使用三级表名引用任意一个系统的表。同时我们基于 Ranger 也做了统一的权限管理,在 SQL 层统一管理所有的资源权限。
我们在计算引擎层做了统一的权限管理,同时覆盖了 Flink SQL 和 Flink Jar。Flink SQL 作业可以在生成物理执行计划时获取到 SQL 引用的的 Source 和 Sink 表,以及 select 的 Source 表的字段名。基于以上信息,即可实现字段级别的鉴权。同时我们为 Flink Jar 用户提供了统一的工具类,同时也对接了 Flink Catalog,因此可以做到 Jar 包作业的权限校验。
如上图所示,统一管理了元数据和权限之后,业务开发人员在开发 Flink SQL 作业时就可以非常方便地选择不同系统的表,包括 Doris、Kudu、Hive 等,作业由后端统一进行提交并鉴权。在作业提交时,我们也能非常方便得获取到作业的血缘。
2.3 作业调度
在作业调度方面小米也做了一些尝试。如上图左边这段 SQL,在离线调度模式下它是一个批作业,但在实时调度下它就是一个流作业。在批流混合的调度下,会先启动批作业,执行完成之后再启动流作业。
批流混合对于调度器来说是实时的作业。我们主要的改动是在 Flink SQL 的模板作业中先启动一个 SQL 的批作业,执行完成之后再启动 Flink SQL 的实时作业。
2.4 Flink 的生态建设
Flink 插件化的 connector 设计可以非常方便地拓展不同的 connector。无论是 Flink 官方还是其他社区,都提供了非常多的 connector 支持。小米内部也实现了很多种类 connector,只有完善了 Flink 的生态建设,它跨平台设计的计算能力才能真正体现出来。
对于 Iceberg connector,社区已经实现了批量读写和流式入湖的相关功能。另外流式消费也是一项比较重要的功能,如果不支持流式消费,在数仓的链路中就只能改造 ODS 层,下游链路只能以批的方式来处理,无法做到全链路真正的实时处理。因此支持 Iceberg 的增量消费是实时链路中必不可少的一环。
对于流批一体生态比较重要的还有 Hybrid Source 和 CDC Sink。
Hybrid Source 在社区已经有相关的实现,它能够将两种不同的 Source 进行组合,大多数组合是有限流 + 无限流,这样就能做到批流混合。
小米在平台层已经统一管理了所有系统的表,因此在实现 Hybrid Source 时就无需填写对应表的结构信息和比较繁琐的参数信息,只需要按照顺序将需要读的表名配置在参数中即可,Hybrid Source 会按照配置的顺序,依次读取需要的表。此外,还可以进行不同系统的组合,比较常用的是 MySQL 和消息队列的组合,先全量消费 MySQL 中的数据,再增量消费消息队列的数据。
CDC Sink 主要配合 Hybrid Source 来使用。CDC Sink 同样也对接了内部的 Catalog,它统一管理了 Schema 的变更操作。数据到达下游的 connector 时不需要再去处理繁琐的 Schema 变更逻辑,只需要将真实的数据以真实的 Schema 写入对应的系统即可。
无论是 Hybrid Source 还是 CDC Sink,在 Flink 框架层的字段类型都有一个 barrier 字段,它可以封装任意结构的数据,也可以做 Schema 变更。但是一些字段类型不匹配的情况,只有在运行时才会暴露出来。
三、流批一体应用场景
大多公司都有数据导入和导出的需求,基于 Flink 丰富的生态,我们可以非常方便地实现不同场景的数据集成,主要包括离线集成、实时集成以及批流混合数据集成。
首先是离线的数据集成。我们使用 Flink SQL Batch 作业替换了之前的 Data X,借助 Flink 的生态,可以非常方便地实现不同系统数据导入导出的需求,也获得了更丰富的 Source Sink 生态。同时还基于 Flink SQL 可以非常方便地实现字段的映射,同时 Flink SQL 作为分布式框架,可以很方便提供并发导数的需求。
其次是实时数据集成,主要分为两个部分:
第一部分是实时数据的收集,小米内部主要分为两大类, 分别是日志数据和 DB 的 Binlog 数据。 这里主要介绍 DB 系统的 Binlog 数据收集。最初我们使用小米自研的 LCS Binlog 服务来进行统一的 Binlog 收集,类似于 Canal 服务,通过该服务将 Binlog 的数据统一收集到消息队列中。
第二部分则是数据的转储, 将使用 Spark Streaming 任务将消息队列中的数据导入其他系统,比如 Kudu 或 HDFS。
现在我们使用 Flink 对 Binlog 的收集和转储链路都进行了改造。使用 Flink CDC 收集 Binlog 数据,并写入消息队列中。同时通过 Flink 将消息队列的数据转储到其他系统,比如 Kudu、Doris、Iceberg 等等。
在实际的使用中往往需要用流批混合的方式,以适用不同的场景, 比如分库分表场景, 部分链路重做场景,新增库表等场景。当前,使用 Flink CDC 任务来收集库级别的 Binlog 数据,如果按照表级别来进行收集,会对 MySQL 服务造成较大的压力。将数据收集到消息队列后,再针对不同的收集场景,起不同的作业来进行转储。
Flink CDC 收集的数据是库级别,当某张表的数据需要重做时,无法将库级别的数据重做,因为这样会影响到其他的表。所以,对于单表的全量数据,我们会直接从 MySQL 读取,再从消息队列中读取增量的数据,因此这里需要用 Hybrid Source 分别读取 MySQL 和消息队列中的数据。
另一种批流混合的数据集成场景则是批作业和流作业混合使用。
在支持 TiDB 的数据收集和转储时,我们无法使用 Hybrid Source,因为 TiDB 的全量数据往往非常大,我们需要起大量并发能够加速全量数据的转储,而增量数据则只需要较小并发即可。在全量数据部分我们使用 Flink SQL Batch 作业来完成, 可以灵活调整并发,且相对于实时作业处理效率更高,增量部分则以较小的并发能转储即可。
另外一个比较重要的业务场景是实时数仓的构建。在小米内部也经历了从传统离线数仓到 Lambda 架构再到当前基于数据湖的实时实仓。这三种场景有不同的优缺点,会应用于不同的业务场景。有两个比较典型的案例,分别是小米手机激活统计以及小米销售服务实时数仓。
上图是小米手机激活业务流程,首先是激活数据的收集,通过不同的渠道来收集日志,并进行统一的汇总和清洗。通过采集数据配合维表 join 能够检测到提前激活的 case,同时也能够基于一些维度数据进行数据清洗,判断出哪些属于自然激活,哪些属于正常的活跃日志。
小米手机激活数仓的整体架构涉及到实时链路和离线链路,这里主要介绍实时链路。我们采用的维度表主要是 HBase 和 FileSystem,HBase 用于保存历史全量的唯一 ID,Hive 主要保存少量的维度数据,最终的结果会实时落到 Kudu 中,业务就可以通过 OLAP 引擎查到实时的激活数据。同时离线链路也是必不可少的,实时和离线产生的数据整体的重合率达到 99.94%。
上述链路中最关键的点是需要使用 HBase 来保存历史的全量 ID 来去重。这其中 join 历史全量数据 HBase 表是最关键的地方,最开始我们使用同步的 lookup join 方式,但是遇到了较大的性能瓶颈,后改用异步 join 方式,最终整体的处理速度有了数十倍的提升。
小米的销售服务涉及到多个模块,包括订单、物流、商品、售后门店,在构建过程中我们也遇到了非常多的问题,最终证明基于 Flink SQL 改造离线链路构建实时数仓的方案是可行的。
上图是销售服务数仓的整体架构,销售服务用到的维度表主要来自于消息队列和 FileSystem。在销售服务的场景下,无论是订单表还是商品类目表都会实时更新。进行关联时,无论哪一条流有更新,结果都需要更新。因此销售的服务数仓大多采用双流 join,而双流 join 伴随而来的就是状态问题。
在 Flink SQL 中我们无法准确控制某个算子的状态过期的策略,因此只能设置一个统一的状态过期时间,如果一段时间内某些状态没有被访问则会被清理,但这个场景是有局限性的,针对于物流售后这些场景,单条记录在整个实时流中的周期可能会超过一个月,但一般情况下我们无法将 Flink 作业的状态超时时间设置为一个月,产生的状态量太多会导致处理效率变低,也不利于作业链路的回溯。一旦作业出现问题,上游的数据都需要重做,消息队列中大多数据保存不会超过 7 天。
在销售服务的某些场景中中,我们引入了离线作业,根据结果表的数据,获取状态还未结束的数据,将对应的维度数据写回到实时流中, 确保这些维度数据不会过期, 当主表的数据到达 join 算子,就可以得到正确的数据, 即使主表的某条记录超过一个月才出现变更。
上图是小米 APP 近实时数仓的架构。通过采集日志模块将日志统一收集到消息队列中。由于是日志数据,只需要使用到 Iceberg v1 表,中间的 DWD 和 DM 层都使用 Flink SQL 进行实时消费和处理,然后写入到 Iceberg v2 表中。
对于有些需要保存历史的全量数据并保证数据准确性,但对时效性要求不高的数据表,我们采用离线的处理思路,基于 DWD 层进行离线处理。每天使用 t-1 的数据去修正 t-2 的数据,通过不断修正历史数据,可以很大程度地保证这些表的准确率。
对于一些旧的链路或者上游数据由其他业务方提供,且短期内无法修改且无法产生 CDC 数据的系统,我们采用了 Spark Merge Into 作业定时调度来产生增量的数据,并实时写入到 Iceberg v2 表中。在生产实践中,通过 Merge Into 产生的数据延迟大概是 5~8 分钟,后面链路的延迟都可以控制在分钟级以内,因此全链路的延迟基本可以控制在 10 分钟之内,相比之前 t+1 的延迟有了巨大的提升。
对于能够从源头产生 CDC 数据的系统,我们会将实时数据写入到消息队列中,然后实时入湖。整体架构如上图所示,实时部分主要用使用 Flink 和消息队列的组合来达到秒级的延迟,再将消息队列的数据 Sink 到 Iceberg 中,用于 Query 查询。同时下游有一条离线链路不断通过 Merge Into 进行修正,保证 Iceberg 中结果的正确性。
整个链路基于消息队列保证了全链路的时效性,同时基于数据湖保证了 Query 的查询时效,另外通过离线的 Merge Into 不断地修正,保证了最终结果的准确性。
虽然数仓的架构整体都基本类似,但是针对不同业务场景及不同的需求,实际的链路复杂度还是有较大的区别。因此在使用 Flink 构建数仓时,一定要结合实际的需求选择合理的方案。
四、未来规划
当前 Flink + Iceberg 的数据湖解决方案在小米已经初步落地,未来我们可以提升的空间依然非常大,我们会不断跟进社区,继续推进小米内部流批一体化的建设。
后续我们会将 Flink SQL Batch 用于更加复杂的场景。当前 Flink SQL Batch 发挥的场景有限,主要运用于批量导出的场景,相信未来它会发挥更大的价值。
其次,我们会跟进社区的 built in dynamic table <sup>[1]</sup>,结合消息队列和数据湖,兼顾时效性和准确性,提升用户的体验。
同时我们也会升级 Hybrid Source connector,当前社区的 Hybrid Source 是适配最新版的 Source 接口,新版的 Source 接口在对接其他系统时灵活度更高。
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群第一时间获取最新技术文章和社区动态,请关注公众号~
版权声明: 本文为 InfoQ 作者【Apache Flink】的原创文章。
原文链接:【http://xie.infoq.cn/article/834a7c605333f12b080c53dda】。文章转载请联系作者。
评论