写点什么

一年省七位数,得物自建 HFDS 在 Flink Checkpoint 场景下的应用实践

作者:得物技术
  • 2023-06-29
    上海
  • 本文字数:6331 字

    阅读完需:约 21 分钟

一年省七位数,得物自建HFDS在 Flink Checkpoint 场景下的应用实践

1 背景

随着 Flink 实例的迁移下云以及新增需求接入,自建 Flink 平台规模逐渐壮大,当前总计已超 4 万核运行在自建的 K8S 集群中,然而 Flink 任务数的增加,特别是大状态任务,每次 Checkpoint 时会产生脉冲式带宽占用,峰值流量超过 100Gb/s,早期使用 OSS 作为 Checkpoint 数据存储,单个 Bucket 每 1P 数据量只有免费带宽 10Gb/s,超出部分单独计费,当前规模每月需要增加 1x w+/月。


为了控制这部分成本,得物开展了自建 HDFS 在 Flink Checkpoint 场景下的落地工作,实现年度成本节省 xxx 万元。


此次分享自建 HDFS 在实时计算 checkpoint 场景的实践经验,希望能为读者提供一些参考。

2 Flink Checkpoint 介绍

2.1 Flink 里的 Checkpoint 是什么?

Checkpoint:简单的说,在某一时刻,将 Flink 任务本地机器中存储在状态后端的状态去同步到远程文件存储系统(比如 HDFS)的过程就叫 Checkpoint。


状态后端:做状态数据持久化的工具就叫做状态后端。比如你在 Flink 中见到的 RocksDB、FileSystem 的概念就是指状态后端,再引申一下,也可以理解为:应用中有一份状态数据,把这份状态数据存储到 MySQL 中,这个 MySQL 就能叫做状态后端。

2.2 Checkpoint 解决什么问题?

其实在实时计算中的状态的功能主要体现在任务可以做到失败重启后没有数据质量、时效问题。


实时任务一般都是 7x24 小时 Long run 的,挂了之后,就会有以下两个问题,首先给一个实际场景:一个消费上游 Kafka,使用 Set 去重计算 DAU 的实时任务。


数据质量问题:当这个实时任务挂了之后恢复,Set 空了,这时候任务再继续从上次失败的 Offset 消费 Kafka 产出数据,则产出的数据就是错误数据了


数据时效问题:一个实时任务,产出的指标是有时效性(主要是时延)要求的。你可以从今天 0 点开始重新消费,但是你回溯数据也是需要时间的。举例:中午 12 点挂了,实时任务重新回溯 12 个小时的数据能在 1 分钟之内完成嘛?大多数场景下是不能的!一般都要回溯几个小时,这就是实时场景中的数据时效问题。


而 Flink 的 Checkpoint 就是把 Set 定期的存储到远程 HDFS 上,当任务挂了,我们的任务还可以从 HDFS 上面把这个数据给读回来,接着从最新的一个 Kafka Offset 继续计算就可以,这样即没有数据质量问题,也没有数据时效性问题。

2.3 Checkpoint 的运行流程?

  1. JM 定时调度 Checkpoint 的触发,接受到 JM 做 Checkpoint 的请求后,开始做本地 Checkpoint,暂停处理新流入的数据,将新数据缓存起来。

  2. 将任务的本地状态数据,复制到一个远程的持久化存储(HDFS)空间上。

  3. 继续处理新流入的数据,包括刚才缓存起来的数据。


3 自建 HDFS 引入

3.1 为什么用 HDFS?

Flink 做为一个成熟的流计算引擎,对外宣称可以实现 Exactly Once。为了实现业务上的 Exactly Once,Flink 肯定不能丢数据,也就是状态数据必须保障高可靠性,而 HDFS 作为是一个分布式文件系统,具备高容错率、高吞吐量等特性,是业界使用最广泛的开源分布式文件系统,针对大状态的 Checkpoint 任务非常契合,带宽易扩展且成本低廉。


HDFS 主要有如下几项特点:


  • 和本地文件系统一样的目录树视图

  • Append Only 的写入(不支持随机写)

  • 顺序和随机读

  • 超大数据规模

  • 易扩展,容错率高

3.2 得物自建 HDFS 架构

架构层面是典型的主从结构,架构见下图,核心思想是将文件按照固定大小进行分片存储,


  • 主节点:称为 NameNode,主要存放诸如目录树、文件分片信息、分片存放位置等元数据信息

  • 从节点:称为 DataNode,主要用来存分片数据


比如用户发出了一个 1GB 的文件写请求给 HDFS 客户端,HDFS 客户端会根据配置(默认是 128MB),对这个文件进行切分,HDFS 客户端会切分成 8 个 Block,然后询问 NameNode 应该将这些切分好的 Block 往哪几台 DataNode 上写,此后 client 端和 NameNode 分配的多个 DataNode 构成 pipeline 管道,开始以 packet 为单位向 Datanode 写数据。


4 自建 HDFS 落地实践

4.1 集群规划

