Flink Source Code Analysis: Flink Window Internals for Busy People
Window is arguably one of the indispensable operators in Flink, and it performs remarkably well in many scenarios. Today, let's look at how windows are implemented.
Window types
Tumbling Window
Sliding Window
Session Window
Global Window
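For the time-based window types above, assigning an element to windows is plain arithmetic on its timestamp. Below is a minimal, Flink-free Java sketch of tumbling and sliding assignment. The start-of-window formula mirrors the one in Flink's `TimeWindow.getWindowStartWithOffset`. The class name `WindowAssignSketch` and the `long[]{start, end}` representation are assumptions made for illustration. Session windows (each element starts as `[timestamp, timestamp + gap)` and is then merged with overlapping windows) and the global window are not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Flink-free sketch of time-window assignment.
// Each window is represented as long[]{start, endExclusive}.
final class WindowAssignSketch {

    // Start of the window containing `timestamp`, for windows of length
    // `windowSize` aligned to `offset` (same formula as Flink's
    // TimeWindow.getWindowStartWithOffset).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Tumbling windows: every timestamp belongs to exactly one window.
    static long[] tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, 0, size);
        return new long[] {start, start + size};
    }

    // Sliding windows: a timestamp belongs to every window of length `size`
    // whose start is a multiple of `slide` and lies in (timestamp - size, timestamp].
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For example, with a 2 s window the timestamp 4500 ms falls into the tumbling window [4000, 6000). With a 1 s slide it falls into [4000, 6000) and [3000, 5000).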
window operator
evictor
An evictor removes elements from a window. It can run before the user's window function, after it, or both. For details, see the evictBefore and evictAfter methods of org.apache.flink.streaming.api.windowing.evictors.Evictor.
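To illustrate evictBefore-style logic, here is a Flink-free sketch in the spirit of Flink's built-in `CountEvictor`, which keeps at most `maxCount` elements and drops the oldest ones. The class and method names are hypothetical. A plain `List` stands in for the `Iterable<TimestampedValue<T>>` window contents that a real Flink evictor receives.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified evictor: keeps only the newest `maxCount`
// elements of a window, assuming elements are stored in arrival order.
final class CountEvictorSketch {

    static <T> void evict(List<T> windowElements, int maxCount) {
        int size = windowElements.size();
        if (size <= maxCount) {
            return; // nothing to evict
        }
        int toEvict = size - maxCount;
        // Remove from the front: the oldest elements were added first.
        Iterator<T> it = windowElements.iterator();
        for (int evicted = 0; evicted < toEvict && it.hasNext(); evicted++) {
            it.next();
            it.remove();
        }
    }
}
```

With a window holding a, b, c, d and `maxCount = 2`, only c and d remain for the window function.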
trigger
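A trigger decides whether a window should fire. Every WindowAssigner comes with a default trigger. If the default trigger does not meet your needs, write your own class that extends Trigger. The Trigger methods and their meanings are:
onElement() called every time an element is added to a window
onEventTime() called when a registered event-time timer fires
onProcessingTime() called when a registered processing-time timer fires
onMerge() merges the state of two triggers when their windows are merged (e.g. session windows)
clear() called when the window is removed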
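The first three methods return a TriggerResult, which takes one of the following values:
CONTINUE do nothing
FIRE fire the window (evaluate the window function and emit the result); the window's elements are kept
PURGE clear all elements of the window and discard it, without evaluating the window function
FIRE_AND_PURGE fire the window, then purge it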
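Internally, a TriggerResult is essentially a pair of booleans. That is why the WindowOperator code in this article only checks isFire() and isPurge(). The Flink-free enum below mirrors that idea and adds a hypothetical `applyTo` helper showing how the operator reacts to each value. The name `TriggerResultSketch` and the helper are assumptions, not Flink's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified mirror of Flink's TriggerResult: each value is just a
// combination of a "fire" flag and a "purge" flag.
enum TriggerResultSketch {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResultSketch(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }

    boolean isPurge() { return purge; }

    // Hypothetical helper: FIRE emits a copy of the current contents
    // downstream (an empty window emits nothing, just as the operator skips
    // null contents), then PURGE clears the window's contents.
    <T> void applyTo(List<T> windowContents, List<List<T>> downstream) {
        if (fire && !windowContents.isEmpty()) {
            downstream.add(new ArrayList<>(windowContents));
        }
        if (purge) {
            windowContents.clear();
        }
    }
}
```

FIRE leaves the contents in place, so the next firing sees them again. FIRE_AND_PURGE starts the window over from empty.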
window code
```java
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class Main {
    protected final static org.slf4j.Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // windows below are processing-time windows
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // exactly-once checkpoints every 60 s
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(5);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        env.setParallelism(1);

        // RocksDB state backend with incremental checkpoints enabled
        StateBackend backend = new RocksDBStateBackend("file:////Users/iss/sourceCode/spark/flink/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/checkpoints", true);
        env.setStateBackend(backend);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata-dev-mq:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "1000");

        // FlinkKafkaConsumer and FlinkKafkaConsumerBase need no import: they are in this package
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();

        env.addSource(consumer).uid("orderAndRegisterUserIdSource")
                .rebalance()
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // 2-second tumbling (processing-time) window per key
                .timeWindow(Time.seconds(2))
                // CountAndTimeTrigger and ProcessWindowFunctionImp are user-defined classes (not shown)
                .trigger(new CountAndTimeTrigger(2L))
                .process(new ProcessWindowFunctionImp()).uid("process");

        // execute program
        env.execute("realTimeDataWareHouse");
    }
}
```
For CountAndTimeTrigger, see "Implementing countAndTimeTrigger, a Custom Flink Trigger with a Timeout".
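The referenced article's implementation is not reproduced here. As a hedged illustration of the idea only, the Flink-free sketch below fires when either of two conditions holds: the per-window element count reaches `maxCount`, or the timer for the window's end goes off. The following are all assumptions, not the referenced trigger's code: the names (`CountAndTimeTriggerSketch`, `onElement`, `onTime`), reading the constructor argument `2L` as the count threshold, and resetting the count after a count-based fire. A plain map keyed by a window id stands in for Flink's partitioned trigger state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a "count or timeout" trigger decision (not Flink's API).
final class CountAndTimeTriggerSketch {

    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    // Per-window element counts, standing in for Flink's partitioned trigger state.
    private final Map<String, Long> counts = new HashMap<>();

    CountAndTimeTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per element added to the window.
    Result onElement(String windowId) {
        long count = counts.merge(windowId, 1L, Long::sum);
        if (count >= maxCount) {
            counts.remove(windowId); // assumption: reset the count after a count-based fire
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Called when the timer registered for the window's end fires.
    Result onTime(String windowId, long time, long windowMaxTimestamp) {
        if (time >= windowMaxTimestamp) {
            counts.remove(windowId);
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

With a threshold of 2, the second element fires the window early. A third element arriving before the window ends is still emitted when the timer goes off.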
Window internals
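First, when the program starts consuming messages (see "Understanding the Full Flow of How Flink Consumes Messages"), each record enters the processElement method of WindowOperator: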
```java
// WindowOperator#processElement
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window: elementWindows) {

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {

                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                            "event-time window cannot become earlier than the current watermark " +
                            "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                            " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                        throw new UnsupportedOperationException("The end timestamp of a " +
                            "processing-time window cannot become earlier than the current processing time " +
                            "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                            " window: " + mergeResult);
                    }

                    triggerContext.key = key;
                    triggerContext.window = mergeResult;

                    triggerContext.onMerge(mergedWindows);

                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = actualWindow;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                // e.g. RocksDBListState or RocksDBReducingState
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            windowState.setCurrentNamespace(window);
            // an incoming element is first stored in windowState and stays there until the window fires
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            // call the user-defined trigger's onElement
            TriggerResult triggerResult = triggerContext.onElement(element);

            // when the window fires, read its contents from windowState;
            // in this example windowState is a RocksDBListState
            if (triggerResult.isFire()) {
                // RocksDBListState or RocksDBReducingState
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                // hand the window contents to the user's process method; its output goes downstream
                emitWindowContents(window, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            // register the cleanup timer, which is essentially a scheduled task; for processing
            // time it is ultimately backed by ScheduledThreadPoolExecutor.schedule(...).
            // Each key has exactly one such timer per window (duplicates are detected partly via a map).
            registerCleanupTimer(window);
        }
    }
    // ... (rest of processElement omitted)
}
```
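To see the non-merging branch of processElement in isolation, here is a Flink-free simulation for tumbling processing-time windows. It makes three simplifying assumptions. Keyed window state is a map from a (key, window) string to a list of elements. The trigger is a simple count trigger that returns FIRE (without purge) every `fireEvery` elements. Registered cleanup timers are kept in a `Set`, so registering the same (key, window) timer twice has no effect. Every name here is hypothetical. The sketch only mimics the control flow of the code above, not Flink's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free simulation of the non-merging branch of
// WindowOperator.processElement for tumbling processing-time windows.
final class MiniWindowOperator {

    private final long windowSize;
    private final int fireEvery; // stand-in count trigger: FIRE every N elements
    // Window state, scoped by key and window (Flink uses the window as the state namespace).
    final Map<String, List<String>> windowState = new HashMap<>();
    // Registered cleanup timers; the Set ignores repeated registrations.
    final Set<String> cleanupTimers = new LinkedHashSet<>();
    // Everything "emitted downstream" when a window fires.
    final List<String> emitted = new ArrayList<>();

    MiniWindowOperator(long windowSize, int fireEvery) {
        this.windowSize = windowSize;
        this.fireEvery = fireEvery;
    }

    void processElement(String key, String value, long timestamp) {
        // 1. Assign the element to its single tumbling window.
        long start = timestamp - (timestamp + windowSize) % windowSize;
        long maxTimestamp = start + windowSize - 1;
        String stateKey = key + "@[" + start + "," + (start + windowSize) + ")";

        // 2. Add the element to the window state before asking the trigger.
        List<String> contents = windowState.computeIfAbsent(stateKey, k -> new ArrayList<>());
        contents.add(value);

        // 3. Ask the trigger; FIRE emits the current contents but keeps them.
        boolean fire = contents.size() % fireEvery == 0;
        if (fire) {
            emitted.add(stateKey + " -> " + contents);
        }

        // 4. Register one cleanup timer per (key, window) at the window's max timestamp.
        cleanupTimers.add(stateKey + " cleanup@" + maxTimestamp);
    }
}
```

Two elements for key `k1` at 100 ms and 200 ms fire window `k1@[0,2000)` once. An element for `k2` in the same time range lands in separate state with its own timer. This illustrates that windows are scoped per key.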
For the ordering of elements within a window, see "Understanding the Ordering of Elements in a Flink Window".
When a registered processing-time timer expires, onProcessingTime is called (event-time timers go through onEventTime instead):
```java
// Invoked by the timer service, e.g. for the timer that
// registerCleanupTimer(window) created in processElement
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();

    MergingWindowSet<W> mergingWindows;

    if (windowAssigner instanceof MergingWindowAssigner) {
        mergingWindows = getMergingWindowSet();
        W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
        if (stateWindow == null) {
            // Timer firing for non-existent window, this can only happen if a
            // trigger did not clean up timers. We have already cleared the merging
            // window and therefore the Trigger state, however, so nothing to do.
            return;
        } else {
            windowState.setCurrentNamespace(stateWindow);
        }
    } else {
        windowState.setCurrentNamespace(triggerContext.window);
        mergingWindows = null;
    }

    TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents != null) {
            emitWindowContents(triggerContext.window, contents);
        }
    }

    if (triggerResult.isPurge()) {
        windowState.clear();
    }

    if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        // Clears all state: first windowState.clear(), then the user-defined clear
        // methods, then the internal state of the window context.
        // clearAllState can only be reached when onProcessingTime or onEventTime
        // runs at the window's cleanup time; a fire from anywhere else leaves the
        // window alive even though it has fired.
        // A window may therefore first fire incrementally and then fire over its
        // full contents (for fires from onProcessingTime / onEventTime without PURGE).
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }

    if (mergingWindows != null) {
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    }
}
```
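The timer side of onProcessingTime can be simulated the same way. The sketch below keeps processing-time timers in a priority queue ordered by timestamp and deduplicates them with a set. When time advances, it fires every timer whose timestamp is not after the new time. It assumes a trigger that returns FIRE at the window's end, as Flink's default processing-time trigger does. Every timer here is a cleanup timer, so each firing also clears the window's state, like the isCleanupTime/clearAllState step above. Flink's real timer service is more involved; for example, timers are partitioned by key group and included in checkpoints. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical, Flink-free sketch of processing-time timers driving
// onProcessingTime: fire the window, then clear its state at cleanup time.
final class MiniTimerService {

    static final class Timer {
        final String stateKey; // identifies the (key, window) pair
        final long timestamp;  // processing time at which the timer fires

        Timer(String stateKey, long timestamp) {
            this.stateKey = stateKey;
            this.timestamp = timestamp;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return timestamp == other.timestamp && stateKey.equals(other.stateKey);
        }

        @Override public int hashCode() {
            return Objects.hash(stateKey, timestamp);
        }
    }

    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<Timer> registered = new HashSet<>(); // deduplicates timers
    final Map<String, List<String>> windowState = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    void registerCleanupTimer(String stateKey, long windowMaxTimestamp) {
        Timer timer = new Timer(stateKey, windowMaxTimestamp);
        if (registered.add(timer)) { // a second registration is a no-op
            queue.add(timer);
        }
    }

    // Advance processing time: every timer with timestamp <= now fires, in order.
    void advanceTo(long now) {
        while (!queue.isEmpty() && queue.peek().timestamp <= now) {
            Timer timer = queue.poll();
            registered.remove(timer);
            // Stand-in for onProcessingTime returning FIRE: emit current contents ...
            List<String> contents = windowState.get(timer.stateKey);
            if (contents != null) {
                emitted.add(timer.stateKey + " -> " + contents);
            }
            // ... and, since this is the cleanup time, drop all state (clearAllState).
            windowState.remove(timer.stateKey);
        }
    }
}
```

Registering the same cleanup timer twice still produces a single firing. Nothing happens until processing time reaches the window's max timestamp.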
Note that windows are scoped per key: window contents, trigger state, and timers are all kept separately for each key.
Summary
The overall window flow
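Copyright notice: This is an original article by InfoQ author shengjk1.
Original link: http://xie.infoq.cn/article/5cd0e7ecd7761427d646526d3
This article is licensed under CC-BY 4.0. When reposting, please keep the original source and this copyright notice.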
Author's blog: https://blog.csdn.net/jsjsjs1789