写点什么

深入解析 Flink 的算子链机制

用户头像
Apache Flink
关注
发布于: 2020 年 11 月 12 日
深入解析 Flink 的算子链机制

“为什么我的 Flink 作业 Web UI 中只显示出了一个框,并且 Records Sent 和 Records Received 指标都是 0 ?是我的程序写得有问题吗?”


Flink 算子链简介


笔者在 Flink 社区群里经常能看到类似这样的疑问。这种情况几乎都不是程序有问题,而是因为 Flink 的 operator chain ——即算子链机制导致的,即提交的作业的执行计划中,所有算子的并发实例(即 sub-task )都因为满足特定条件而串成了整体来执行,自然就观察不到算子之间的数据流量了。当然上述是一种特殊情况。我们更常见到的是只有部分算子得到了算子链机制的优化,如官方文档中出现过多次的下图所示,注意 Source 和 map() 算子。



算子链机制的好处是显而易见的:所有 chain 在一起的 sub-task 都会在同一个线程(即 TaskManager 的 slot)中执行,能够减少不必要的数据交换、序列化和上下文切换,从而提高作业的执行效率。



铺垫了这么多,接下来就通过源码简单看看算子链产生的条件,以及它是如何在 Flink Runtime 中实现的。


逻辑计划中的算子链


对 Flink Runtime 稍有了解的看官应该知道,Flink 作业的执行计划会用三层图结构来表示,即:


  • StreamGraph —— 原始逻辑执行计划

  • JobGraph —— 优化的逻辑执行计划(Web UI 中看到的就是这个)

  • ExecutionGraph —— 物理执行计划


算子链是在优化逻辑计划时加入的,也就是由 StreamGraph 生成 JobGraph 的过程中。那么我们来到负责生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 类,查看其核心方法 createJobGraph() 的源码。


private JobGraph createJobGraph() {    // make sure that all vertices start immediately    jobGraph.setScheduleMode(streamGraph.getScheduleMode());    // Generate deterministic hashes for the nodes in order to identify them across    // submission iff they didn't change.    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);    // Generate legacy version hashes for backwards compatibility    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));    }    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();    setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges(); // 略......
return jobGraph;}
复制代码


可见,该方法会先计算出 StreamGraph 中各个节点的哈希码作为唯一标识,并创建一个空的 Map 结构保存即将被链在一起的算子的哈希码,然后调用 setChaining() 方法,如下源码所示。


private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);    }}
复制代码


可见是逐个遍历 StreamGraph 中的 Source 节点,并调用 createChain() 方法。createChain() 是逻辑计划层创建算子链的核心方法,完整源码如下,有点长。


private List<StreamEdge> createChain(        Integer startNodeId,        Integer currentNodeId,        Map<Integer, byte[]> hashes,        List<Map<Integer, byte[]>> legacyHashes,        int chainIndex,        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {    if (!builtVertices.contains(startNodeId)) {        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId); for (StreamEdge outEdge : currentNode.getOutEdges()) { if (isChainable(outEdge, streamGraph)) { chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } }
for (StreamEdge chainable : chainableOutputs) { transitiveOutEdges.addAll( createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); }
for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); }
List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId); OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) { operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId))); }
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) { getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat()); } if (currentNode.getOutputFormat() != null) { getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat()); }
StreamConfig config = currentNodeId.equals(startNodeId) ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) : new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) { config.setChainStart(); config.setChainIndex(0); config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); config.setOutEdgesInOrder(transitiveOutEdges); config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges()); for (StreamEdge edge : transitiveOutEdges) { connect(startNodeId, edge); } config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId)); } else { chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>()); config.setChainIndex(chainIndex); StreamNode node = streamGraph.getStreamNode(currentNodeId); config.setOperatorName(node.getOperatorName()); chainedConfigs.get(startNodeId).put(currentNodeId, config); }
config.setOperatorID(currentOperatorId); if (chainableOutputs.isEmpty()) { config.setChainEnd(); } return transitiveOutEdges; } else { return new ArrayList<>(); }}
复制代码


先解释一下方法开头创建的 3 个 List 结构:


  • transitiveOutEdges:当前算子链在 JobGraph 中的出边列表,同时也是 createChain() 方法的最终返回值;

  • chainableOutputs:当前能够链在一起的 StreamGraph 边列表;

  • nonChainableOutputs:当前不能够链在一起的 StreamGraph 边列表。


