写点什么

Flink 的容错管理详细剖析

  • 2021 年 11 月 10 日
  • 本文字数:12963 字

    阅读完需:约 43 分钟

1. Checkpoint 介绍

checkpoint 机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink 的 checkpoint 机制原理来自“Chandy-Lamport algorithm”算法。


每个需要 checkpoint 的应用在启动时,Flink 的 JobManager 为其创建一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator 全权负责本应用的快照制作。



  1. CheckpointCoordinator(检查点协调器) 周期性的向该流应用的所有 source 算子发送 barrier(屏障)。

  2. 当某个 source 算子收到一个 barrier 时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向 CheckpointCoordinator 报告自己快照制作情况,同时向自身所有下游算子广播该 barrier,恢复数据处理

  3. 下游算子收到 barrier 之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向 CheckpointCoordinator 报告自身快照情况,同时向自身所有下游算子广播该 barrier,恢复数据处理。

  4. 每个算子按照步骤 3 不断制作快照并向下游广播,直到最后 barrier 传递到 sink 算子,快照制作完成。

  5. 当 CheckpointCoordinator 收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败。


如果一个算子有两个输入源,则暂时阻塞先收到 barrier 的输入源,等到第二个输入源相 同编号的 barrier 到来时,再制作自身快照并向下游广播该 barrier。具体如下图所示:



  1. 假设算子 C 有 A 和 B 两个输入源

  2. 在第 i 个快照周期中,由于某些原因(如处理时延、网络时延等)输入源 A 发出的 barrier 先到来,这时算子 C 暂时将输入源 A 的输入通道阻塞,仅收输入源 B 的数据。

  3. 当输入源 B 发出的 barrier 到来时,算子 C 制作自身快照并向 CheckpointCoordinator 报告自身的快照制作情况,然后将两个 barrier 合并为一个,向下游所有的算子广播。

  4. 当由于某些原因出现故障时,CheckpointCoordinator 通知流图上所有算子统一恢复到某个周期的 checkpoint 状态,然后恢复数据流处理。分布式 checkpoint 机制保证了数据仅被处理一次(Exactly Once)。

2. 持久化存储

1) MemStateBackend

该持久化存储主要将快照数据保存到 JobManager 的内存中,仅适合作为测试以及快照的数据量非常小时使用,并不推荐用作大规模商业部署。


MemoryStateBackend 的局限性


默认情况下,每个状态的大小限制为 5 MB。可以在 MemoryStateBackend 的构造函数中增加此值。


无论配置的最大状态大小如何,状态都不能大于 akka 帧的大小(请参阅配置)。


聚合状态必须适合 JobManager 内存。


建议 MemoryStateBackend 用于


本地开发和调试。


状态很少的作业,例如仅包含一次记录功能的作业(Map,FlatMap,Filter,...),kafka 的消费者需要很少的状态。

2) FsStateBackend

该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS 和本地文件。如果使用 HDFS,则初始化 FsStateBackend 时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点 A 上失败,在节点 B 上恢复,使用本地文件时,在 B 上无法读取节点 A 上的数据,导致状态恢复失败。


建议 FsStateBackend:


具有大状态,长窗口,大键 / 值状态的作业。


所有高可用性设置。

3) RocksDBStateBackend

RocksDBStatBackend 介于本地文件和 HDFS 之间,平时使用 RocksDB 的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend 中(FsStateBackend 不必用户特别指明,只需在初始化时传入 HDFS 或本地路径即可,如 new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或 new RocksDBStateBackend("file:///Data"))。


如果用户使用自定义窗口(window),不推荐用户使用 RocksDBStateBackend。在自定义窗口中,状态以 ListState 的形式保存在 StatBackend 中,如果一个 key 值中有多个 value 值,则 RocksDB 读取该种 ListState 非常缓慢,影响性能。用户可以根据应用的具体情况选择 FsStateBackend+HDFS 或 RocksStateBackend+HDFS。

4) 语法

val env = StreamExecutionEnvironment.getExecutionEnvironment()// start a checkpoint every 1000 msenv.enableCheckpointing(1000)// advanced options:// 设置checkpoint的执行模式,最多执行一次或者至少执行一次env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 设置checkpoint的超时时间env.getCheckpointConfig.setCheckpointTimeout(60000)// 如果在只做快照过程中出现错误,是否让整体任务失败:true是  false不是env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)//设置同一时间有多少 个checkpoint可以同时执行env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
复制代码

