写点什么

大数据培训:Flink 全链路延迟的测量方式及原理

  • 2022 年 2 月 16 日
  • 本文字数:3010 字

    阅读完需:约 10 分钟

一、背景

Flink Job 端到端延迟是一个重要的指标,用来衡量 Flink 任务的整体性能和响应延迟(大部分流式应用,要求低延迟特性)。

通过流处理引擎竞品对比,我们发现大部分流计算引擎产品,都在告警监控页面,集成了全链路时延指标展示(直方图)。

一些低延时的处理场景,例如用于登陆、用户下单规则检测,实时预测场景,需要一个可度量的 Metric 指标,来实时观测、监控集群全链路时延情况。

二、源码分析来源

1、本文的源码分析基于 Flink 社区 issue FLINK-3660,以及 issue 对应的 pr 源码 pull-2386,另外,个人也新增了实现源码的说明。

2、其 pr 源码中只涉及到了部分全链路时延实现代码,因此,我在文章中总结了:

  • Source 到 Sink 处理 Latency Marker 源码

  • LatencyMarksEmitter 提交时延标记类

  • LatencyStats(时延直方图 Metric 实现)源码

时延测量–整体架构图

三、腾讯 Oceanus 监控指标参考

如下图,红色框线对应的数据延时,即我们描述的指标


四、Flink LatencyMarker 实现思路

  1. 实现方案变迁

在 webinterface 中,加入流式 job 的端到端延迟是一个重要特性。因此,Flink 社区最初的想法是在每个记录的 source 上附加一个摄取时间(ingestion-time)时间戳。

然而,这为不使用 monitor feature(监控功能)的用户,大数据培训带来了额外开销(每个元素+每个元素上的 System.currentTimeMilis()需要 8 个字节)。

因此,Flink 社区最后决定,通过定期发送特殊事件来实现此功能,类似于通过拓扑发送水印 watermark。

  1. 实现原理

这些特殊事件(LatencyMarker)在 source 上以可配置发送间隔,并由任务 Task 转发。Sink 最后接收到 LatencyMarks 后,将比较 LatencyMarker 的时间戳与当前系统时间,以确定延迟。

LatencyMarker 不会增加作业的延迟,但是 LatencyMarker 与常规记录类似,可以被 delay 阻塞(例如反压情况),因此 LatencyMarker 的延迟与 Record 延迟近似。

  1. 节点间时钟偏移及准确性

当前方案期望所有任务管理器 TaskManager 上的时钟是同步的。否则,测量的延迟也包括 TaskManager 时钟之间的偏移。

后续,我们可以尝试通过使用 JobManager 作为计时服务中心(central timing service)来缓解这个问题。taskmanager 将定期查询 JM 的当前时间,以确定其时钟的偏移量。

这个偏移量仍然包括 TM 和 JM 之间的网络延迟,但是仍然比较好的测量时延。

五、Flink LatencyMarker 实现源码

本章节对应到 pr 源码 pull-2386 的实现,这里简要说明。



  1. 实现基础类及下发标记

Flink 源码中,引入了一个新的 StreamElement,称为 LatencyMarker。

