「零售数据通道」数据炼金术:千亿级流量资产湖仓架构转型
作者:京东零售 陈美航
0 前言
在流量领域的转化分析、搜索推广算法及 AI 等数据分析应用场景中,流量资产的质量直接影响到业务的监测和运营。作为流量资产的基石,流量数仓在应对快速变化和多样化的业务需求时,如何在提高效率、优化用户体验和控制成本方面做到最佳?本文将方案设计、链路优化、湖架构新特性研发、大促保障及监控设计等多角度全方位介绍湖仓一体技术在流量资产的探索和实践。
1 转型前,流量数仓有哪些痛点?
1.1 架构负担重
原流量数仓采用典型的 Lambda 架构,虽然能够给用户提供离/在线两种时效数据,但长期以来存在离线、实时数据对不齐,多种计算组件和架构范式导致的成本负担高等问题,用户很难专注于解决业务问题实现价值。
从成本上看,
•离线、实时链路相互独立运转,计算资源 double,增加成本约 100W/年;
•离线链路为优化时效,采用小时级加工方式代替原 T+1 批处理,但确引入导数据冗余存储及同一份数据多次 copy 问题,存储及计算成本增加近 40W/年;
从效率上看,资产模型的开发迭代需要在离线和实时研发人员间拉通对齐,整体效率不高;
从体验上看,因离线和实时链路计算 &存储异构,常常因为数据间有 gap,或者表/流对不齐,需要下游用户花费更多的时间和精力排查解决,使用体验差;
1.2 模型割裂
不同类的用户交互行为模型,如曝光、点击、浏览、订单等,又按照埋点上报端来划分为主站 APP、M、PC、WEIXIN 及各垂站 APP 等,再考虑实时和离线,同一类的交互模型将衍生出 10 余张表或流模型,共 40+模型,这将带来很多问题。
体验差,效率低:
•完整的用户行为分析需要接入以上全部模型,接入时和后期迭代运维工作量巨大;
•各模型间字段命名不统一,列对不齐,各端上报的埋点规则也不一致,取数开发成本非常高;
冗余计算,成本高:
•业务使用的字段需要从诸如 json_param,event_param 等扩展列取值,下游数千用户重复计算提取;
1.3 时效性不完备
仅有流式秒级和 T+1 天级时效数据,没有提供近实时数据能力,考虑到秒级数据的成本,一些用户只能选择使用 T+1 数据。流量数仓 GDM 层 T+1 时效具备时间日常一般为凌晨 2 点,下游接入后,在面对内部复杂业务场景时,计算处理时间捉襟见肘,数据 SLA 难以保障,大促或活动流量高峰时,影响更为明显。
2 为什么选择湖仓框架来解决以上问题?
湖仓架构(Delta Lake, Hudi 等)具有许多诸如支持更新、存算分离、支持非结构化和结构化多种数据类型等特性,但应用到流量资产场景时,我们最关注以下四个关键特性:
•端到端分钟级能力;
•支持事务,保障数据在多方并发读写时的一致性,写入任务异常时的回滚清理;
•MVCC 功能,提供轻量级时间旅行能力,在处理小文件问题时尤为重要;
•数据重新布局能力,有效降低存储和查询成本;
3 湖仓链路架构设计
模型精简统一:曝光、点击、浏览、订单等不同用户交互行为内的各端及实时、离线模型,数据统一,模型名称统一,字段名称统一,由之前 40+模型简化为 4 个逻辑模型,8 个实体模型;
标准且易用:将数据 ETL 过程封装成埋点解析 SDK,对埋点上报数据进行统一解析,形成含义清晰的标准字段(例如 shp -> shop_id , rtm -> receive_ts 等),埋点上报的复杂字段及解析逻辑对下游用户透明;
数据语义保证:通过 Flink 任务集成解析 SDK,将原始数据加工成最终模型,并事务生产到 JDQ,同时下游通过 Flink 任务写入 Hudi 表中,保证链路处理语义为 Exactly-Once;
优化效率及成本:Hudi 表按照业务域分区,方便下游裁剪数据按需取数;数据分钟级可见,存算成本需要比原表低;
容灾:设计冷备批修复链路,应对所有可能出现异常导致的数据不可用情况。
新链路架构图如下:
4 转型的挑战与优化
在流量资产湖仓一体化转型的过程中,也遇到了很多的挑战,平台侧基于开源 Hudi,在底层引擎做了大量的优化,数资团队也与数据湖平台团队展开紧密联动,“遇山开路,逢水搭桥”!下面着重介绍下在实际落地过程中,都遇到了哪些挑战,以及是如何解决的
4.1 数据湖团队 -- 从底层引擎的视角,面临的挑战与优化
4.1.1 数据湖多模态 IO 能力
挑战:京东某些 HDFS 集群在热点时段繁忙度较高。此时 Flink 流写会出现 timeout 重试甚至异常重启,导致写入性能均值吞吐受限,且扩 Flink 资源对性能提升效果不明显
解决方案:自研湖表多模态 IO 能力,将湖表物理存储划分为缓冲层和持久层,流量数据先写高性能缓冲层,极大提升写入性能与任务稳定性,且吞吐能够横向扩展。
开源 Hudi 版本,其元数据和物理存储是一对一的关系,数据湖团队创新性的设计了多模态 IO 的能力,结合 Hudi 自身元数据,构建统一的 IO 抽象层与 Hudi 逻辑表视图能力,实现湖表分层存储。对底层物理存储而言,以可插拔的方式集成不同的存储介质,例如 HDFS、OSS、Kafka、HBase 和 Redis 等,再结合 Hudi 元数据,构建统一的 Hudi 逻辑表视图,对外暴露逻辑表,实现对查询、写入端的透明;不同存储介质间,扩展 Hudi 表服务能力,即 Clustering/Compaction 实现在完成常规湖表文件整理工作的同时,同步完成不同物理介质间的数据“搬运”工作。
湖表数据缓冲层,即为湖表多模态 IO 的一个具体的落地场景,将湖表物理存储划分为以高性能 HDFS 为基础的数据缓冲层和以共享 HDFS 为基础的数据持久层,热数据先写入到缓冲层,后续通过 Compaction 和 Clustering 等表服务搬运至持久层,以满足海量数据入湖场景下对于性能和稳定性的强烈诉求。这里需要注意的是,数据写入缓冲层并 commit 后即对下游可见,无论后续“搬运工作”何时开始与结束都不影响数据可见性与时效,另外因为复用了湖表自身的 Compaction 和 Clustering 表服务能力,来实现缓冲层到持久层的数据搬运工作,该行为同样遵循湖表的 commit 机制与 MVCC 快照隔离设计,因此湖表开启数据缓冲层后,同样具有原子性、事务性保障以及 Exactly Once 语义。
下面是一个普通湖表读写流程与开启湖表缓冲层功能后的湖表读写流程之间的对比
开启湖表多模态 IO 能力,后 Hudi 写入吞吐提升 100%,CP 时长减少 95%,稳定性提升 97%
4.1.2 湖表动态分区策略
挑战:业务分区倾斜严重,最大分区与最小分区数据量相差 730 万倍,需解决数据倾斜导致的写入性能和小文件问题
解决方案:在 Flink 流写 Hudi 的核心拓扑链路上,新增一个自定义 Partitioner 策略,根据湖表数据特征来路由数据,从而缓解数据倾斜,控制文件数量
4.1.3 跨引擎无锁并发 Append
挑战:随着流量数据量的持续提升以及大促场景下的骤增,受 Flink 机房资源限制,单个 Flink 集群扩容出现瓶颈,此时需要将任务拆分多个,并发 Append 同一张流量湖表。
解决方案:解决元数据冲突、JM 与 TM 通信冲突、以及 Hive 元数据同步的冲突,实现轻量级的无锁并发 Append,解决扩展性问题。
4.1.4 数据湖元数据 Timeline 优化
挑战:对于历史数据回溯入湖等批量操作,会提交“多且大”的元数据信息,从而影响正常元数据即 Timeline 访问的性能。
解决方案:针对通过湖表 Replace-Commit 的方式,来进行历史数据回溯,并在元数据访时,增加智能跳过、懒加载等机制,从而提升元数据访问性能。
4.1.5 异步增量排序 Clustering
实时写入的数据文件通常较小且无序,数据湖团队扩展社区 Clustering 的能力,实现数据增量排序、合并,从而在合并小文件的同事,提升数据压缩比。在增量排序后,存储能够节省 60%!
4.2 数资团队 -- 从业务应用所做的优化及保障
流量资产是零售数据量最大主题,没有之一,日常千亿级别的量级,大促期间更是成倍上涨,由此带来整条数据链路上各个环节的架构组件都需要承受更大的计算压力和网络带宽冲击,以下通过链路性能优化、稳定性优化及监控等多方面介绍数资团队在转型过程中所做工作。
4.2.1 链路性能优化
4.2.1.1 反序列化优化
新模型仍旧以 JSON 为数据格式,随着比旧模型多了 120+字段,行级的反序列化成本很高,尤其数据量级增大之后,往往成为整个任务的性能瓶颈。
为了处理好反序列化(Deser)的开销,我们做了两项工作:
• Flink 流式处理过程中消息的 Deser 操作一般在 Source 算子中执行,该算子决定了整个任务的处理性能上限,受限于 Flink Source Subtask 和 JDQ 分区 1 对 N 的关系,无法通过扩大 Source 算子的并行度来提升性能,因此我们换了一种思路,将 Deser 操作从 Source 算子中抽出来,Source 算子只负责从 JDQ 拉取字节级的消息数据,通过注册一个专门 Deser 的 Lookup Table(为什么使用 Lookup Table? Lookup Table 可直接返回 RowData 类型,可直接用于入湖 Sink 算子,避免其他转换开销)来执行 Deser 操作,该 Lookup 算子并行度可按性能要求调整。
•针对曝光数据,受限于 Flink 单任务瓶颈,我们采用拆任务方式来提高入湖性能,但拆分需要根据消息所属业务域来进行,为避免拆分后的任务仍需要序列化整体数据,我们将消息业务域字段写入 JDQ 消息 Headers 中,拆分后的任务只需要解析 Headers 来过滤掉不需要的业务域数据,而不是通过 Deser 整个消息后来过滤。
4.2.1.2 写入性能优化
处理分区倾斜处理,按业务域分区后,曝光模型有些分区(S 和 Y 分区)的倾斜度高达 730 万倍
如果不做任何处理直接入湖,入湖后的文件数将会是
File num = Sink 并行度 * 表分区数
例如表分区数为 54,Sink 并行度 2000,那么一次 commit 产生的文件数至少为 108000 个,数据量最小的 Y 分区单个文件大小为 KB 级别,会形成非常多的小文件,无论是对 NS 还是后续 Clustering 操作都形成巨大压力,单个 subtask 要写 54 个分区的文件,在多个分区文件句柄间切换也非常耗性能。
优化:根据各个分区的大小来设计动态分区策略,数据量较大的分区,比如 S,H,P,单独路由到特定的 subtask 集合,集合的大小根据该分区对应的数据量大小确定,S 分区占据了近一半的数据,因此它的 subtask 集合占据了 Sink 算子并行度的一半,而其他非常小的分区则三两成队路由到单独的 subtask(尽量减少单个 subtask 写分区数量,减少写入句柄切换)。按此方式优化后,一次 commit 写入的文件约为 6000+,减少了 94% ,写入性能提升了 1 倍多。
流写 HDFS 优化,HDFS 的读写参数默认为批读批写服务,比如默认写 parquet block size 为 128M,但遇到流写场景,block size 过大会影响流式写入性能,因此将此参数 write.parquet.block.size 调整至 10~20M,形成微块写入,性能有显著提升,提升将近 1 倍,但也要考虑因此带来的下游读数性能下降,目前可通过 Clustering 来规避该问题。
4.2.2 链路稳定优化
4.2.2.1 JDQ 集群稳定优化
挑战: 因曝光数据量过大,导致下游接入业务消费速度无法跟上生产,进而大量冷读磁盘数据导致 JDQ 集群磁盘繁忙引起整体性能抖动,转而影响正常消费的入湖任务,导致入湖时效延迟。
优化方案: 合理限制曝光流生产速度,并将下游按重要等级划分,依据不同等级限速,保障 JDQ 集群以合理的负载扛过大促洪峰阶段,待消峰后再逐步放开限速。
4.2.2.2 Clustering 稳定优化
挑战: 异步增量 Clustering 需要做分区内全局数据排序,期间会产生大量 Shuffle 操作,日常会借助 Spark RSS 服务来稳定 Shuffle 操作,但大促洪峰时大量频繁的 Shuffle 即使使用 RSS 服务也会有 Fetch Failed 问题,甚至会对 RSS 服务造成冲击。
优化方案:平台侧研发了 Clustering 关闭排序功能的动态开关,可避免 Shuffle,数资侧通过 DUCC 来控制开关,从而在大促洪峰期间可视数据量级大小判断是否关闭排序功能,防止 Clustering Fetched Failed 问题,降低 RSS 服务的风险。
4.2.3 流转批处理
挑战:在处理实时流表时,我们需要一种机制来感知流表的业务时间水位线,并判断何时 T - 1 的数据已准备就绪,以便正确启动下游的离线加工任务。
解决方案:我们通过流量湖表的流转批机制来实现这一目标。具体做法先改造 Flink Hudi Writer,使其能够将业务时间记录在元数据中,再通过流转批服务定期扫描湖表元数据中记录的业务时间水位线。当所有分区的水位线最小值都超过 T 的零点时,并且对应的 Clustering 文件合并任务已完成时,这就表示 T - 1 的数据已准备就绪,此时下游依赖 的加工任务就可以运行。
4.2.4 湖链路容灾及监控
4.2.4.1 监控
除常规的 Flink 任务、JDQ 生产消费监控外,我们额外针对 hudi 表设置了数据水位线的监控,具体方式和流转批逻辑类似,包括了流写的和异步 clustering 的水位线监控,分别对应图中的缓冲层(多模态 IO 具体实现)和持久层。
此外,Flink 入湖任务 checkpoint 间隔为 15min,如果仅使用 JDQ 消费积压监控来判断任务运行情况,则可能会出现故障延迟发现情况。例如 T1 时刻入湖作业出现故障,我们设置的报警会在 T2 时刻之后才因积压超过阈值而触发,(T1,T2)这段能及时发现问题处理问题的时间被浪费掉。
实时计算平台在 Flink 1.16 镜像提供了业务延迟监控解决了以上问题,并支持以此 metric 配置对应报警,能够第一时间发现任务异常情况。
4.2.4.2 容灾
平台侧研发了《Append 模式下的跨引擎并发写》能力,支持 COW 表纯 Append 写入模式下 Flink 和 Spark 并发写入(不同分区),利用该能力业务侧搭建了湖表冷备容灾链路,并通过只修复异常期间数据来缩短整体修数时效,以曝光数据为例,修复全天数据有之前 56 小时缩短至 23 小时。
5 收益
5.1 SLA 和数据时效提升
T+1 SLA 提升,数据量级越大 SLA 提升越明显,给下游带来的 Buffer 越大,日常时效提升如下表所示,
新增近实时数据,为兼顾性能和成本考虑,目前设定延迟为 15min,待进一步优化后有望减小至 10min 内。
5.2 存算成本节降
以数据量曝光表和点击表为例:
两个数据量级最大的表预计节约存算成本 1200+万/年。
5.3 查询成本节降
验证场景 1:约 60 个字段 length 求总和(旧模型共约 60 个字段,新模型共 180+字段)。
场景 1 结果:
验证场景 2:抽样统计从旧模型迁移到新模型的 24 个任务,对比历史(5 月)与迁移后(9 月)成本表现。
场景 2 结果:
结论:新 Hudi 表模型查询成本及耗时优于旧 Hive 表模型,主要归功于 Hudi 的数据重新排序分布带来的文件压缩以及新模型在字段预先提取,避免下游重复计算。
5.4 业务反馈
•秒送业务反馈,接入湖仓架构表后 SLA 从 11 点提前到 8 点,彻底告别大促 T+2 看数历史。
•用增业务反馈,新模型融合了所有端上数据,字段统一,使用方便。
•搜推业务反馈,SLA 相比之前至少提前 1 小时,数据链路执行时效非常稳定。
...
6 总结与展望
流量资产在湖仓一体架构的转型落地,解决流量资产在原 Lambda 架构下离在线数据对不齐,计算及存储架构范式不统一等问题,流批数据同源同模型,并额外提供了近实时增量数据,提升用户接入体验和效率,拓宽业务使用场景,目前总入湖数据量达 120PB,数资团队和数据湖团队通过一系列优化确保该链路在首次大促整体运行平稳。
6.1 大促表现
大促期间,湖仓一体流量新模型全时段平稳运行、无延迟
以数据量最大的曝光模型为例,双十一大促期间曝光总量同比增长 79.7%;其中大促晚八高潮期,曝光峰值达 7.7 亿条/min,均值达 4.5 亿条/min。
大促(11 月 12 日)SLA,相比旧模型,新模型时效更为稳定和日常差距不大。
6.2 未来规划
除了本次在流量资产应用湖仓一体的能力外,数资团队与数据湖团队还会继续合作,在以下一些场景加强建设和应用。
生产环境流读流量数据实践
如果能实现流读->入湖完整串起整条链路,则为批流一体打下基础,但目前还未在生产环境验证过流读,主要原因为 1.部分场景不具备流读能力(数据量过大处理不过来,或者上游并发写入),2.流读的窗口期,依赖上游的表配置,如何确保下游能流读完整也是要着重考虑的。
纯外键关联 PartialUpdate 能力
当前的 PartialUpdate 能力是基于主键进行局部更新,但在数仓模型开发中存在大量需要通过外键关联更新的场景, 为方便业务使用一般会在事实宽表中冗余存储维表的主键和维度列,如果维表发生变更,一般需要重刷事实表记录,而基于外键的 PartialUpdate 利用湖表主键、外键索引的能力实现高效的流式局部更新,减轻额外计算层关联开销,加速数仓外键关联场景的模型加工效率。
Hudi 秒级别实时能力
数据在入湖时,先写入数据文件和元数据,提交事务成功后数据才可见,数据文件的写入比较慢,因而一般为分钟级提交。这里可以扩展湖表多模态 IO 的能力,将数据写入到 Kafka、HBase、Redis 等高速存储中,在湖表的元数据中记录高速存储的位点、key 等元信息。在读取时再持久存储与高速存储的内容联合到一起,从而使 Hudi 实现秒级别时延。
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/61b76fc3ea638bbde1ae7f8a7】。文章转载请联系作者。
评论