5) 修改 State Backend 的两种方式

第一种:单任务调整


修改当前任务代码


env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));


或者new MemoryStateBackend()


或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】


第二种:全局调整


修改flink-conf.yaml


state.backend: filesystem


state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints


注意:state.backend 的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

6) Checkpoint 的高级选项

默认 checkpoint 功能是 disabled 的,想要使用的时候需要先启用 checkpoint 开启之后,默认的 checkPointMode 是 Exactly-once


//配置一秒钟开启一个checkpointenv.enableCheckpointing(1000)//指定checkpoint的执行模式//两种可选://CheckpointingMode.EXACTLY_ONCE:默认值//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
一般情况下选择CheckpointingMode.EXACTLY_ONCE,除非场景要求极低的延迟(几毫秒)
注意:如果需要保证EXACTLY_ONCE,source和sink要求必须同时保证EXACTLY_ONCE
复制代码


//如果程序被cancle,保留以前做的checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默认情况下,检查点不被保留,仅用于在故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意,在这种情况下,您必须在取消后手动清理检查点状态
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业在被cancel时,删除检查点,检查点仅在作业失败时可用
复制代码


//设置checkpoint超时时间env.getCheckpointConfig.setCheckpointTimeout(60000)//Checkpointing的超时时间,超时时间内没有完成则被终止
复制代码


//Checkpointing最小时间间隔,用于指定上一个checkpoint完成之后//最小等多久可以触发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
复制代码


//设置同一个时间是否可以有多个checkpoint执行env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)指定运行中的checkpoint最多可以有多少个
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)用于指定在checkpoint发生异常的时候,是否应该fail该task,默认是true,如果设置为false,则task会拒绝checkpoint然后继续运行
复制代码

2. Flink 的重启策略

Flink 支持不同的重启策略,这些重启策略控制着 job 失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果 Job 提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。

1) 概览

默认的重启策略是通过 Flink 的 flink-conf.yaml 来指定的,这个配置参数 restart-strategy 定义了哪种策略会被采用。如果 checkpoint 未启动,就会采用 no restart 策略,如果启动了 checkpoint 机制,但是未指定重启策略的话,就会采用 fixed-delay 策略,重试 Integer.MAX_VALUE 次。请参考下面的可用重启策略来了解哪些值是支持的。


每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。



除了定义一个默认的重启策略之外,你还可以为每一个 Job 指定它自己的重启策略,这个重启策略可以在 ExecutionEnvironment 中调用 setRestartStrategy() 方法来程序化地调用,注意这种方式同样适用于 StreamExecutionEnvironment


下面的例子展示了如何为 Job 设置一个固定延迟重启策略,一旦有失败,系统就会尝试每 10 秒重启一次,重启 3 次。


val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.fixedDelayRestart(  3, // 重启次数  Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔))
复制代码

2) 固定延迟重启策略(Fixed Delay Restart Strategy)

固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。


重启策略可以配置 flink-conf.yaml 的下面配置参数来启用,作为默认的重启策略:


restart-strategy: fixed-delay
复制代码



例子:


restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s
复制代码


固定延迟重启也可以在程序中设置:


val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.fixedDelayRestart(  3, // 重启次数  Time.of(10, TimeUnit.SECONDS) // 重启时间间隔))
复制代码

3) 失败率重启策略

失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。


失败率重启策略可以在 flink-conf.yaml 中设置下面的配置参数来启用:


restart-strategy:failure-rate
复制代码



例子:


restart-strategy.failure-rate.max-failures-per-interval: 3restart-strategy.failure-rate.failure-rate-interval: 5 minrestart-strategy.failure-rate.delay: 10 s
复制代码


失败率重启策略也可以在程序中设置:


val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.failureRateRestart(  3, // 每个测量时间间隔最大失败次数  Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔  Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔))
复制代码

4) 无重启策略

Job 直接失败,不会尝试进行重启


restart-strategy: none
复制代码


无重启策略也可以在程序中设置


val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.noRestart())
复制代码

5) 案例

需求:输入五次 zhangsan,程序挂掉


代码:


