写点什么

Flink 程序优化及反压机制

发布于: 2021 年 03 月 30 日
Flink程序优化及反压机制

1. 使用 Flink Checkpoint 进行程序优化


checkpoint 是 Flink 容错的核心机制。它可以定期地将各个 Operator 处理的数据进行快照存储( Snapshot )。如果 Flink 程序出现宕机,可以重新从这些快照中恢复数据。



  1. checkpoint coordinator(协调器)线程周期生成 barrier (栅栏),发送给每一个 source

  2. source 将当前的状态进行 snapshot(可以保存到 HDFS)

  3. source 向 coordinator 确认 snapshot 已经完成

  4. source 继续向下游 transformation operator 发送 barrier

  5. transformation operator 重复 source 的操作,直到 sink operator 向协调器确认 snapshot 完成

  6. coordinator 确认完成本周期的 snapshot


配置以下 checkpoint:


1、开启 checkpoint


2、设置 checkpoint 保存 HDFS 的位置


3、配置 checkpoint 的最小时间间隔(1 秒)


4、配置 checkpoint 最大线程数 (1)


5、配置 checkpoint 超时时间 (60 秒)


6、配置程序关闭,额外触发 checkpoint


7、配置重启策略 (尝试 1 次,延迟 1 秒启动)


8、给两个 source 添加 checkpoint 容错支持


  • 给需要进行 checkpoint 的 operator 设置 uid


// 配置Checkpointenv.enableCheckpointing(5000)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// checkpoint的HDFS保存位置env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/checkpoint/"))// 配置两次checkpoint的最小时间间隔env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)// 配置最大checkpoint的并行度env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)// 配置checkpoint的超时时长env.getCheckpointConfig.setCheckpointTimeout(60000)// 当程序关闭,触发额外的checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000))
复制代码

2.什么是背压问题


  • 流系统中消息的处理速度跟不上消息的发送速度,会导致消息的堆积

  • 许多日常问题都会导致背压

  • 背压如果不能得到正确地处理,可能会导致 资源被耗尽 或者甚至出现更糟的情况导致数据丢失


在同一时间点,不管是流处理 job 还是 sink,如果有 1 秒的卡顿,那么将导致至少 500 万条记录的积压。换句话说,source 可能会产生一个脉冲,在一秒内数据的生产速度突然翻倍。



1. 举例说明


1、正常情况


  • 消息处理速度 >= 消息的发送速度,不发生消息拥堵,系统运行流畅



2、异常情况


  • 消息处理速度< 消息的发送速度,发生了消息拥堵,系统运行不畅。



2. 背压问题解决方案


可以采取三种方案:


  • 将拥堵的消息直接删除

  • 将缓冲区持久化,以方便在处理失败的情况下进行数据重放

  • 将拥堵的消息缓存起来,并告知消息发送者减缓消息发送的速度

3. Flink 如何解决背压问题


Flink 内部自动实现数据流自然降速,而无需担心数据丢失。Flink 所获取的最大吞吐量是由 pipeline 中最慢的组件决定

4. Flink 解决背压问题的原理



1、TaskManager(TM)启动时,会初始化网络缓冲池(NetworkBufferPool)


  • 默认生成 2048 个内存块(MemorySegment)

  • 网络缓冲池是 Task 之间共享的


2、Task 线程启动时,Flink 会为 Task 的 Input Gate(IG)和 ResultPartion(RS)分别创建一个 LocationBufferPool


  • LocationBufferPool 的内存数量由 Flink 分配

  • 为了系统更容易应对瞬时压力,内存数量是动态分配的


3、Task 线程执行时,Netty 接收端接收到数据时,为了将数据保存拷贝到 Task 中


  • Task 线程需要向本地缓冲池(LocalBufferPool)申请内存

  • 若本地缓冲池没有可用内存,则继续向网络缓冲池(NetworkBufferPool)申请内存

  • 内存申请成功,则开始从 Netty 中拷贝数据

  • 若缓冲池已申请的数量达到上限,或网络缓冲池(NetworkerBufferPool)也没有可用内存时,该 Task 的 Netty Channel 会暂停读取,上游的发送端会立即响应停止发送,Flink 流系统进入反压状态


4、经过 Task 处理后,由 Task 写入到 ResultPartion(RS)中


  • 当 Task 线程写数据到 ResultPartion(RS)时,也会向网络缓冲池申请内存

  • 如果没有可用内存块,也会阻塞 Task,暂停写入


5、Task 处理完毕数据后,会将内存块交还给本地缓冲池(LocalBufferPool)


  • 如果本地缓冲池申请内存的数量超过池子设置的数量,将内存块回收给 网络缓冲池。如果没超过,会继续留在池子中,减少反复申请开销


发布于: 2021 年 03 月 30 日阅读数: 10
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink程序优化及反压机制