写点什么

一文搞懂 Flink 中的锁

用户头像
shengjk1
关注
发布于: 2021 年 03 月 23 日

之前在介绍 flink timer 的时候( 一文搞懂 Flink Timer ) 官网有这样的一句话


Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.


当时觉得特别奇怪,今天我们就一起来看一下,flink 是如何保证 onTimer 与 processElement 同步的以及其他使用 lock 的地方


一文搞定 Flink 消费消息的全流程 我们可以知道,当算子处理 msg 时,保持同步


// 这里就是真正的,用户的代码即将被执行的地方						// now we can do the actual processing						StreamRecord<IN> record = recordOrMark.asRecord();						//处理每条 record lock						// 所以如果是 window 由 processElement 导致的 window fire 也会被 lock 住						synchronized (lock) {							numRecordsIn.inc();							//throught KeySelector set KeyContext setCurrentKey							streamOperator.setKeyContextElement1(record);							//处理数据							streamOperator.processElement(record);						}
复制代码


一文搞懂 flink 处理水印全过程 我们可以知道下游算子处理水印时,会保持同步


public void handleWatermark(Watermark watermark) {			try {				// event time lock				synchronized (lock) {					watermarkGauge.setCurrentWatermark(watermark.getTimestamp());//gauge					//处理 watermark 的入口					operator.processWatermark(watermark);				}			} catch (Exception e) {				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);			}		}
复制代码

这是 event timer 触发的过程, 同理 process timer


@Override		public void run() {			// process timer lock			synchronized (lock) {				try {					if (serviceStatus.get() == STATUS_ALIVE) {						target.onProcessingTime(timestamp);					}				} catch (Throwable t) {					TimerException asyncException = new TimerException(t);					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);				}			}		}
复制代码

其中 lock 均来自于 StreamTask 的


private final Object lock = new Object();
复制代码

另外 lock 除了应用于 ontimer() 与 processElement() 方法外,还应用于

处理水印、处理 record、triggerCheckpoint、kafka 发送 msg、update offset


发布于: 2021 年 03 月 23 日阅读数: 6
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
一文搞懂 Flink 中的锁