早期使用 OSS 的主要瓶颈在于带宽,为了匹配将大状态的任务从 OSS 迁移到 Hdfs 带宽需求,支撑写入流量 100Gib+/s,对比 OSS 的带宽成本,结合到成本与带宽瓶颈考虑,内部大数据 d2s.5xlarge 机型做了一次性能压测,单节点吞吐能达到 12Gib/s,按 100Gib/s 预估,算上 Buffer,3 副本集群需要 xx 台机器,满足现在的带宽及写入吞吐需求,最终选择 d2s.5xlarge 类型 Ecs 机器,对应实例详情如下:



4.2 稳定性保障建设

4.2.1 Hdfs 组件指标采集

为了确保 HDFS 集群的稳定和可靠性,支撑线上实时 Flink 任务 Checkpoint,监控告警建设是必不可少的,我们通过统一的采集程序 Hadoop Exporter 将集群里各组件的 JMX 信息换为维度模型,将下述为扁平化的事实指标 Jmx 数据,转换为维度结构,比如针对 NameNode、DataNode,可以直接将指标使用预定义维度,例如:cluster、instance 等维度,并存储到 Prometheus 能够识别的指标数据,存储为一个二维字典结构,例如: _hadoop_namenode_metrics[指标分类(通常是 MBean 的名称)][指标名称]


4.2.2 指标采集架构

结合当前集群的规模,我们通过集中是 Pull 的方式采集架构,只需要启动时指定集群 Namenode 及 Jn 的 Jmx 的 url 信息,就能采集集群的所有组件的指标信息,这样当有集群扩展或变更时,会自动采集上报到 apm 里,方便运维,具体采集架构如下图:


4.2.3 监控与告警

监控:基于已采集汇报上的指标数据,目前配置了 Namenode、Datanode 组件核心指标监控大盘,包括 HDFS 节点健康状态、HDFS 服务健康状态、数据块健康状态、节点的写入吞吐量等指标。




告警:当前监控数据已完成接入公司天眼监控平台,我们将影响 hdfs 服务可用性的指标统一配置了告警模版,比如集群总的写入带宽、Callqueue 队列、DN 存活数量、集群节点基础 io 值班等,可以动态覆盖多集群,实现定制化告警,更加灵活及方便感知问题,减少故障止损时长,满足线上 HDFS 稳定性保障 SLA 目标。

4.2.4 集群快速变更能力

随着 Hdfs 集群规模的增加,在日常运维过程中,如何做到快速扩、缩容、节点重启及配置变更能力,保障集群具备快速止损的能力,我们封装了一整套 HDFS 的各组件变更能力,包括节点自动上报到 cmdb 对应应用、集群数据节点 maintenance 模式快速无影响重启、日常变配等,并集成到 ansible playbook,做到集群扩容在分钟级完成。


4.3 迁移到 HDFS 攻克难关

4.3.1 DN 心跳汇报于删除共用一把写锁问题

现象:自建 Flink 平台大部分大状态任务迁移后,自建 HDFS 集群节点整体的水位各个 ecs 的网络带宽峰值,出现偶发部分任务因 checkpiont 写入失败问题,报错信息如下:


问题定位过程:


  1. 根据客户端日志的堆栈信息,查看 Namenode 的日志找到对应的文件、块,发现了错误日志,文件块在写入成功后不能及时上报,块的状态一直处于 not COMPLETE。



这里介绍下 Hdfs 文件写入流程介绍:


  • 客户端向 datanode 写入块结束后,datanode 通过 IBR(增量块汇报)向 namenode 汇报新写入的块

  • namenode 收到汇报后更新文件的块副本数,当文件块副本数>=1 时,文件写入状态为 COMPLETE

  • 客户端写入结束后不断向 namenode 询问文件写入状态是否 COMPLETE,失败 5(默认)次后报错写入失败。


  1. 根据上述写入流程,怀疑问题出现在 IBR 阶段,查看 Namenode 监控指标,Namenode 处理块汇报平均时长<10ms,所以猜测问题出在 Datanode 端,观察发现,Datanode 偶发心跳汇报间隔>30s(正常 3s 一次),Datanode IBR 和心跳都是 BPServiceActor 线程处理,很可能是心跳阻塞了 IBR。



  1. 我们根据猜测的方向,继续定位什么原因导致心跳阻塞了 IBR 汇报,于是在每台节点上,部署了脚本(见下图),根据 Datanode 的 Jmx 指标监听本节点心跳间隔,大于 10s 时就打印 Datanode 的 Jstack。


Datanode 每个节点上的 metric 信息里包含心跳汇报间隔的数据。



  1. 分析多个 Jstack 代码(具体内容见下),可以发现 BPServiceActor 线程被 CommandProcessingThread 线程阻塞,而 CommandProcessingThread 线程在调用 invalidate()方法,而 invalidate()是在调用删除操作。


"BP-1732625734-****-1675758643065 heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]   java.lang.Thread.State: WAITING (parking)        at sun.misc.Unsafe.park(Native Method)        - parking to wait for  <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)        at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers: - None "Command processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000] java.lang.Thread.State: RUNNABLE at java.io.UnixFileSystem.getBooleanAttributes0(Native Method) at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242) at java.io.File.isDirectory(File.java:858) at java.io.File.toURI(File.java:741) at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133) at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099) at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738) at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684) at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359) at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405) at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$$Lambda$75/2086554487.run(Unknown Source) at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332) at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)
Locked ownable synchronizers: - <0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync) - <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
复制代码


