写点什么

分析 Flink 任务如何超过 YARN 容器内存限制

  • 2022 年 8 月 11 日
    江苏
  • 本文字数:6751 字

    阅读完需:约 22 分钟

本文作者为中国移动云能力中心大数据团队软件开发工程师谢磊,文章针对 Flink 任务在 YARN 集群运行间隔一段时间就会重启的问题,进行逐步分析和模拟复现,并给出内存泄露修复方案,供大家参考。

问题背景

业务的 Flink 任务在 YARN 集群运行间隔一段时间就会自动重启,对于这类问题一般来讲,已经轻车熟路,有部分可以尝试的思路:

1.排查内存是否溢出(堆内/堆外)2.程序中偶发的 bug 导致 3.YARN 集群节点上下线导致 4.FLINK 程序中使用了大窗口,例如小时级别的窗口,数据超过内存 5....

同时在我们的 YARN 集群开启了物理内存检测选项,当进程使用物理内存超过申请内存时,YARN 集群会主动 kill 掉任务的进程,来保证集群的稳定性。

<property>

<name>yarn.nodemanager.pmem-check-enabled</name><value>true</value>

</property>

异常信息

我们先从异常信息看起,通过排查 JobManager 的运行日志,我们会发现如下异常堆栈信息。

2020-04-15 01:59:33,000 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_e05_1585737758019_0901_01_000003 because: Container [pid=3156625,containerID=container_e05_1585737758019_0901_01_000003] is running beyond physical memory limits. Current usage: 6.1 GB of 6 GB physical memory used; 14.5 GB of 28 GB virtual memory used. Killing container.Dump of the process-tree for container_e05_1585737758019_0901_01_000003 :|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE|- 3156625 3156621 3156625 3156625 (bash) 0 0 15441920 698 /bin/bash -c /usr/java/default/bin/java -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m -javaagent:lib/aspectjweaver-1.9.1.jar -Dlog.file=/data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.out 2> /data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.err|- 3156696 3156625 3156625 3156625 (java) 12263 1319 15553892352 2119601 /usr/java/default/bin/java -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m -javaagent:lib/aspectjweaver-1.9.1.jar -Dlog.file=/data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

从异常信息中,关键信息 is running beyond physical memory limits. Current usage: 6.1 GB of 6 GB physical memory used; 14.5 GB of 28 GB virtual memory used. Killing container,显示物理内存超过 6GB 内存,被 YARN 的检测机制给 KILL 掉了。

既然是内存超了,先看看 Flink 任务执行的大概执行情况,简单从 UI 分析,如此简单的程序!

几点疑问待解决

1.如此简单的程序,为啥会导致进程超过 6GB(RSS),不可思议

2.YARN 的内存检测机制是是什么?如何获取进程内存信息?

3.JVM 参数 -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m 堆内存 + 堆外内存 = 6GB,为什么在日志中并没有看到 OutofMemory 相关的异常信息?

4.RSS 怎么会超过 6GB 呢?

分析过程

YARN 内存检测机制

通过 Hadoop 源码分析可以知道,YARN 是解析 /proc/<pid>/stat 文件,获取 RSS 的值和 container 初始化时申请的内存作对比,如果 RSS 的值超过申请值,则 KILL 进程,并打印出信息。

JVM 相关常规检测

GC

检测仅通过 GC 日志状态,很正常的状态,没问题。

[dcadmin@spslave28 ~]$ jstat -gcutil 12984 1000S0 S1 E O M CCS YGC YGCT FGC FGCT GCT

99.96 0.00 79.06 4.78 94.92 89.38 2 0.164 0 0.000 0.16499.96 0.00 86.77 4.78 94.92 89.38 2 0.164 0 0.000 0.16499.96 0.00 94.48 4.78 94.92 89.38 2 0.164 0 0.000 0.1640.00 99.98 1.95 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 9.77 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 17.58 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 25.40 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 35.16 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 41.02 10.24 94.93 89.38 3 0.255 0 0.000 0.255

Dump 内存

象征性 dump 内存看看,不能使用 -dump:live 会做一次 FullGC,导致结果不一定反应正确内存。

jmap -dump:format=b,file=heap1.bin 12984

先不通过 Eclipse-MAT 打开,直接看下文件大小。

[dcadmin@spslave28 ~]$ ll -lh heap1.bin

-rw------- 1 dcadmin datacentergroup 1016M Apr 15 09:15 heap1.bin

同时看下 linux,此程序真实用的物理内存是多少 top -p 12984

Tasks: 1 total, 0 running, 1 sleeping, 0 stopped, 0 zombie%Cpu(s): 0.6 us, 0.1 sy, 0.0 ni, 99.3 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 stKiB Mem : 16394040 total, 5676652 free, 8114832 used, 2602556 buff/cacheKiB Swap: 0 total, 0 free, 0 used. 7523032 avail Mem


PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND

12984 dcadmin 20 0 16.258g 7.053g 15104 S 13.3 45.1 1:27.92 java

划重点!

•JVM 的堆内存 1GB 以内,RSS 用掉了 7.053GB,粗浅的理解,那就是堆外内存用掉大概 6GB 左右,看上去没毛病,但别忘了 JVM 启动时,包含参数-XX:MaxDirectMemorySize=1996m 呃,没报 OutofMemoryError。

•RSS 内存仍然在缓慢增长中,没有下降趋势!

重点分析 RSS 内存的问题

•分析 JVM 堆外内存详细分配情况

•分析 RSS 中 7GB 内存中具体是哪些东西

分析堆外内存分配在

JVM 启动参数添加 -XX:NativeMemoryTracking=summary 分析堆外内存泄漏问题。其实从下面的根本分析不出什么东西,因为没有报 OutofMemoryError,所以需要从 RSS 内存入手。

[dcadmin@spslave28 ~]$ jcmd 21567 VM.native_memory summary

21567:

Native Memory Tracking:

Total: reserved=5815901KB, committed=4521641KB

-                 Java Heap (reserved=4247552KB, committed=4247552KB)														(mmap: reserved=4247552KB, committed=4247552KB) 

- Class (reserved=1076408KB, committed=26936KB) (classes #1206) (malloc=19640KB #792) (mmap: reserved=1056768KB, committed=7296KB)
- Thread (reserved=42193KB, committed=42193KB) (thread #42) (stack: reserved=42016KB, committed=42016KB) (malloc=129KB #225) (arena=48KB #82)
- Code (reserved=250040KB, committed=5252KB) (malloc=440KB #1026) (mmap: reserved=249600KB, committed=4812KB)
- GC (reserved=177105KB, committed=177105KB) (malloc=21913KB #164) (mmap: reserved=155192KB, committed=155192KB)
- Compiler (reserved=150KB, committed=150KB) (malloc=19KB #61) (arena=131KB #3)
- Internal (reserved=19864KB, committed=19864KB) (malloc=19832KB #2577) (mmap: reserved=32KB, committed=32KB)
- Symbol (reserved=2285KB, committed=2285KB) (malloc=1254KB #255) (arena=1031KB #1)
- Native Memory Tracking (reserved=88KB, committed=88KB) (malloc=6KB #64) (tracking overhead=83KB)
- Arena Chunk (reserved=215KB, committed=215KB) (malloc=215KB)
复制代码
分析 RSS 中 7GB 内存中具体是哪些东西

分析 linux 内存的瑞士军刀,gdb 工具。

yum install -y gdb

通过 pmap 命令查看并排序,下面展示了内存地址空间,RSS 占用等信息。

[dcadmin@spslave28 ~]$ pmap -x 21567 | sort -n -k3 | more

---------------- ------- ------- -------

0000000000400000 0 0 0 r-x-- java

0000000000600000 0 0 0 rw--- java

0000000000643000 0 0 0 rw--- [ anon ]

00000006bcc00000 0 0 0 rw--- [ anon ]

00000007c00e0000 0 0 0 ----- [ anon ]

...

...

00007fb2ec000000 65508 36336 36336 rw--- [ anon ]

00007fb3c4000000 65536 41140 41140 rw--- [ anon ]

00007fb2d8000000 65508 46692 46692 rw--- [ anon ]

00007fb2e4000000 65508 47640 47640 rw--- [ anon ]

00007fb2e0000000 65508 48596 48596 rw--- [ anon ]

00007fb2dc000000 65512 49088 49088 rw--- [ anon ]

00007fb2cc000000 65508 50380 50380 rw--- [ anon ]

00007fb2d4000000 65508 53476 53476 rw--- [ anon ]

00007fb238000000 131056 59668 59668 rw--- [ anon ]

00000006bcc00000 4248448 1866536 1866536 rw--- [ anon ]

通过上面的信息,只有内存起始地址,并没有终止地址,需要到 maps 文件中。

[dcadmin@spslave28 ~]$ cat /proc/21567/maps | grep 7fb2dc

7fb2dbff9000-7fb2dc000000 ---p 00000000 00:00 0

7fb2dc000000-7fb2dfffa000 rw-p 00000000 00:00 0

通过 gdb 命令 attach 进程,并 dump 内存。

gdb attach 21567

dump memory mem.bin 0x7fb2dc000000 0x7fb2dfffa000

通过 strings 命令查看 mem.bin。

strings mem.bin | more

满屏幕都在刷类似配置文件的内容。

...

xcx.userprofile.kafkasource.bootstrap.servers=xxx-01:9096,xxx-02:9096,xxx-03:9096

xcx.userprofile.kafkasource.topic=CENTER_search_trajectory_xcx

xcx.userprofile.kafkasource.group=search_flink_xcx_userprofile_online

app.userprofile.kafkasource.bootstrap.servers=xxx-01:9096,xxx-02:9096,xxx-03:9096

app.userprofile.kafkasource.topic=CENTER_search_trajectory_app

app.userprofile.kafkasource.group=search_flink_app_userprofile_online

xcx.history.kafkasource.bootstrap.servers=xxx-01:9096,xxx-02:9096,xxx-03:9096

xcx.history.kafkasource.topic=CENTER_search_trajectory_xcx

...

分析/简化业务代码

从上述分析的结果看,应该是代码中有地方再不停地加载配置文件到内存中,简化下业务代码。迎来新的疑问,虽然代码中不应该每次都去加载配置文件,但不至于把物理内存消耗到 6GB 吧,所以这里并不存在逃逸分析,理论会主动释放,为什么没有呢?

public class StreamingJob {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStream<String> text = env.socketTextStream("10.101.52.18", 9909);
text.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { TestFun testFun = new TestFun(); testFun.update(); return null; } });
env.execute(); }
static class TestFun { Properties properties; public TestFun() throws IOException { properties = new Properties(); properties.load(TestFun.class.getClassLoader().getResourceAsStream("application.properties")); }
public void update() {} }}
复制代码


模拟复现

不管怎样,虽然怀疑,试试吧,看上去没有问题。

Java 程序模拟

public class TestJar {
public static void main(String[] args) throws Exception {
while (true) {
Properties properties = new Properties();
properties.load(TestJar.class.getClassLoader().getResourceAsStream("application.properties"));
}
}
}
复制代码

启动测试程序

java -Xmx512m -Xms512m -XX:MaxDirectMemorySize=1996m -cp test.jar com.ly.search.job.TestJob

观察程序的 RSS 内存消耗可以看到 RSS 内存逐步增加,一点不下降,所以这就是问题所在了,泄漏了。

内存泄漏修复及扩展

修复方案

•及时关闭 stream

public class TestJar {    public static void main(String[] args) throws Exception {        while (true) {            Properties properties = new Properties();            InputStream inStream = TestJar.class.getClassLoader().getResourceAsStream("application.properties")            properties.load(inStream);            inStream.close();        }    }}
复制代码

•通过 System.gc() 也有效果,及时清理 Finalizer(这种是不可取,只作为讨论方案), 参考《JVM 源码分析之 FinalReference 完全解读》


public class TestJar { public static void main(String[] args) throws Exception { while (true) { Properties properties = new Properties(); InputStream inStream = TestJar.class.getClassLoader().getResourceAsStream("application.properties") properties.load(inStream); System.gc(); } }}
复制代码


扩展研究

•XXX.class.getClassLoader().getResourceAsStream 底层是 URLClassLoader + JarURLConnection,即


// 等价于,内存溢出ClassLoader classLoader = TestJar.class.getClassLoader();URL resource = classLoader.getResource("application.properties");URLConnection urlConnection = resource.openConnection();urlConnection.getInputStream();
// 等价于,内存溢出URL url = new URL("jar:file:/home/dcadmin/test.jar!/com/ly/search/job/StreamingJob.class");JarURLConnection conn = (JarURLConnection) url.openConnection();conn.getInputStream();
// 不等价于,内存不溢出URL url = new URL("jar:file:/home/dcadmin/test.jar!/com/ly/search/job/StreamingJob.class");JarURLConnection conn = (JarURLConnection) url.openConnection();conn.setDefaultUseCaches(false);conn.getInputStream();
// 不等价于,内存不溢出URL fileURL = new File("test.jar").toURI().toURL();
FileURLConnection fileUrlConn = (FileURLConnection) fileURL.openConnection();fileUrlConn.connect();fileUrlConn.getInputStream();
复制代码

•JarURLConnection 和 FileURLConnection 的区别在于 JarURLConnection 底层需要调用 JarFile 打开 ZipFile 的 inputstream,涉及底层系统的调用,所以消耗了物理内存,导致 RSS 增大

•堆外内存一般排查 DirectByteBuffer 但此情况中,堆外确实并没有溢出,只是操作系统崩了

•堆 / 堆外均不超内存,所以触发不了 GC,泄漏慢慢就扩展开了,直至超出机器内存

用户头像

移动云,5G时代你身边的智慧云 2019.02.13 加入

移动云大数据产品团队,在移动云上提供云原生大数据分析LakeHouse,消息队列Kafka/Pulsar,云数据库HBase,弹性MapReduce,数据集成与治理等PaaS服务。 微信公众号:人人都学大数据

评论

发布
暂无评论
分析 Flink 任务如何超过 YARN 容器内存限制_Flink 平台_移动云大数据_InfoQ写作社区