接下来,从 Source 开始遍历 StreamGraph 中当前节点的所有出边,调用 isChainable() 方法判断是否可以被链在一起(这个判断逻辑稍后会讲到)。可以链接的出边被放入 chainableOutputs 列表,否则放入 nonChainableOutputs 列表。对于 chainableOutputs 中的边,就会以这些边的直接下游为起点,继续递归调用 createChain() 方法延展算子链。对于 nonChainableOutputs 中的边,由于当前算子链的延展已经到头,就会以这些“断点”为起点,继续递归调用 createChain() 方法试图创建新的算子链。也就是说,逻辑计划中整个创建算子链的过程都是递归的,亦即实际返回时,是从 Sink 端开始返回的。然后要判断当前节点是不是算子链的起始节点。如果是,则调用 createJobVertex()方法为算子链创建一个 JobVertex( 即 JobGraph 中的节点),也就形成了我们在 Web UI 中看到的 JobGraph 效果:



最后,还需要将各个节点的算子链数据写入各自的 StreamConfig 中,算子链的起始节点要额外保存下 transitiveOutEdges。StreamConfig 在后文的物理执行阶段会再次用到。


形成算子链的条件


来看看 isChainable() 方法的代码。 


public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory(); StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1 && outOperator != null && headOperator != null && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && (edge.getPartitioner() instanceof ForwardPartitioner) && edge.getShuffleMode() != ShuffleMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled();}
复制代码


由此可得,上下游算子能够 chain 在一起的条件还是非常苛刻的(老生常谈了),列举如下:


  • 上下游算子实例处于同一个 SlotSharingGroup 中(之后再提);

  • 下游算子的链接策略(ChainingStrategy)为 ALWAYS ——既可以与上游链接,也可以与下游链接。我们常见的 map()、filter() 等都属此类;

  • 上游算子的链接策略为 HEAD 或 ALWAYS。HEAD 策略表示只能与下游链接,这在正常情况下是 Source 算子的专属;

  • 两个算子间的物理分区逻辑是 ForwardPartitioner ,可参见之前写过的《聊聊 Flink DataStream 的八种物理分区逻辑》;

  • 两个算子间的 shuffle 方式不是批处理模式;

  • 上下游算子实例的并行度相同;

  • 没有禁用算子链。


禁用算子链


用户可以在一个算子上调用 startNewChain() 方法强制开始一个新的算子链,或者调用 disableOperatorChaining() 方法指定它不参与算子链。代码位于 SingleOutputStreamOperator 类中,都是通过改变算子的链接策略实现的。


@PublicEvolvingpublic SingleOutputStreamOperator<T> disableChaining() {    return setChainingStrategy(ChainingStrategy.NEVER);}
@PublicEvolvingpublic SingleOutputStreamOperator<T> startNewChain() { return setChainingStrategy(ChainingStrategy.HEAD);}
复制代码


如果要在整个运行时环境中禁用算子链,调用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可。


物理计划中的算子链


在 JobGraph 转换成 ExecutionGraph 并交由 TaskManager 执行之后,会生成调度执行的基本任务单元 ——StreamTask,负责执行具体的 StreamOperator 逻辑。在 StreamTask.invoke() 方法中,初始化了状态后端、checkpoint 存储和定时器服务之后,可以发现:


operatorChain = new OperatorChain<>(this, recordWriters);headOperator = operatorChain.getHeadOperator();
复制代码


构造出了一个 OperatorChain 实例,这就是算子链在实际执行时的形态。解释一下 OperatorChain 中的几个主要属性。


private final StreamOperator<?>[] allOperators;private final RecordWriterOutput<?>[] streamOutputs;private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;private final OP headOperator;
复制代码


  • headOperator:算子链的第一个算子,对应 JobGraph 中的算子链起始节点;

  • allOperators:算子链中的所有算子,倒序排列,即 headOperator 位于该数组的末尾;

  • streamOutputs:算子链的输出,可以有多个;

  • chainEntryPoint:算子链的“入口点”,它的含义将在后文说明。


由上可知,所有 StreamTask 都会创建 OperatorChain。如果一个算子无法进入算子链,也会形成一个只有 headOperator 的单个算子的 OperatorChain。OperatorChain 构造方法中的核心代码如下。


