关于大数据计算框架 Flink 内存管理的原理与实现总结 | 社区征文
背景介绍
最近几年国内大数据 apache 开源社区计算框架最火的莫过于 Flink,得益于阿里在后面的推动以及各大互联网大厂的参与,flink 业已成为流式计算事实上的标准。一句话来介绍 Flink 就是 “Stateful Computations Over Streams”,基于数据流的有状态计算。flink 的四个基石:Checkpoint、State、Time、Window。
Checkpoint 机制,Flink 基于 Chandy-Lamport 算法实现了分布式一致性的快照,从而提供了 exactly-once 的语义。(Flink 基于两阶段提交协议,实现了端到端的 exactly-once 语义保证。内置支持了 Kafka 的端到端保证,并提供了 TwoPhaseCommitSinkFunction 供用于实现自定义外部存储的端到端 exactly-once 保证。)
state 有状态计算:支持大状态、灵活的状态后端
Flink 还实现了 watermark 的机制,解决了基于事件时间处理时的数据乱序和数据迟到的问题。
Window:提供了一套开箱即用的窗口操作,如滚动窗口、滑动窗口、会话窗口,支持非常灵活的自定义窗口满足特殊业务需求。
带反压的流模型
Flink 是采用 java 开发的,flink 计算集群运行在 java 虚拟机中,因为 flink 计算会面临大量数据处理、大量状态存储,完全基于 jvm 的堆内存管理存在较大的缺陷,flink 基于 jvm 实现了独立的内存管理:可超出主内存的大小限制、承受更少的垃圾回收开销、对象序列化二进制存储,下面在来详细介绍下 flink 内存管理。
完全 JVM 内存管理存在的问题
基于 JVM 的数据分析引擎都需要面对将大量数据存到内存当中,就不得不面对 JVM 存在的几个问题:
java 对象存储密度低:比如一个只包含 boolean 属性的对象占用 16 个字节,对象头占用 8 个,boolean 属性占 1 个,对齐填充占了 7 个,实际上只需要一个 bit(1/8 字节)就够了他。
Full GC 会极大的影响性能,尤其是为了处理更大数据而开了很大内存空间的 jvm 来说,GC 会达到秒级甚至分钟级。
OOM 问题影响稳定性:jvm 奔溃,分布式对象框架的健壮性和稳定性都会收到影响。因此大数据框架都开始自己管理 JVM 内存了,像 Spark、Flink、Hbase,为了获取 C 一样的性能以及避免 OOM 的发生。
Flink 内存管理
因为 Java 对象及 jvm 内存管理存在的问题,flink 针对这些问题基于 jvm 进行了优化, Flink 内存管理主要会涉及内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存、JIT 编译优化。Flink 并不是将大量对象存在堆上,而是将对象序列化到一个预分配的内存块上,这个内存块叫 MemorySegment,它代表了一段固定长度的内存(默认 32KB)也就是 flink 中最小的内存分配单元,并且提供了非常高效的读写方法。底层可以是一个普通的 java 字节数组(byte[]),也可以是一个申请在堆外的 ByteBuffer。每条记录都会以序列化的形式存在一个或多个 MemorySegment 中。
TaskManager 内存模型如下图所示:
Flink 主要的内存管理是 TaskManager 进行内存管理,主要分为三部分:
Network Buffers:一定数量的 32KB 大小的 Buffer,主要用于网络传输。在 TaskManager 启动的时候就会分配。默认数量是 2048 个,可以通过 taskmanager.network.numberOfBuffers 来配置
Memory Manager Pool:这是一个由 MemoryManager 管理的,由众多 MemorySegment 组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占用了堆内存的 70%的大小。
Remaning(free)Heap:这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从 GC 的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。
Flink 采用类似 DBMS 的 sort 和 join 算法,直接操作二进制数据,从而使序列化/反序列化带来的开销达到最小。所以 Flink 的内部实现更像 C/C++ 而非 Java。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。如果要操作多块 MemorySegment 就像操作一块大的连续内存一样,Flink 会使用逻辑视图(AbstractPagedInputView)来方便操作。下
Flink 内存管理带来的好处
减少 GC 压力,因为所有常驻内存的数据以二进制的形式存在于 Flink 的 MemoryManager 中,这些 MemorySegment 一直待在老年代不会被 GC 回收。其它的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象可以被 MinorGC 快速回收。只要用户不去创建大量类似缓存的常驻对象,老年代的大小是不会变的,Major GC 也就永远也不能发生。从而有效地降低了垃圾回收的压力。另外,这里的内存还可以是堆外内存,这可以使得 jvm 内存更小了,从而加速垃圾回收。
避免了 OOM,所有运行的数据结构和算法只能通过内存池申请内存,保证了其使用内存的大小是固定不变的,不会因为运行时数据结构和算法而发生 OOM,在内存吃紧的情况下,算法(sort/join 等)会高效地将一大批内存块写入到磁盘,之后再读回来,因此,OutOfMemoryErrors 可以有效的避免。
节省内存空间。java 对象再存储上有很多额外的消耗。如果只存储实际的二进制内容,就可以避免这部分消耗。
高效的二进制操作 & 缓存友好的计算。二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及 hash 值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好,可以从 L1/L2/L3 缓存获得性能的提升
Flink 量身定制的序列化框架
Flink 没有采用 java 生态圈众多的序列化框架,而是自己实现了序列化框架。因为在 flink 中处理的数据流通常是同一类型,由于数据集对象的类型固定,对于数据集可以只保存一份对象 schema 信息,节省大量的存储空间。同时对于固定大小的类型,也可以通过固定的偏移位置存取。访问某个对象成员变量,可以可以直接通过偏移量,只是序列化特定的对象成员变量了。如果对象的成员变量较多时,能够大大减少 java 对象的创建开销以及内存数据拷贝的大小。
Flink 如何直接操作二进制数据
以 sort 为例:
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。其次,把 sort buffer 分成两块区域,一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的 key(Key+pointer)。如果需要序列化的 key 是个变长类型,如 String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有 key)会被加到第二个区域。这样做的目地:第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其它 key 和 pointer。第二,这样做是缓存友好的,因为 key 都是连续存储在内存中,可以大大减少 cache miss
接着,排序操作,flink 中可以先用 key 比大小,这样可以直接用二进制的 key 比较而不需要反序列化出整个对象。因为 Key 是定长的,所以如果 key 相同,那就必须将真实的二进制数据反序列化出来,然后再做比较。之后只需要交换 key+pointer 就可以达到排序的效果,真实的数据不用移动。
最后,访问排序后的数据,可以沿着排好序的 key+pointer 区域顺序访问,通过 pointer 找到对应的真实数据,并写到内存或着外部。
缓存友好的数据结构和算法
Flink 通过定制的序列化框架将算法中需要操作的数据(如 sort 中的 key)连续存储,而完整数据存储在其他地方。因为对于完整的数据来说,key+pointer 更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。
堆外内存
启动超大内存(上百 GB)的 JVM 需要很长时间,GC 停留时间也会很长(分钟级)。使用堆外内存的话,可以极大地减小堆内存(只需要分配 Remaining Heap 那一块),使得 TaskManager 扩展到上百 GB 内存不是问题。
高效的 IO 操作。堆外内存在写磁盘或网络传输时是 zero-copy,而堆内存的话,至少需要 copy 一次。
堆外内存是进程间共享的。也就是说,即使 JVM 进程崩溃也不会丢失数据。这可以用来做故障恢复(Flink 暂时没有利用起这个,不过未来很可能会去做)。
不好的地方:
堆内存的使用、监控、调试都要简单很多。堆外内存意味着更复杂更麻烦。
Flink 有时需要分配短生命周期的 MemorySegment,这个申请在堆上会更廉价。
有些操作在堆内存上会快一点。
总结
Flink 面对 jvm 存在的问题,从自己管理内存、到自己实现序列化框架、再到使用堆外内存,基本上是按照大数据生态通用的解决方式去处理,其解决思路值得我们在进行分布式计算框架设计和实现的时候作参考。
在从 apache 生态圈的设计上基本上分布式计算框架,大都开始了部分脱离 JVM,走上了自己管理内存的路线,比如 spark Tungsten 甚至更进一步,提出了通过 LLVM,将部分逻辑编译成本地代码,从而更加深入的挖掘 SIMD 等 CPU 潜力,除此之外 HBase、HDFS 等存储相关项目也在部分性能相关的模块通过自己管理内存来规避 JVM 的一些缺陷,同时提升性能。
参考文档:
https://zhuanlan.zhihu.com/p/20228397
https://flink.apache.org/news/2015/09/16/off-heap-memory.html
http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/
评论