1. Checkpoint 介绍
checkpoint 机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink 的 checkpoint 机制原理来自“Chandy-Lamport algorithm”算法。
每个需要 checkpoint 的应用在启动时,Flink 的 JobManager 为其创建一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator 全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器) 周期性的向该流应用的所有 source 算子发送 barrier(屏障)。
当某个 source 算子收到一个 barrier 时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向 CheckpointCoordinator 报告自己快照制作情况,同时向自身所有下游算子广播该 barrier,恢复数据处理
下游算子收到 barrier 之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向 CheckpointCoordinator 报告自身快照情况,同时向自身所有下游算子广播该 barrier,恢复数据处理。
每个算子按照步骤 3 不断制作快照并向下游广播,直到最后 barrier 传递到 sink 算子,快照制作完成。
当 CheckpointCoordinator 收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败。
如果一个算子有两个输入源,则暂时阻塞先收到 barrier 的输入源,等到第二个输入源相 同编号的 barrier 到来时,再制作自身快照并向下游广播该 barrier。具体如下图所示:
假设算子 C 有 A 和 B 两个输入源
在第 i 个快照周期中,由于某些原因(如处理时延、网络时延等)输入源 A 发出的 barrier 先到来,这时算子 C 暂时将输入源 A 的输入通道阻塞,仅收输入源 B 的数据。
当输入源 B 发出的 barrier 到来时,算子 C 制作自身快照并向 CheckpointCoordinator 报告自身的快照制作情况,然后将两个 barrier 合并为一个,向下游所有的算子广播。
当由于某些原因出现故障时,CheckpointCoordinator 通知流图上所有算子统一恢复到某个周期的 checkpoint 状态,然后恢复数据流处理。分布式 checkpoint 机制保证了数据仅被处理一次(Exactly Once)。
2. 持久化存储
1) MemStateBackend
该持久化存储主要将快照数据保存到 JobManager 的内存中,仅适合作为测试以及快照的数据量非常小时使用,并不推荐用作大规模商业部署。
MemoryStateBackend 的局限性:
默认情况下,每个状态的大小限制为 5 MB。可以在 MemoryStateBackend 的构造函数中增加此值。
无论配置的最大状态大小如何,状态都不能大于 akka 帧的大小(请参阅配置)。
聚合状态必须适合 JobManager 内存。
建议 MemoryStateBackend 用于:
本地开发和调试。
状态很少的作业,例如仅包含一次记录功能的作业(Map,FlatMap,Filter,...),kafka 的消费者需要很少的状态。
2) FsStateBackend
该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS 和本地文件。如果使用 HDFS,则初始化 FsStateBackend 时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点 A 上失败,在节点 B 上恢复,使用本地文件时,在 B 上无法读取节点 A 上的数据,导致状态恢复失败。
建议 FsStateBackend:
具有大状态,长窗口,大键 / 值状态的作业。
所有高可用性设置。
3) RocksDBStateBackend
RocksDBStatBackend 介于本地文件和 HDFS 之间,平时使用 RocksDB 的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend 中(FsStateBackend 不必用户特别指明,只需在初始化时传入 HDFS 或本地路径即可,如 new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或 new RocksDBStateBackend("file:///Data"))。
如果用户使用自定义窗口(window),不推荐用户使用 RocksDBStateBackend。在自定义窗口中,状态以 ListState 的形式保存在 StatBackend 中,如果一个 key 值中有多个 value 值,则 RocksDB 读取该种 ListState 非常缓慢,影响性能。用户可以根据应用的具体情况选择 FsStateBackend+HDFS 或 RocksStateBackend+HDFS。
4) 语法
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 设置checkpoint的执行模式,最多执行一次或者至少执行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置checkpoint的超时时间
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
//设置同一时间有多少 个checkpoint可以同时执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
复制代码
5) 修改 State Backend 的两种方式
第一种:单任务调整
修改当前任务代码
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);
【需要添加第三方依赖】
第二种:全局调整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend 的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
6) Checkpoint 的高级选项
默认 checkpoint 功能是 disabled 的,想要使用的时候需要先启用 checkpoint 开启之后,默认的 checkPointMode 是 Exactly-once
//配置一秒钟开启一个checkpoint
env.enableCheckpointing(1000)
//指定checkpoint的执行模式
//两种可选:
//CheckpointingMode.EXACTLY_ONCE:默认值
//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
一般情况下选择CheckpointingMode.EXACTLY_ONCE,除非场景要求极低的延迟(几毫秒)
注意:如果需要保证EXACTLY_ONCE,source和sink要求必须同时保证EXACTLY_ONCE
复制代码
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默认情况下,检查点不被保留,仅用于在故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意,在这种情况下,您必须在取消后手动清理检查点状态
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业在被cancel时,删除检查点,检查点仅在作业失败时可用
复制代码
//设置checkpoint超时时间
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing的超时时间,超时时间内没有完成则被终止
复制代码
//Checkpointing最小时间间隔,用于指定上一个checkpoint完成之后
//最小等多久可以触发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
复制代码
//设置同一个时间是否可以有多个checkpoint执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定运行中的checkpoint最多可以有多少个
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在checkpoint发生异常的时候,是否应该fail该task,默认是true,如果设置为false,则task会拒绝checkpoint然后继续运行
复制代码
2. Flink 的重启策略
Flink 支持不同的重启策略,这些重启策略控制着 job 失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果 Job 提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。
1) 概览
默认的重启策略是通过 Flink 的 flink-conf.yaml 来指定的,这个配置参数 restart-strategy 定义了哪种策略会被采用。如果 checkpoint 未启动,就会采用 no restart 策略,如果启动了 checkpoint 机制,但是未指定重启策略的话,就会采用 fixed-delay 策略,重试 Integer.MAX_VALUE 次。请参考下面的可用重启策略来了解哪些值是支持的。
每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。
除了定义一个默认的重启策略之外,你还可以为每一个 Job 指定它自己的重启策略,这个重启策略可以在 ExecutionEnvironment 中调用 setRestartStrategy() 方法来程序化地调用,注意这种方式同样适用于 StreamExecutionEnvironment。
下面的例子展示了如何为 Job 设置一个固定延迟重启策略,一旦有失败,系统就会尝试每 10 秒重启一次,重启 3 次。
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔
))
复制代码
2) 固定延迟重启策略(Fixed Delay Restart Strategy)
固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
重启策略可以配置 flink-conf.yaml 的下面配置参数来启用,作为默认的重启策略:
restart-strategy: fixed-delay
复制代码
例子:
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
复制代码
固定延迟重启也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 重启时间间隔
))
复制代码
3) 失败率重启策略
失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
失败率重启策略可以在 flink-conf.yaml 中设置下面的配置参数来启用:
restart-strategy:failure-rate
复制代码
例子:
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
复制代码
失败率重启策略也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔
))
复制代码
4) 无重启策略
Job 直接失败,不会尝试进行重启
无重启策略也可以在程序中设置
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
复制代码
5) 案例
需求:输入五次 zhangsan,程序挂掉。
代码:
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
object FixDelayRestartStrategiesDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//如果想要开启重启策略,就必须开启CheckPoint
env.enableCheckpointing(5000L)
//指定状态存储后端,默认就是内存
//现在指定的是FsStateBackend,支持本地系统、
//new FsStateBackend要指定存储系统的协议: scheme (hdfs://, file://, etc)
env.setStateBackend(new FsStateBackend(args(0)))
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定以后存储多个checkpoint目录
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//指定重启策略,默认的重启策略是不停的重启
//程序出现异常是会重启,重启五次,每次延迟5秒,如果超过了5次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))
val lines: DataStream[String] = env.socketTextStream(args(1), 8888)
val result = lines.flatMap(_.split(" ").map(word => {
if(word.equals("zhangsan")) {
throw new RuntimeException("zhangsan,程序重启!");
}
(word, 1)
})).keyBy(0).sum(1)
result.print()
env.execute()
}
}
复制代码
3) checkpoint 案例
1. 需求:
假定用户需要每隔 1 秒钟需要统计 4 秒中窗口中数据的量,然后对统计的结果值进行 checkpoint 处理
2. 数据规划:
使用自定义算子每秒钟产生大约 10000 条数据。
产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。
数据经统计后,统计结果打印到终端输出。
打印输出的结果为 Long 类型的数据 。
3. 开发思路:
source 算子每隔 1 秒钟发送 10000 条数据,并注入到 Window 算子中。
window 算子每隔 1 秒钟统计一次最近 4 秒钟内数据数量。
每隔 1 秒钟将统计结果打印到终端。
每隔 6 秒钟触发一次 checkpoint,然后将 checkpoint 的结果保存到 HDFS 中。
5. 开发步骤:
获取流处理执行环境
设置检查点机制
自定义数据源
数据分组
划分时间窗口
数据聚合
数据打印
触发执行
示例代码:
//发送数据形式
case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent]{
private var count = 0L
private var isRunning = true
private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
// 任务取消时调用
override def cancel(): Unit = {
isRunning = false
}
//// source算子的逻辑,即:每秒钟向流图中注入10000个元组
override def run(sourceContext: SourceContext[SEvent]): Unit = {
while(isRunning) {
for (i <- 0 until 10000) {
sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
count += 1L
}
Thread.sleep(1000)
}
}
}
/**
该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。
*/
object FlinkEventTimeAPIChkMain {
def main(args: Array[String]): Unit ={
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointInterval(6000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//保留策略:默认情况下,检查点不会被保留,仅用于故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意在这种情况下,您必须在取消后手动清理检查点状态
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业被cancel时,删除检查点,检查点状态仅在作业失败时可用
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
// 应用逻辑
val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk)
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
// 设置watermark
override def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis())
}
// 给每个元组打上时间戳
override def extractTimestamp(t: SEvent, l: Long): Long = {
System.currentTimeMillis()
}
})
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
.apply(new WindowStatisticWithChk)
.print()
env.execute()
}
}
//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。
// 用户自定义状态
class UDFState extends Serializable{
private var count = 0L
// 设置用户自定义状态
def setState(s: Long) = count = s
// 获取用户自定状态
def getState = count
}
//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{
private var total = 0L
// window算子的实现逻辑,即:统计window中元组的数量
override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
var count = 0L
for (event <- input) {
count += 1L
}
total += count
out.collect(count)
}
// 从自定义快照中恢复状态
override def restoreState(state: util.List[UDFState]): Unit = {
val udfState = state.get(0)
total = udfState.getState
}
// 制作自定义状态快照
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
val udfState = new UDFState
udfState.setState(total)
udfList.add(udfState)
udfList
}
}
复制代码
4. 端对端仅处理一次语义
当谈及仅一次处理时,我们真正想表达的是每条输入消息只会影响最终结果一次!(影响应用状态一次,而非被处理一次)即使出现机器故障或软件崩溃,Flink 也要保证不会有数据被重复处理或压根就没有被处理从而影响状态。
在 Flink 1.4 版本之前,精准一次处理只限于 Flink 应用内,也就是所有的 Operator 完全由 Flink 状态保存并管理的才能实现精确一次处理。但 Flink 处理完数据后大多需要将结果发送到外部系统,比如 Sink 到 Kafka 中,这个过程中 Flink 并不保证精准一次处理。
在 Flink 1.4 版本正式引入了一个里程碑式的功能:两阶段提交 Sink,即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑,自此 Flink 搭配特定 Source 和 Sink(如 Kafka 0.11 版)实现精确一次处理语义(英文简称:EOS,即 Exactly-Once Semantics)。
在 Flink 中需要端到端精准一次处理的位置有三个:
Source 端:数据从上一阶段进入到 Flink 时,需要保证消息精准一次消费。
Flink 内部端:这个我们已经了解,利用 Checkpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性。不了解的小伙伴可以看下我之前的文章:
Flink 可靠性的基石-checkpoint 机制详细解析
Sink 端:将处理完的数据发送到下一阶段时,需要保证数据能够准确无误发送到下一阶段。
1) Flink 端到端精准一次处理语义(EOS)
以下内容适用于 Flink 1.4 及之后版本
对于 Source 端:Source 端的精准一次处理比较简单,毕竟数据是落到 Flink 中,所以 Flink 只需要保存消费数据的偏移量即可, 如消费 Kafka 中的数据,Flink 将 Kafka Consumer 作为 Source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性。
对于 Sink 端:Sink 端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开 Flink 之后,Flink 就监控不到这些数据了,所以精准一次处理语义必须也要应用于 Flink 写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与 Flink Checkpoint 能够协调使用(Kafka 0.11 版本已经实现精确一次处理语义)。
我们以 Flink 与 Kafka 组合为例,Flink 从 Kafka 中读数据,处理完的数据在写入 Kafka 中。
为什么以 Kafka 为例,第一个原因是目前大多数的 Flink 系统读写数据都是与 Kafka 系统进行的。第二个原因,也是最重要的原因 Kafka 0.11 版本正式发布了对于事务的支持,这是与 Kafka 交互的 Flink 应用要实现端到端精准一次语义的必要条件。
当然,Flink 支持这种精准一次处理语义并不只是限于与 Kafka 的结合,可以使用任何 Source/Sink,只要它们提供了必要的协调机制。
2) Flink 与 Kafka 组合
如上图所示,Flink 中包含以下组件:
一个 Source,从 Kafka 中读取数据(即 KafkaConsumer)
一个时间窗口化的聚会操作(Window)
一个 Sink,将结果写入到 Kafka(即 KafkaProducer)
若要 Sink 支持精准一次处理语义(EOS),它必须以事务的方式写数据到 Kafka,这样当提交事务时两次 Checkpoint 间的所有写入操作当作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。
当然了,在一个分布式且含有多个并发执行 Sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一个一致性的结果。Flink 使用两阶段提交协议以及预提交(Pre-commit)阶段来解决这个问题。
3) 两阶段提交协议(2PC)
两阶段提交协议(Two-Phase Commit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现 ACID 中的 A (原子性)。
在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值,要么都不改,以此来达到数据的强一致性。
两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。
顾名思义,两阶段提交将提交过程划分为连续的两个阶段:表决阶段(Voting)和提交阶段(Commit)。
两阶段提交协议过程如下图所示:
第一阶段:表决阶段
协调者向所有参与者发送一个 VOTE_REQUEST 消息。
当参与者接收到 VOTE_REQUEST 消息,向协调者发送 VOTE_COMMIT 消息作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备好或遇到其他故障,就返回一个 VOTE_ABORT 消息,告诉协调者目前无法提交事务。
第二阶段:提交阶段
协调者收集来自各个参与者的表决消息。如果所有参与者一致认为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与者发送一个 GLOBAL_COMMIT 消息,通知参与者进行本地提交;如果所有参与者中有任意一个返回消息是 VOTE_ABORT,协调者就会取消事务,向所有参与者广播一条 GLOBAL_ABORT 消息通知所有的参与者取消事务。
每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一个 GLOBAL_COMMIT 消息,那么参与者提交本地事务,否则如果接收到 GLOBAL_ABORT 消息,则参与者取消本地事务。
4) 两阶段提交协议在 Flink 中的应用
Flink 的两阶段提交思路:
我们从 Flink 程序启动到消费 Kafka 数据,最后到 Flink 将数据 Sink 到 Kafka 为止,来分析 Flink 的精准一次处理。
当 Checkpoint 启动时,JobManager 会将检查点分界线(checkpoint battier)注入数据流,checkpoint barrier 会在算子间传递下去,如下如所示:
Source 端:Flink Kafka Source 负责保存 Kafka 消费 offset,当 Chckpoint 成功时 Flink 负责提交这些写入,否则就终止取消掉它们,当 Chckpoint 完成位移保存,它会将 checkpoint barrier(检查点分界线) 传给下一个 Operator,然后每个算子会对当前的状态做个快照,保存到状态后端(State Backend)。
对于 Source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 Checkpoint 恢复时,Source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据,如下图所示:
Slink 端:从 Source 端开始,每个内部的 transform 任务遇到 checkpoint barrier(检查点分界线)时,都会把状态存到 Checkpoint 里。数据处理完毕到 Sink 端时,Sink 任务首先把数据写入外部 Kafka,这些数据都属于预提交的事务(还不能被消费),此时的 Pre-commit 预提交阶段下 Data Sink 在保存状态到状态后端的同时还必须预提交它的外部事务,如下图所示:
当所有算子任务的快照完成(所有创建的快照都被视为是 Checkpoint 的一部分),也就是这次的 Checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 Checkpoint 完成,此时 Pre-commit 预提交阶段才算完成。才正式到两阶段提交协议的第二个阶段:commit 阶段。该阶段中 JobManager 会为应用中每个 Operator 发起 Checkpoint 已完成的回调逻辑。
本例中的 Data Source 和窗口操作无外部状态,因此在该阶段,这两个 Opeartor 无需执行任何逻辑,但是 Data Sink 是有外部状态的,此时我们必须提交外部事务,当 Sink 任务收到确认通知,就会正式提交之前的事务,Kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了,如下图所示:
注:Flink 由 JobManager 协调各个 TaskManager 进行 Checkpoint 存储,Checkpoint 保存在 StateBackend(状态后端) 中,默认 StateBackend 是内存级的,也可以改为文件级的进行持久化保存。
最后,一张图总结下 Flink 的 EOS:
此图建议保存,总结全面且简明扼要,再也不怂面试官!
5) Exactly-Once 案例
Kafka 来实现 End-to-End Exactly-Once 语义:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
/**
* Kafka Producer的容错-Kafka 0.9 and 0.10
* 如果Flink开启了checkpoint,针对FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数
* •setLogFailuresOnly(false)
* •setFlushOnCheckpoint(true)
*
* 注意:建议修改kafka 生产者的重试次数
* retries【这个参数的值默认是0】
*
* Kafka Producer的容错-Kafka 0.11
* 如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供 exactly-once的语义
* 但是需要选择具体的语义
* •Semantic.NONE
* •Semantic.AT_LEAST_ONCE【默认】
* •Semantic.EXACTLY_ONCE
*/
object StreamingKafkaSinkScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//隐式转换
import org.apache.flink.api.scala._
//checkpoint配置
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val text = env.socketTextStream("node01", 9001, '\n')
val topic = "test"
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092")
//设置事务超时时间,也可在kafka配置中设置
prop.setProperty("transaction.timeout.ms",60000*15+"");
//使用至少一次语义的形式
//val myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
//使用支持仅一次语义的形式
val myProducer =
new FlinkKafkaProducer011[String](topic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
text.addSink(myProducer)
env.execute("StreamingKafkaSinkScala")
}
}
复制代码
Redis 实现 End-to-End Exactly-Once 语义:
代码开发步骤:
获取流处理执行环境
设置检查点机制
定义 kafkaConsumer
数据转换:分组,求和
数据写入 redis
触发执行
object ExactlyRedisSink {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(5000)
env.setStateBackend(new FsStateBackend("hdfs://node01:8020/check/11"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//设置kafka,加载kafka数据源
val properties = new Properties()
properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
properties.setProperty("group.id", "test")
properties.setProperty("enable.auto.commit", "false")
val kafkaConsumer = new FlinkKafkaConsumer011[String]("test2", new SimpleStringSchema(), properties)
kafkaConsumer.setStartFromLatest()
//检查点制作成功,才开始提交偏移量
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)
//数据转换
val sumData: DataStream[(String, Int)] = kafkaSource.flatMap(_.split(" "))
.map(_ -> 1)
.keyBy(0)
.sum(1)
val set = new util.HashSet[InetSocketAddress]()
set.add(new InetSocketAddress(InetAddress.getByName("node01"),7001))
set.add(new InetSocketAddress(InetAddress.getByName("node01"),7002))
set.add(new InetSocketAddress(InetAddress.getByName("node01"),7003))
val config: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
.setNodes(set)
.setMaxIdle(5)
.setMaxTotal(10)
.setMinIdle(5)
.setTimeout(10)
.build()
//数据写入
sumData.addSink(new RedisSink(config,new MyRedisSink))
env.execute()
}
}
class MyRedisSink extends RedisMapper[(String,Int)] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"resink")
}
override def getKeyFromData(data: (String, Int)): String = {
data._1
}
override def getValueFromData(data: (String, Int)): String = {
data._2.toString
}
}
复制代码
评论