for (int i = 0; i < outEdgesInOrder.size(); i++) {    StreamEdge outEdge = outEdgesInOrder.get(i);    RecordWriterOutput<?> streamOutput = createStreamOutput(        recordWriters.get(i),        outEdge,        chainedConfigs.get(outEdge.getSourceId()),        containingTask.getEnvironment());    this.streamOutputs[i] = streamOutput;    streamOutputMap.put(outEdge, streamOutput);}
// we create the chain of operators and grab the collector that leads into the chainList<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());this.chainEntryPoint = createOutputCollector( containingTask, configuration, chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
if (operatorFactory != null) { WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint(); headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());} else { headOperator = null;}
// add head operator to end of chainallOps.add(headOperator);this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
复制代码


首先会遍历算子链整体的所有出边,并调用 createStreamOutput() 方法创建对应的下游输出 RecordWriterOutput。然后就会调用 createOutputCollector() 方法创建物理的算子链,并返回 chainEntryPoint,这个方法比较重要,部分代码如下。


private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(        StreamTask<?, ?> containingTask,        StreamConfig operatorConfig,        Map<Integer, StreamConfig> chainedConfigs,        ClassLoader userCodeClassloader,        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,        List<StreamOperator<?>> allOperators) {    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
// create collectors for the network outputs for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) { @SuppressWarnings("unchecked") RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge); allOutputs.add(new Tuple2<>(output, outputEdge)); }
// Create collectors for the chained outputs for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) { int outputId = outputEdge.getTargetId(); StreamConfig chainedOpConfig = chainedConfigs.get(outputId); WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator( containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag()); allOutputs.add(new Tuple2<>(output, outputEdge)); } // 以下略......}
复制代码


该方法从上一节提到的 StreamConfig 中分别取出出边和链接边的数据,并创建各自的 Output。出边的 Output 就是将数据发往算子链之外下游的 RecordWriterOutput,而链接边的输出要靠 createChainedOperator() 方法。


private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(        StreamTask<?, ?> containingTask,        StreamConfig operatorConfig,        Map<Integer, StreamConfig> chainedConfigs,        ClassLoader userCodeClassloader,        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,        List<StreamOperator<?>> allOperators,        OutputTag<IN> outputTag) {    // create the output that the operator writes to first. this may recursively create more operators    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(        containingTask,        operatorConfig,        chainedConfigs,        userCodeClassloader,        streamOutputs,        allOperators);
// now create the operator and give it the output collector to write its output to StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader); OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator( containingTask, operatorConfig, chainedOperatorOutput);
allOperators.add(chainedOperator);
WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput; if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag); } else { TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); }
// wrap watermark gauges since registered metrics must be unique chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue); chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue); return currentOperatorOutput;}
复制代码


我们一眼就可以看到,这个方法递归调用了上述 createOutputCollector() 方法,与逻辑计划阶段类似,通过不断延伸 Output 来产生 chainedOperator(即算子链中除了 headOperator 之外的算子),并逆序返回,这也是 allOperators 数组中的算子顺序为倒序的原因。


chainedOperator 产生之后,将它们通过 ChainingOutput 连接起来,形成如下图所示的结构。



图片来自:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/


最后来看看 ChainingOutput.collect() 方法是如何输出数据流的。


@Overridepublic void collect(StreamRecord<T> record) {    if (this.outputTag != null) {        // we are only responsible for emitting to the main input        return;    }    pushToOperator(record);}
@Overridepublic <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { if (this.outputTag == null || !this.outputTag.equals(outputTag)) { // we are only responsible for emitting to the side-output specified by our // OutputTag. return; } pushToOperator(record);}
protected <X> void pushToOperator(StreamRecord<X> record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator expects. @SuppressWarnings("unchecked") StreamRecord<T> castRecord = (StreamRecord<T>) record; numRecordsIn.inc(); operator.setKeyContextElement1(castRecord); operator.processElement(castRecord); } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); }}
复制代码


可见是通过调用链接算子的 processElement() 方法,直接将数据推给下游处理了。也就是说,OperatorChain 完全可以看做一个由 headOperator 和 streamOutputs 组成的单个算子,其内部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽,同时没有引入任何 overhead。打通了算子链在执行层的逻辑,看官应该会明白 chainEntryPoint 的含义了。由于它位于递归返回的终点,所以它就是流入算子链的起始 Output,即上图中指向 headOperator 的 RecordWriterOutput。


文章转载自简书,作者:LittleMagic。

原文链接:https://www.jianshu.com/p/799744e347c7


用户头像

Apache Flink

关注

Apache Flink 中文社区 2020.04.29 加入

公众号:Flink中文社区 Apache Flink 官方帐号,Flink PMC 维护

评论

发布
暂无评论
深入解析 Flink 的算子链机制