import org.apache.flink.api.common.restartstrategy.RestartStrategiesimport org.apache.flink.runtime.state.filesystem.FsStateBackendimport org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanupimport org.apache.flink.streaming.api.scala._
object FixDelayRestartStrategiesDemo {
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment
//如果想要开启重启策略,就必须开启CheckPoint env.enableCheckpointing(5000L)
//指定状态存储后端,默认就是内存 //现在指定的是FsStateBackend,支持本地系统、 //new FsStateBackend要指定存储系统的协议: scheme (hdfs://, file://, etc) env.setStateBackend(new FsStateBackend(args(0)))
//如果程序被cancle,保留以前做的checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定以后存储多个checkpoint目录 env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//指定重启策略,默认的重启策略是不停的重启 //程序出现异常是会重启,重启五次,每次延迟5秒,如果超过了5次,程序退出 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))
val lines: DataStream[String] = env.socketTextStream(args(1), 8888)
val result = lines.flatMap(_.split(" ").map(word => { if(word.equals("zhangsan")) { throw new RuntimeException("zhangsan,程序重启!"); } (word, 1) })).keyBy(0).sum(1) result.print() env.execute() }}
复制代码

3) checkpoint 案例

1. 需求


假定用户需要每隔 1 秒钟需要统计 4 秒中窗口中数据的量,然后对统计的结果值进行 checkpoint 处理


2. 数据规划


  1. 使用自定义算子每秒钟产生大约 10000 条数据。

  2. 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。

  3. 数据经统计后,统计结果打印到终端输出。

  4. 打印输出的结果为 Long 类型的数据 。


3. 开发思路


  1. source 算子每隔 1 秒钟发送 10000 条数据,并注入到 Window 算子中。

  2. window 算子每隔 1 秒钟统计一次最近 4 秒钟内数据数量。

  3. 每隔 1 秒钟将统计结果打印到终端。

  4. 每隔 6 秒钟触发一次 checkpoint,然后将 checkpoint 的结果保存到 HDFS 中。


5. 开发步骤


  1. 获取流处理执行环境

  2. 设置检查点机制

  3. 自定义数据源

  4. 数据分组

  5. 划分时间窗口

  6. 数据聚合

  7. 数据打印

  8. 触发执行


示例代码