结合堆栈信息定位到代码,确实发现 processCommandFromActor 方法在执行删除(调用 invalidate()方法)操作时与心跳汇报 updateActorStatesFromHeartbeat 方法共用同一把写锁。


class BPOfferService {private final Lock mWriteLock = mReadWriteLock.writeLock();void writeLock() {  mWriteLock.lock();}
void writeUnlock() { mWriteLock.unlock();}
void updateActorStatesFromHeartbeat( BPServiceActor actor, NNHAStatusHeartbeat nnHaState) { writeLock(); try {//... 心跳汇报 } finally { writeUnlock(); }}boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException { assert bpServices.contains(actor);// ...省略 writeLock(); try {//...执行删除逻辑 } finally { writeUnlock(); }}}
复制代码


  1. 确认问题:查看 Namenode 审计日志发现,集群持续有大量文件删除(Flink 删除过期 Checkpoint meta 文件)操作,修改 Datanode 端代码,在调用 processCommandFromActive 方法超过一定 10s 后打印调用时长与 CommandAction 日志。查看 datanode 日志发现确实存在删除操作大于 30s 的情况,由此确认问题就是出现在删除操作耗时过长影响了 Datanode 的增量块汇报。



由此确定问题:


删除块操作耗时过长,阻塞 datanode 心跳,导致 IBR 被阻塞,块写入成功后不能及时上报,客户端重试一定次数后失败抛异常,重试次数由 dfs.client.block.write.locateFollowingBlock.retries 决定,默认 5 次,第一次等待 0.4s,之后每次等待时长翻倍,5 次约为 15s 左右。


问题解决方案


找到问题就是出现在 BPServiceActor 线程做了太多的事,包含 FBR、IBR、心跳汇报,而且心跳汇报和删除共同持有一把写锁,那解决方案一个就把这两把锁进行拆分,一个就是将 IBR 逻辑单独独立出来,不受心跳汇报影响。


而社区 3.4.0 版本已经将 IBR 从 BPServiceActor 线程独立出来了,所有我们最终将 HDFS-16016 patch 合并到自建 Hdfs3.3.3 版本中,IBR 不会被 invalidate()阻塞,问题得到根治!


5 总结与规划

总结:Oss 的流量已从早期 137Gib/s 降低到 30Gib/s 左右(下图一),自建 Hdfs 集群峰值流量达到 120Gb/s(下图二),且平稳运行




整个项目已完成全部大状态任务从 Oss 迁移到自建 Hdfs,当前 Hdfs 集群规模 xx 台,成本 x w/月,原 OSS 带宽费用报价 1x w/月,相比节省 xx w/月。


未来规划:对于全量 checkpoint 来说,TM 将每个 Checkpoint 内部的数据都写到同一个文件,而对于 RocksDBStateBackend 的增量 Checkpoint 来说,则会将每个 sst 文件写到一个分布式系统的文件内,当作业量很大,且作业的并发很大时,则会对底层 HDFS 形成非常大的压力,


1)大量的 RPC 请求会影响 RPC 的响应时间。


2)大量文件对 NameNode 内存造成很大压力。


针对上面的问题我们未来考虑引入小文件合并方案降低 HDFS 的压力,包括 RPC 压力以及 NameNode 内存的压力。


*文/希贤


线下活动推荐: 得物技术沙龙「企业协作效率演进之路」(总第 19 期)

时间:2023 年 7 月 16 日 14:00 ~ 2023 年 7 月 16 日 18:00

地点:(上海杨浦)黄兴路 221 号互联宝地 C 栋 5 楼(宁国路地铁站 1 号口出)

活动亮点:在当今竞争日益激烈的商业环境中,企业协作效率成为企业团队成功的关键。越来越多的企业意识到,通过信息化建设和工具化的支持,可以大幅提升协作效率,并在行业中取得突破。本次沙龙将涵盖多个主题,这些主题将为与会者提供丰富的思考和经验,助力企业协作效率的提升。通过得物技术沙龙这个交流平台,您将有机会与其他企业的代表一起学习、借鉴彼此的经验和做法。共同探讨企业内部协作效率的最佳实践,驱动企业长期生存和发展。加入得物技术沙龙,与行业先驱者们一起开启协作效率的新篇章!让我们共同为协作效率的突破而努力!

点击报名: 得物技术沙龙「企业协作效率演进之路」(总第19期)


本文属得物技术原创,来源于:得物技术官网

未经得物技术许可严禁转载,否则依法追究法律责任!


发布于: 刚刚阅读数: 3
用户头像

得物技术

关注

得物APP技术部 2019-11-13 加入

关注微信公众号「得物技术」

评论

发布
暂无评论
一年省七位数,得物自建HFDS在 Flink Checkpoint 场景下的应用实践_大数据_得物技术_InfoQ写作社区