与水印类似,LatencyMarker 按配置的间隔从源发出。这个时间间隔的默认值是 0 毫秒,即不触发(配置项在 ExecutionConfig#latencyTrackingInterval,名称 metrics.latency.interval),例如可以配置成 2000 毫秒触发一次 LatencyMarker 发送。

LatencyMarker 不能“多于”常规元素。这确保了测量的延迟接近于常规流元素的端到端延迟。

常规操作符 Operator(不包括那些参与迭代的 Operator)如果不是 sink,就会转发延迟标记 LatencyMarker。

  1. 多输出通道—随机下发标记

具有多个输出 channel 的 Operator,随机选择一个 channel 通道,将 LatencyMarker 发送给它。这可以确保每个 LatencyMarker 标记在系统中只存在一次,并且重新分区步骤不会导致传输的 LatencyMarker 数量激增。

public class RecordWriterOutput{

@Override

public void emitLatencyMarker(LatencyMarker latencyMarker) {

serializationDelegate.setInstance(latencyMarker);

try {

// 内部实现了随机选择通道

recordWriter.randomEmit(serializationDelegate);

}

catch (Exception e) {

throw new RuntimeException(e.getMessage(), e);

}

}

}

上述 RecordWriterOutput#emitLatencyMarker()会被 StreamSource、AbstractStreamOperator 调用,分别实现 source 和中间 operator 的延迟标记下发。

如果操作符 Operator 是 Sink,它将维护每个已知 source 实例的最后 128 个 LatencyMarker 信息。

  1. Metric 展示

每个已知 source 的最小/最大/平均值/p50/p95/p99 时延,在 sink 的 LatencyStats 对象中,进行汇总(如果没有任何输出的 Operator,就是是 sink)。

本 pr 只涉及全链路延迟统计的实现,Flink 已有一整套 Metric 显示体系,全链路时延 Metric 展示交给 Flink 框架本身)。

此外,目前还没有确保系统时钟同步的机制,因此如果硬件时钟不正确,则延迟测量将不准确。

六、时延粒度 Granularity 说明

  1. 时延粒度–概念说明

任意一个中间 Operator 或 Sink,可以通过配置 metrics.latency.granularity 项,调整与 Source 间统计的粒度(Singe、Operator、Subtask):

A、统计的时候,可以选择 source 源 id、source 源 subtask index 进行组合,调整统计粒度。

B、统计的时候,当前 Operator 及当前 Operator subtask index 总是参与粒度名称的生成,固定的。

  1. 三种时延跟踪策略及其源码定义

Single - 跟踪延迟,无需区分:源+源子任务:

(例如双流 Join 的两个 source,这里都默认为一个数据源了)

SINGLE {

String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {

// 只有自己的 operatorId 和 operatorSubtaskIndex 参与 Metric 名称生成

// LatencyMarker 带有的 id(源)不参与 Metric 名称生成

return String.valueOf(operatorId) + operatorSubtaskIndex;

}

}

Operator - 跟踪延迟,区分源,但不区分源的子任务:

OPERATOR {

String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {

// LatencyMarker 带有的 id(源)中 id 参与计算

return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;

}

}

Subtask - 跟踪延迟,区分源+源子任务:

SUBTASK {

String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {

return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;

}

}

根据上述不同的名称 key,将直方图对象放入 Map 中,Map 定义:

Map<String, DescriptiveStatisticsHistogram> latencyStats = new HashMap<>()

伪代码(创建直方图):

latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);

this.latencyStats.put(uniqueName, latencyHistogram);

伪代码(更新直方图):

long now = System.currentTimeMillis();

latencyHistogram.update(now - marker.getMarkedTime())

  1. Single、Operator、Subtask 时延策略在 Web Metric 中的体现

上述 Single、Operator 、Subtask 不同测试,生成的 Metric 名称和 group 就会产生变化,Web Metric 中名称相应改变

一个 Subtask 时延粒度的 Metric 路径:

Job_<source_id><source_subtask_index><operator_id>_<operator_subtask_index> .latency

七、总结说明

  1. LatencyMarker 不参与 window、MiniBatch 的缓存计时,直接被中间 Operator 下发。

  2. Metric 路径:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency(根据时延配置粒度 Granularity,路径会有变化,参考本文第六章节)

  3. 每个中间 Operator、以及 Sink 都会统计自己与 Source 节点的链路延迟,我们在监控页面,一般展示 Source 至 Sink 链路延迟。

  4. 延迟粒度细分到 Task,可以用来排查哪台机器的 Task 时延偏高,进行对比和运维排查。

  5. 从实现原理来看,发送时延标记间隔配置大一些(例如 20 秒一次),一般不会影响系统处理业务数据的性能(所有的 StreamSource Task 都按间隔发送时延标记,中间节点有多个输出通道的,随机选择一个通道下发,不会复制多份数据出来)。

文章来源于大数据技术与架构

用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据培训:Flink全链路延迟的测量方式及原理