//发送数据形式case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // 任务取消时调用 override def cancel(): Unit = { isRunning = false } //// source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } }}
/**该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//保留策略:默认情况下,检查点不会被保留,仅用于故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意在这种情况下,您必须在取消后手动清理检查点状态//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业被cancel时,删除检查点,检查点状态仅在作业失败时可用env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
// 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() }}
//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。// 用户自定义状态class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count}//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L
// window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 从自定义快照中恢复状态 override def restoreState(state: util.List[UDFState]): Unit = { val udfState = state.get(0) total = udfState.getState }
// 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList }}
复制代码

4. 端对端仅处理一次语义

当谈及仅一次处理时,我们真正想表达的是每条输入消息只会影响最终结果一次!(影响应用状态一次,而非被处理一次)即使出现机器故障或软件崩溃,Flink 也要保证不会有数据被重复处理或压根就没有被处理从而影响状态。


在 Flink 1.4 版本之前,精准一次处理只限于 Flink 应用内,也就是所有的 Operator 完全由 Flink 状态保存并管理的才能实现精确一次处理。但 Flink 处理完数据后大多需要将结果发送到外部系统,比如 Sink 到 Kafka 中,这个过程中 Flink 并不保证精准一次处理。


在 Flink 1.4 版本正式引入了一个里程碑式的功能:两阶段提交 Sink,即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑,自此 Flink 搭配特定 Source 和 Sink(如 Kafka 0.11 版)实现精确一次处理语义(英文简称:EOS,即 Exactly-Once Semantics)。


在 Flink 中需要端到端精准一次处理的位置有三个:



  • Source 端:数据从上一阶段进入到 Flink 时,需要保证消息精准一次消费。

  • Flink 内部端:这个我们已经了解,利用 Checkpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性。不了解的小伙伴可以看下我之前的文章:

  • Flink 可靠性的基石-checkpoint 机制详细解析

  • Sink 端:将处理完的数据发送到下一阶段时,需要保证数据能够准确无误发送到下一阶段。

1) Flink 端到端精准一次处理语义(EOS)

以下内容适用于 Flink 1.4 及之后版本


对于 Source 端:Source 端的精准一次处理比较简单,毕竟数据是落到 Flink 中,所以 Flink 只需要保存消费数据的偏移量即可, 如消费 Kafka 中的数据,Flink 将 Kafka Consumer 作为 Source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性。


对于 Sink 端Sink 端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开 Flink 之后,Flink 就监控不到这些数据了,所以精准一次处理语义必须也要应用于 Flink 写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与 Flink Checkpoint 能够协调使用(Kafka 0.11 版本已经实现精确一次处理语义)。


我们以 Flink 与 Kafka 组合为例,Flink 从 Kafka 中读数据,处理完的数据在写入 Kafka 中。


为什么以 Kafka 为例,第一个原因是目前大多数的 Flink 系统读写数据都是与 Kafka 系统进行的。第二个原因,也是最重要的原因 Kafka 0.11 版本正式发布了对于事务的支持,这是与 Kafka 交互的 Flink 应用要实现端到端精准一次语义的必要条件


当然,Flink 支持这种精准一次处理语义并不只是限于与 Kafka 的结合,可以使用任何 Source/Sink,只要它们提供了必要的协调机制。

2) Flink 与 Kafka 组合


如上图所示,Flink 中包含以下组件:


  1. 一个 Source,从 Kafka 中读取数据(即 KafkaConsumer)

  2. 一个时间窗口化的聚会操作(Window)

  3. 一个 Sink,将结果写入到 Kafka(即 KafkaProducer)


若要 Sink 支持精准一次处理语义(EOS),它必须以事务的方式写数据到 Kafka,这样当提交事务时两次 Checkpoint 间的所有写入操作当作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。


当然了,在一个分布式且含有多个并发执行 Sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一个一致性的结果。Flink 使用两阶段提交协议以及预提交(Pre-commit)阶段来解决这个问题

3) 两阶段提交协议(2PC)

两阶段提交协议(Two-Phase Commit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现 ACID 中的 A (原子性)


在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值,要么都不改,以此来达到数据的强一致性


两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个


顾名思义,两阶段提交将提交过程划分为连续的两个阶段:表决阶段(Voting)和提交阶段(Commit)


两阶段提交协议过程如下图所示:



第一阶段:表决阶段


  1. 协调者向所有参与者发送一个 VOTE_REQUEST 消息。

  2. 当参与者接收到 VOTE_REQUEST 消息,向协调者发送 VOTE_COMMIT 消息作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备好或遇到其他故障,就返回一个 VOTE_ABORT 消息,告诉协调者目前无法提交事务。


第二阶段:提交阶段


  1. 协调者收集来自各个参与者的表决消息。如果所有参与者一致认为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与者发送一个 GLOBAL_COMMIT 消息,通知参与者进行本地提交;如果所有参与者中有任意一个返回消息是 VOTE_ABORT,协调者就会取消事务,向所有参与者广播一条 GLOBAL_ABORT 消息通知所有的参与者取消事务。

  2. 每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一个 GLOBAL_COMMIT 消息,那么参与者提交本地事务,否则如果接收到 GLOBAL_ABORT 消息,则参与者取消本地事务。

4) 两阶段提交协议在 Flink 中的应用

Flink 的两阶段提交思路


我们从 Flink 程序启动到消费 Kafka 数据,最后到 Flink 将数据 Sink 到 Kafka 为止,来分析 Flink 的精准一次处理。


  1. 当 Checkpoint 启动时,JobManager 会将检查点分界线(checkpoint battier)注入数据流,checkpoint barrier 会在算子间传递下去,如下如所示:



  1. Source 端Flink Kafka Source 负责保存 Kafka 消费 offset,当 Chckpoint 成功时 Flink 负责提交这些写入,否则就终止取消掉它们,当 Chckpoint 完成位移保存,它会将 checkpoint barrier(检查点分界线) 传给下一个 Operator,然后每个算子会对当前的状态做个快照,保存到状态后端(State Backend)。

  2. 对于 Source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 Checkpoint 恢复时,Source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据,如下图所示:



  1. Slink 端:从 Source 端开始,每个内部的 transform 任务遇到 checkpoint barrier(检查点分界线)时,都会把状态存到 Checkpoint 里。数据处理完毕到 Sink 端时,Sink 任务首先把数据写入外部 Kafka,这些数据都属于预提交的事务(还不能被消费),此时的 Pre-commit 预提交阶段下 Data Sink 在保存状态到状态后端的同时还必须预提交它的外部事务,如下图所示:



  1. 当所有算子任务的快照完成(所有创建的快照都被视为是 Checkpoint 的一部分),也就是这次的 Checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 Checkpoint 完成,此时 Pre-commit 预提交阶段才算完成。才正式到两阶段提交协议的第二个阶段:commit 阶段。该阶段中 JobManager 会为应用中每个 Operator 发起 Checkpoint 已完成的回调逻辑。

  2. 本例中的 Data Source 和窗口操作无外部状态,因此在该阶段,这两个 Opeartor 无需执行任何逻辑,但是 Data Sink 是有外部状态的,此时我们必须提交外部事务,当 Sink 任务收到确认通知,就会正式提交之前的事务,Kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了,如下图所示:



注:Flink 由 JobManager 协调各个 TaskManager 进行 Checkpoint 存储,Checkpoint 保存在 StateBackend(状态后端) 中,默认 StateBackend 是内存级的,也可以改为文件级的进行持久化保存。


最后,一张图总结下 Flink 的 EOS:



此图建议保存,总结全面且简明扼要,再也不怂面试官!

5) Exactly-Once 案例

Kafka 来实现 End-to-End Exactly-Once 语义


import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.CheckpointingModeimport org.apache.flink.streaming.api.environment.CheckpointConfigimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper/** * Kafka Producer的容错-Kafka 0.9 and 0.10 * 如果Flink开启了checkpoint,针对FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数 * •setLogFailuresOnly(false) * •setFlushOnCheckpoint(true) * * 注意:建议修改kafka 生产者的重试次数  * retries【这个参数的值默认是0】 * * Kafka Producer的容错-Kafka 0.11 * 如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供 exactly-once的语义 * 但是需要选择具体的语义 * •Semantic.NONE * •Semantic.AT_LEAST_ONCE【默认】 * •Semantic.EXACTLY_ONCE */object StreamingKafkaSinkScala {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    //隐式转换    import org.apache.flink.api.scala._    //checkpoint配置    env.enableCheckpointing(5000)    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)    env.getCheckpointConfig.setCheckpointTimeout(60000)    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val text = env.socketTextStream("node01", 9001, '\n') val topic = "test" val prop = new Properties() prop.setProperty("bootstrap.servers", "node01:9092") //设置事务超时时间,也可在kafka配置中设置 prop.setProperty("transaction.timeout.ms",60000*15+""); //使用至少一次语义的形式 //val myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema()); //使用支持仅一次语义的形式 val myProducer = new FlinkKafkaProducer011[String](topic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); text.addSink(myProducer) env.execute("StreamingKafkaSinkScala") }}
复制代码


Redis 实现 End-to-End Exactly-Once 语义:


代码开发步骤:


  1. 获取流处理执行环境

  2. 设置检查点机制

  3. 定义 kafkaConsumer

  4. 数据转换:分组,求和

  5. 数据写入 redis

  6. 触发执行


object ExactlyRedisSink {  def main(args: Array[String]): Unit = {    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1)    env.enableCheckpointing(5000)    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/check/11"))    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)    env.getCheckpointConfig.setCheckpointTimeout(60000)    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)    //设置kafka,加载kafka数据源    val properties = new Properties()    properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")    properties.setProperty("group.id", "test")    properties.setProperty("enable.auto.commit", "false")    val kafkaConsumer = new FlinkKafkaConsumer011[String]("test2", new SimpleStringSchema(), properties)    kafkaConsumer.setStartFromLatest()    //检查点制作成功,才开始提交偏移量    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)    val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)
//数据转换 val sumData: DataStream[(String, Int)] = kafkaSource.flatMap(_.split(" ")) .map(_ -> 1) .keyBy(0) .sum(1)
val set = new util.HashSet[InetSocketAddress]() set.add(new InetSocketAddress(InetAddress.getByName("node01"),7001)) set.add(new InetSocketAddress(InetAddress.getByName("node01"),7002)) set.add(new InetSocketAddress(InetAddress.getByName("node01"),7003)) val config: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder() .setNodes(set) .setMaxIdle(5) .setMaxTotal(10) .setMinIdle(5) .setTimeout(10) .build()
//数据写入 sumData.addSink(new RedisSink(config,new MyRedisSink)) env.execute() }}class MyRedisSink extends RedisMapper[(String,Int)] { override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET,"resink") } override def getKeyFromData(data: (String, Int)): String = { data._1 } override def getValueFromData(data: (String, Int)): String = { data._2.toString }}
复制代码


发布于: 17 小时前阅读数: 6
用户头像

InfoQ签约作者 2020.11.10 加入

文章首发于公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
Flink 的容错管理详细剖析