带你认识 FusionInsight Flink:既能批处理,又能流处理
摘要:本文主要介绍了 FusionInsight Flink 组件的基本原理、Flink 任务提交的常见问题、以及最佳实践 FAQ。
本文分享自华为云社区《FusionInsight HD Flink组件基本原理和常见问题解析》,作者:FI 小粉丝 。
Flink 是一个批处理和流处理结合的统一计算框架,其核心是一个提供数据分发以及并行化计算的流数据处理引擎。
它的最大亮点是流处理,是业界最顶级的开源流处理引擎。
Flink 最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发 pipeline 处理数据,时延毫秒级,且兼具可靠性。
本文主要介绍了 FusionInsight Flink 组件的基本原理、Flink 任务提交的常见问题、以及最佳实践 FAQ。
基本概念
基本原理
简介
Flink 是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。
Flink 最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发 pipeline 处理数据,时延毫秒级,且兼具可靠性。
Flink 技术栈如图所示:
Flink 在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强,具体请参考:https://ci.apache.org/projects/flink/flink-docs-release-1.4/
DataStream
Checkpoint
Stream SQL
窗口
Job Pipeline
配置表
架构
Flink 架构如图所示。
Flink 整个系统包含三个部分:
Client
Flink Client 主要给用户提供向 Flink 系统提交用户任务(流式作业)的能力。
TaskManager
Flink 系统的业务执行节点,执行具体的用户任务。TaskManager 可以有多个,各个 TaskManager 都平等。
JobManager
Flink 系统的管理节点,管理所有的 TaskManager,并决策用户任务在哪些 Taskmanager 执行。JobManager 在 HA 模式下可以有多个,但只有一个主 JobManager。
Flink 系统提供的关键能力:
低时延
提供 ms 级时延的处理能力。
Exactly Once
提供异步快照机制,保证所有数据真正只处理一次。
HA
JobManager 支持主备模式,保证无单点故障。
水平扩展能力
TaskManager 支持手动水平扩展。
原理
Stream & Transformation & Operator
用户实现的 Flink 程序是由 Stream 和 Transformation 这两个基本构建块组成。
1. Stream 是一个中间结果数据,而 Transformation 是一个操作,它对一个或多个输入 Stream 进行计算处理,输出一个或多个结果 Stream。
2. 当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow。一个 Streaming Dataflow 是由一组 Stream 和 Transformation Operator 组成,它类似于一个 DAG 图,在启动的时候从一个或多个 Source Operator 开始,结束于一个或多个 Sink Operator。
下图为一个由 Flink 程序映射为 Streaming Dataflow 的示意图。
上图中“FlinkKafkaConsumer”是一个 Source Operator,Map、KeyBy、TimeWindow、Apply 是 Transformation Operator,RollingSink 是一个 Sink Operator。
Pipeline Dataflow
在 Flink 中,程序是并行和分布式的方式运行。一个 Stream 可以被分成多个 Stream 分区(Stream Partitions),一个 Operator 可以被分成多个 Operator Subtask。
Flink 内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。
紧密度低的算子则不能进行优化,而是将每一个 Operator Subtask 放在不同的线程中独立执行。一个 Operator 的并行度,等于 Operator Subtask 的个数,一个 Stream 的并行度(分区总数)等于生成它的 Operator 的并行度。如下图所示。
Operator
紧密度高的算子可以进行优化,优化后可以将多个 Operator Subtask 串起来组成一个 Operator Chain,实际上就是一个执行链,每个执行链会在 TaskManager 上一个独立的线程中执行,如下图所示。
Operator chain
上图中上半部分表示的是将 Source 和 map 两个紧密度高的算子优化后串成一个 Operator Chain,实际上一个 Operator Chain 就是一个大的 Operator 的概念。图中的 Operator Chain 表示一个 Operator,keyBy 表示一个 Operator,Sink 表示一个 Operator,它们通过 Stream 连接,而每个 Operator 在运行时对应一个 Task,也就是说图中的上半部分有 3 个 Operator 对应的是 3 个 Task。
上图中下半部分是上半部分的一个并行版本,对每一个 Task 都并行化为多个 Subtask,这里只是演示了 2 个并行度,sink 算子是 1 个并行度。
日志介绍
日志描述
日志存储路径:
Executor 运行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}”
说明:
运行中的任务日志存储在以上路径中,运行结束后会基于 Yarn 的配置确定是否汇聚到 HDFS 目录中。
其他日志:“/var/log/Bigdata/flink/flinkResource”
日志归档规则:
Executor 日志默认 50MB 滚动存储一次,最多保留 100 个文件,不压缩。
日志大小和压缩文件保留个数可以在 FusionInsight Manager 界面中配置。
Flink 日志列表
日志级别
Flink 中提供了如下表所示的日志级别。日志级别优先级从高到低分别是 ERROR、WARN、INFO、DEBUG。程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。
日志级别
如果您需要修改日志级别,请执行如下操作:
1. 登录 FusionInsight Manager 系统。
2. 选择“服务管理 > Flink > 服务配置”。
3. “参数类别”下拉框中选择“全部”。
4. 左边菜单栏中选择所需修改的角色所对应的日志菜单。
5. 选择所需修改的日志级别。
6. 单击“保存配置”,在弹出窗口中单击“确定”使配置生效。
说明:
配置完成后立即生效,不需要重启服务。
日志格式
常见故障
1. Flink 对接 kafka-写入数据倾斜,部分分区没有写入数据
问题现象与背景
使用 FlinkKafkaProducer 进行数据生产,数据只写到了 kafka 的部分分区中,其它的分区没有数据写入
原因分析
可能原因 1:Flink 写 kafka 使用的机制与原生接口的写入方式是有差别的,在默认情况下,Flink 使用了”并行度编号+分区数量”取模计算的结果作为 topic 的分区编号。那么会有以下两种场景:
1. 并行度 %分区数量=0,表示并行度是 kafkatopic 分区数的一倍或者多倍,数据的写入每个分区数据量是均衡的。
2. 并行度 %分区数量≠0,那么数据量势必会在个别分区上的数据量产生倾斜。
可能原因 2:在业务代码的部分算子中使用了 keyby()方法,由于现网中的数据流中,每个 key 值所属的数据量不一致(就是说某些 key 的数据量会非常大,有些又非常小)导致每个并行度中输出的数据流量不一致。从而出现数据倾斜。
解决办法
原因一:
方法 1,调整 kafka 的分区数跟 flink 的并行度保持一致,即要求 kafka 的分区数与 flink 写 kafka 的 sink 并行度保持强一致性。这种做法的优势在于每个并行度仅需要跟每个 kafka 分区所在的 broker 保持一个常链接即可。能够节省每个并发线程与分区之间调度的时间。
方法 2,flink 写 kafka 的 sink 的分区策略写成随机写入模式,如下图,这样数据会随即写入 topic 的分区中,但是会有一部分时间损耗在线程向寻址,推荐使用方法 1。
原因二:
需要调整业务侧对 key 值的选取,例如:可以将 key 调整为“key+随机数”的方式,保证 Flink 的 keyby()算子中每个处理并行度中的数据是均衡的。
2. Flink 任务的日志目录增长过快,导致磁盘写满
问题现象
集群告警磁盘使用率超过阈值,经过排查发现是 taskmanager.out 文件过大导致
原因分析
代码中存在大量的 print 模块,导致 taskmanager.out 文件被写入大量的日志信息,taskmanager.out 一般是,业务代码加入了 .print 的代码,需要在代码中排查是否有类似于以下的代码逻辑:
或者类似于这样的打印:
如果包含,日志信息会持续打印到 taskmanager.out 里面。
解决方案
将上图红框中的代码去掉,或者输出到日志文件中。
3. 任务启动失败,报资源不足:Could not allocate all requires slots within timeout of xxxx ms
问题现象
任务启动一段时间后报错,例如如下日志,需要 60 个资源实际上只有 54 个。
原因分析
Flink 任务在启动过程中的资源使用是先增长在下降到当前值的,实际在启动过程中需要的资源量等于每个算子并行度之和。等到任务开始运行后,Flink 会对资源进行合并。
例如如下算子,在启动过程中需要“1+6+6+5+3=21”个资源。
但是运行稳定后会降低到 6。这个是 Flink 的机制。假如任务在启动过程中不满足 21 个资源的启动资源量,任务就会出现 NoResourceAvailableException 的异常。
解决方案
减少任务的启动并发,或者将其它任务 kill 掉再启动 Flink 任务。
4. 算子的部分节点产生背压,其它节点正常
问题现象
业务运行一段时间以后,算子的部分节点出现背压。
原因分析
1. 通过 Flink 原生页面排查这个并发的算子所在的节点,通过上图我们能够看出是异常算子的第 44 个并发。通过前台页面能够查看并确认第 44 并发所在的节点,例如下图:
2. 通过查找这个节点在 taskmanager 列表,例如下图位置:
整理 taskmanager 在每个 nodemanager 节点的数量发现,背压节点启动的 taskmanager 数量过多。
3. 经过排查,该 yarn 集群资源相对比较紧张,每个节点启动的 taskmanager 数量不一致,如果部分节点启动的较多容易出现数据倾斜。
解决方案
建议一个节点启动多个 slot。避免多个 taskmanager 出现在一个 nodemanager 节点上。启动方式见:slot优化。
FAQ
Flink 如何加载其它目录的 jar 包
需求描述
Flink 业务一般在运行过程中默认加载的 jar 包路径为:“xxx/Flink/flink/lib”的目录下,如果添加其它路径的 jar 包会报错,如何添加其它外部依赖。
实现方案
1. 创建一个外部的 lib 目录,将部分依赖包放到外部 lib 目录下,如下图:
2. 修改启动脚本的参数配置脚本,config.sh 将 jar 包路径传给环境变量中。
3. 此时正常启动任务即可,不需要加其它参数。
4. HDFS 上也能看到第三方 jar 的目录。
如何收集任务 taskmanager 的 jstack 和 pstree 信息
需求描述
在任务运行过程中我们通常需要对 taskmanager 的进程进行查询和处理,例如:打 jstack,jmap 等操作,做这些操作的过程中需要获取任务的 taskmanager 信息。
实现方案
获取一个 nodemanager 节点上面所有 taskmanager 的进程信息的方法如下:
ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c"
其中红框中的内容就是 taskmanager 的进程号,如果一个节点上面存在多个 taskmanager 那么这个地方会有多个进程号。获取到进程号后我们可以针对这个进程号收集 jstack 或者 pstree 信息。
收集 jstack
1. 通过上面流程获取到进程信息,然后从中获取进程 ID 和 application id,如上图中进程 id 为“30047 applicationid 为 application_1623239745860_0001”。
2. 从 FI 前台界面获取这个进程的启动用户。如下图为 flinkuser。
3. 在对应的 nodemanager 节点后台切换到这个用户,人机用户机机用户即可。
4. 进入到节点所在的 jdk 目录下
5. 给 taskmanager 进程打 jstack。
须知:
不同用户提交的 taskmanager 只能由提交任务的用户打 jstack。
收集 pstree 信息
使用 pstree –p PID 的方式能够获取 taskmanager 的 pstree 信息,这个地方提供一个收集脚本。内容如下:
该脚本的功能为获取节点上所有 taskmanager pstree 的数量,打印结果如下:
slot 优化
需求描述
Slot 可以认为是 taskmanager 上面一块独立分配的资源,是 taskmanager 并行执行的能力的体现。Taskmanager 中有两种使用 slot 的方法:
一个 taskmanager 中设置了一个 slot。
一个 taskmanager 中设置了多个 slot。
每个 task slot 表示 TaskManager 拥有资源的一个固定大小的子集。假如一个 taskManager 有三个 slot,那么它会将其管理的内存分成三份给各个 slot。资源 slot 化意味着一个 subtask 将不需要跟来自其他 job 的 subtask 竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到 CPU 的隔离,slot 目前仅用来隔离 task 的受管理的内存。通过调整 task slot 的数量,允许用户定义 subtask 之间隔离的方式。如果一个 TaskManager 一个 slot,那将意味着每个 task group 运行在独立的 JVM 中(该 JVM 可能是通过一个特定的容器启动的),而一个 TaskManager 多个 slot 意味着更多的 subtask 可以共享同一个 JVM。而在同一个 JVM 进程中的 task 将共享 TCP 连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构。因此,对于资源密集型任务(尤其是对 cpu 使用较为密集的)不建议使用单个 taskmanager 中创建多个 slot 使用,否则容易导致 taskmanager 心跳超时,出现任务失败。如果需要设置单 taskmanager 多 slot,参考如下操作。
单 taskmanager 多 slot 的设置方法
方式一:在配置文件中配置 taskmanager.numberOfTaskSlots,通过修改提交任务的客户端配置文件中的配置 flink-conf.yaml 配置,如下图:将该值设置为需要调整的数值即可。
方式二:启动命令的过程中使用-ys 命令传入,例如以下命令:
./flink run -m yarn-cluster -p 1 -ys 3 ../examples/streaming/WindowJoin.jar
在启动后在一个 taskmanager 中会启动 3 个 slot。
单 taskmanager 多 slot 需要优化哪些参数
设置单 taskmanager 多 slot 需要优化以下参数
版权声明: 本文为 InfoQ 作者【华为云开发者社区】的原创文章。
原文链接:【http://xie.infoq.cn/article/8b4bb35ae8646ea1c1c96dab6】。文章转载请联系作者。
评论