写点什么

Flink 和流式应用运维(十 - 下)

发布于: 1 小时前
Flink 和流式应用运维(十-下)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


监控 Flink 集群和应用

监控你的流作业对于确保其健康运行和及早检测错误配置、资源不足或意外行为的潜在故障至关重要。特别是当流作业是面向用户的应用程序中更大的数据处理管道或事件驱动服务的一部分时,你可能希望尽可能精确地监控其性能并确保它满足延迟、吞吐量、资源利用率的某些目标等等。

 

Flink 在运行时收集一组预定义的指标,还提供了一个框架,允许你定义和跟踪自己的指标。

 

Flink Web UI

要获得 Flink 集群的概况以及对作业在内部执行情况的了解,最简单的方法是使用 Flink 的 Web UI。你可以通过以下地址访问访问仪表板:

http://<jobmanager-hostname>:8081

在主屏幕上,你将看到集群配置的概览,包括 TaskManager 的数量、已配置和可用的任务 slot 数量以及正在运行和已完成的作业。 图 10-2 显示了仪表板主屏幕的一个实例。 左侧的菜单链接到有关作业和配置参数的更多详细信息,还允许通过上传 JAR 提交作业。

如果单击正在运行的作业,你可以快速了解每个任务或子任务的运行统计信息,如图 10-3 所示。 你可以检查交换的持续时间、字节和记录,并根据需要汇总每个 TaskManager。

如果单击 Task Metrics 选项卡,则可以从下拉菜单中选择更多指标,如图 10-4 所示。 其中包括有关你的任务的更细粒度的统计信息,例如缓冲区使用情况、

水位线和输入/输出速率。

图 10-5 显示了所选指标如何显示为持续更新的图表。

Checkpoints 选项卡(图 10-3)显示有关先前和当前检查点的统计信息。 在概览下,你可以查看已触发、正在进行、已成功完成或已失败的检查点数量。 如果单击 History 视图,则可以检索更细粒度的信息,例如状态、触发时间、状态大小以及检查点对齐阶段缓冲的字节数。 “摘要”视图聚合检查点统计信息,并提供所有已完成检查点的最小值、最大值和平均值。 最后,在配置下,你可以检查检查点的配置属性,例如设置的间隔和超时值。

 

同样,back pressure 选项卡显示每个算子和子任务的 back pressure 统计信息。 如果单击一行,则会触发 back pressure 采样,并且你将看到消息 Sampling in progress... 大约五秒钟。 采样完成后,你将在第二列中看到 back pressure 状态。 back pressure 任务将显示 HIGH 符号; 否则,你应该会看到一条漂亮的绿色 OK 消息。

 

指标系统

在生产中运行数据处理系统(如 Flink)时,必须监控其行为,以便能够发现和诊断性能下降的原因。 Flink 默认收集多个系统和应用指标。 每个算子、TaskManager 或 JobManager 收集指标。 在这里,我们描述了一些最常用的指标,并请你参阅 Flink 的文档以获取可用指标的完整列表。

 

类别包括 CPU 利用率、使用的内存、活动线程数、垃圾收集统计信息、网络指标(例如排队的输入/输出缓冲区的数量)、集群范围的指标(例如正在运行的作业和可用资源的数量)、作业指标(包括运行时)、 重试次数和检查点信息、I/O 统计信息(包括本地和远程记录交换的次数)、水位线信息、特定于连接器的指标等。

 

注册和使用指标

要注册指标,你必须通过调用 RuntimeContext 上的 getMetrics()方法来得到 MetricGroup,如下面示例所示。

class PositiveFilter extends RichFilterFunction[Int] {

   @transient private var counter: Counter = _

   override def open(parameters: Configuration): Unit = {

       counter = getRuntimeContext

      .getMetricGroup

      .counter("droppedElements")

  }

   override def filter(value: Int): Boolean = {

       if (value > 0) {

      true

      }

       else {

           counter.inc()

           false

  }

  }

}

METRIC GROUPS

Flink 指标是通过 MetricGroup 接口注册和访问的。MetricGroup 提供了创建嵌套的、命名的指标层次结构的方法,并提供了注册以下指标类型的方法:

Counter:

org.apache.flink.metrics.Counter 指标测量计数并提供递增和递减的方法。 你可以使用 MetricGroup 上的 counter(String name, Counter counter) 方法注册计数器指标。

Gauge:

Gauge 指标计算某个时间点的任何类型的值。 要使用 Gauge,你需要实现 org.apache.flink.metrics.Gauge 接口并使用 MetricGroup 上的 Gauge(String name, Gauge Gauge) 方法注册它。 下面例子中的代码展示了 WatermarkGauge 指标的实现,它公开了当前的水位线。

public class WatermarkGauge implements Gauge<Long> {

   private long currentWatermark = Long.MIN_VALUE;

   public void setCurrentWatermark(long watermark) {

  this.currentWatermark = watermark;

  }

   @Override

   public Long getValue() {

  return currentWatermark;

  }

}

 以字符串形式上报指标

度量报告器会将 Gauge 值转换为字符串,因此如果你使用的类型未提供,请确保提供有意义的 toString()实现。

Histogram:

你可以使用直方图来表示数值数据的分布。Flink 的直方图特别适用于报告长值的指标。 org.apache.flink.metrics.Histogram 接口允许你收集值,获取收集值的当前计数,并为目前看到的值创建统计信息,例如最小值、最大值、标准偏差和平均值。

除了创建你自己的直方图实现,Flink 还允许你使用 DropWizard 直方图,通过添加依赖。如下:

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-metrics-dropwizard</artifactId>

   <version>flink-version</version>

</dependency>

以及你可以使用 DropwizardHistogramWrapper 类在 Flink 程序中注册 DropWizard 直方图,如下面示例所示。

// create and register histogram

DropwizardHistogramWrapper histogramWrapper =

   new DropwizardHistogramWrapper(

  new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

  )

metricGroup.histogram("myHistogram", histogramWrapper)

// update histogram

histogramWrapper.update(value)

Meter:

你可以使用 Meter 指标来衡量某些事件发生的速率(以每秒事件数为单位)。 org.apache.flink.metrics.Meter 接口提供了方法来标记一个或多个事件的发生,获取每秒事件的当前速率,以及获取当前标记在仪表上的事件数量。

 

与直方图一样,你可以通过在 pom.xml 中添加 flink-metrics-dropwizard 依赖项并将 meter 包装在 DropwizardMeterWrapper 类中来使用 DropWizard meter。

 

范围和指标格式

Flink 指标属于一个范围,该范围可以是系统范围(系统提供的指标),也可以是用户范围(自定义、用户定义的指标)。 指标由唯一标识符引用,该标识符最多包含三个部分:

1. 用户在注册指标时指定的名称

2. 一个可选的用户范围

3. 一个系统范围

例如,名称“myCounter”、用户范围“MyMetrics”和系统范围“localhost.taskmanager.512”将生成标识符“localhost.taskmanager.512.MyMetrics.myCounter”。 你可以更改默认的“.”。 delimiter 通过设置 metrics.scope.delimiter 配置选项。

 

系统范围声明了该指标所指的系统组件以及它应该包含哪些上下文信息。指标的范围可以是 JobManager、TaskManager、作业、算子或任务。 你可以通过在 flink-conf.yaml 文件中设置相应的度量选项来配置度量应包含哪些上下文信息。 我们在下表中列出了其中一些配置选项及其默认值。


配置 key 包含常量字符串,例如“taskmanager”和尖括号中显示的变量。 后者将在运行时替换为实际值。 例如,TaskManager 指标的默认范围可能会创建范围“localhost.taskmanager.512”,其中“localhost”和“512”是参数值。 下表显示了可用于配置指标范围的所有变量。

每个作业的范围标识符必须是唯一的

如果同时运行同一作业的多个副本,由于字符串冲突,指标可能会变得不准确。为避免此类风险,你应确保每个作业的范围标识符是唯一的。这可以通过包含<job_id> 轻松处理。

你还可以通过调用 MetricGroup 的 addGroup()方法来定义指标的用户范围,如下面示例所示。

counter = getRuntimeContext

.getMetricGroup

.addGroup("MyMetrics")

.counter("myCounter")

 

访问指标

既然你已经学习了如何注册、定义和分组指标,你可能想知道如何从外部系统访问它们。毕竟,你收集指标可能是因为你想要创建实时仪表板或将测量结果提供给另一个应用程序。你可以通过报告器向外部后端公开指标,Flink 为其中几个提供了实现(见下表)。


如果你想使用未包含在上述列表中的指标后端,你还可以通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口来定义你自己的报告程序。

Reporters 需要在 flink-conf.yaml 中进行配置。 将以下行添加到你的配置中将定义一个 JMX 报告器“my_reporter”,它侦听端口 9020-9040:

metrics.reporters: my_reporter

Metrics.reporter.my_jmx_reporter.class:org.apache.flink.metrics.jmx.JMXReporter

metrics.reporter.my_jmx_reporter.port: 9020-9040

请参阅 Flink 文档以获取每个受支持报告器的完整配置选项列表。

 

延迟监控

延迟可能是你要监控的首要指标之一,以评估你的流式作业的性能特征。同时,它也是在 Flink 等语义丰富的分布式流引擎中定义最棘手的指标之一。 在“延迟”中,我们将延迟广义地定义为处理事件所需的时间。 你可以想象,如果我们尝试在具有复杂数据流的高速流作业中跟踪每个事件的延迟,那么此定义的精确实现在实践中会如何出现问题。 考虑到窗口算子使延迟跟踪更加复杂,如果一个事件对多个窗口有贡献,我们是否需要报告第一次调用的延迟,还是需要等到我们评估一个事件可能属于的所有窗口? 如果一个窗口多次触发怎么办?

 

Flink 遵循一种简单且低开销的方法来提供有用的延迟指标测量。 它没有尝试严格测量每个事件的延迟,而是通过定期在源处发出特殊记录并允许用户跟踪此记录到达接收器所需的时间来近似延迟。 这个特殊的记录被称为延迟标记,它带有一个时间戳,表明它是什么时候发出的。

 

要启用延迟跟踪,你需要配置从源发出延迟标记的频率。你可以通过在 ExecutionConfig 中设置 delayTrackingInterval 来做到这一点,如下所示:

env.getConfig.setLatencyTrackingInterval(500L)

间隔以毫秒为单位指定。收到延迟标记后,除下沉外的所有运营商都将其向下游转发。延迟标记使用与普通流记录相同的数据流通道和队列,因此它们跟踪的延迟反映了等待处理记录的时间。但是,它们不会测量处理记录所需的时间或记录在处理之前处于等待状态的时间。

 

算子将延迟统计信息保存在包含最小值、最大值和平均值以及 50、95 和 99 个百分位值的延迟量表中。 Sink 算子保留每个并行源实例接收到的延迟标记的统计信息,因此检查接收器的延迟标记可用于估算记录遍历数据流所需的时间。 如果你想在算子处自定义处理延迟标记,你可以覆盖 processLatencyMarker() 方法并使用 LatencyMarker 的方法 getMarkedTime()、getVertexId() 和 getSubTaskIndex() 检索相关信息。

                          注意时钟同步

如果你没有使用自动时钟同步服务(如 NTP),你的机器时钟可能会出现时钟偏差。 在这种情况下,延迟跟踪估计将不可靠,因为其当前实现假设时钟同步。

 

配置日志行为

日志记录是另一个用于调试和理解应用程序行为的重要工具。默认情况下,Flink 使用 SLF4J 日志抽象和 log4j 日志框架。下面示例展示了一个 MapFunction,它记录每个输入记录的转换操作。

import org.apache.flink.api.common.functions.MapFunction

import org.slf4j.LoggerFactory

import org.slf4j.Logger

class MyMapFunction extends MapFunction[Int, String] {

   Logger LOG = LoggerFactory.getLogger(MyMapFunction.class)

   override def map(value: Int): String = {

       LOG.info("Converting value {} to string.", value)

       value.toString

  }

}

 

要更改 log4j 记录器的属性,请修改 conf/文件夹中的 log4j.properties 文件。例如,以下行将根日志级别设置为“warning”:

log4j.rootLogger=WARN

要设置此文件的自定义文件名和位置,请将- Dlog4j.configuration= 参数传递给 JVM。 Flink 还提供了命令行客户端使用的 log4j-cli.properties 文件和命令行客户端在启动 YARN 会话时使用的 log4j-yarn-session.properties 文件。

 

log4j 的替代方案是 logback,Flink 也为此后端提供默认配置文件。 要使用 logback 而不是 log4j,你需要从 lib/ 文件夹中删除 log4j。 有关如何设置和配置后端,请参阅 Flink 的文档和 logback 手册。

 

小结

在本章中,我们讨论了如何在生产中运行、管理和监控 Flink 应用程序。 我们解释了收集和公开系统和应用程序指标的 Flink 组件,如何配置日志系统,以及如何使用命令行客户端和 REST API 启动、停止、恢复和重新调整应用程序。

 

发布于: 1 小时前阅读数: 5
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
Flink 和流式应用运维(十-下)