写点什么

浅谈 Apache Flume 数据接入的实现原理以及问题分析处理方式

  • 2022-12-05
    江苏
  • 本文字数:3281 字

    阅读完需:约 11 分钟

导读:本文介绍了 Apache Flume 的基本概念、应用在日志实时采集场景中的业务流程,以及几个实际业务问题的分析和处理过程,供大家参考。

背景

某业务集群负责 DPI 日志实时采集,采用 Flume 组件同日志上报网关进行对接。该业务采集流程如下:上报网关将文件进行 GZip 压缩,然后通过 Avro Rpc 等方式传输到接口机集群节点,接口机集群收到数据后对数据做响应处理并将处理结果写入下游,进而完成数据的采集行程卡流程。

整个数据采集过程,为了保证数据安全,遵循敏感数据不落地原则,采集程序对于数据进行了一系列的业务逻辑处理,包括数据解压缩、数据标准化、数据加密等。Flume 在其中产生的作用是对日志进行采集聚合。

整个业务流程主要涉及 A 侧、网关、数据采集、下游。其中 A 侧负责数据接入规范制定以及数据发送,网关侧负责数据转发,我们需要负责数据处理,下游通过 Spark Streaming 进行下一步处理。



数据流向图

概念介绍

Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(比如 Kafka、HDFS、HBase、Pulsar 等)的能力。

Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。其中可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。

下面介绍下 Agent 中几个重要的概念:Source、Channel、Sink。

  • Source:数据的接入端,采集侧,主要用于数据的获取,常见的数据源有 http、Socket、文件、Kafka 等数据源数据。可以自己实现一个 Netty 用于采集客户要求的数据。

  • Channel:传输通道,用于缓存从 Source 到 Sink 的数据。常用的 Memory Channel、Kafka Channel 以及 File Channel。

  • Sink:数据输出端,用于向下一次存储系统传递数据也可以用于传递数据,常见的如 HDFS、Kafka、HBase、Pulsar 等,可以自定义实现。

如下是当前数据处理的一个流程图:



Flume 构成图

从图中不难看出,一旦下游 sink 阻塞,整个流程便会出现问题。下面一起来看下 Agent 的内部原理:

1.Source 接收出入的数据;

2.Channel 处理传入的事件,由于要考虑后续后续数据流出以及做数据过滤,故增加一个拦截器出初步过滤

3.拦截器根据分隔符、列数、用户自定义函数等信息将对应数据进行过了后发送给 Selector

4.Selector 根据后续流程将数据进行复制或者复用

5.Selector 进一步返回数据给 Channel

6.Channel 根据上一步处理结果将 Event 写入不同的 Channel

7.Sink 处理器根据 sink 指定的 src 获取数据并做进一步处理。

主要处理逻辑可总结为如下流程:



为更好展示上述流程,来看一个简单的 agent 配置信息:

1.定义一个 agent,指定其涉及到的 source/channel/sink。示例中 agent 的 source 仅包含一个,可指定多个,通过空格隔开;sink 包含 4 个,2 个用于写 hdfs,2 个用于写 Kafka

2.指定 source 写入的 channel 以及 source 类型

3.指定 source 写数据的类型为复制以及复制方式是通过 header

4.指定 channel 的 ch1 的类型为内存类型以及事务大小

5.指定 sink 对应的不同 channel 以及其写入的集群,此处指定 ch1 写 HDFS 集群,ch2 写如 Kafka 集群。

#指定agent涉及的三个组件agent.sources = src1agent.sinks = sinkhdfs1 sinkhdfs2 sinkkafka1 sinkkafka2agent.channels = ch1 ch2#指定src1对应的channelagent.sources.src1.channels=ch1agent.sources.src1.type=com.cmss.chinamobile.flume.sdtp.SDTPSourceagent.sources.src1.bind=100.71.9.12agent.sources.src1.port=9100#指定src1的选择器agent.sources.src1.selector.type=multiplexingagent.sources.src1.selector.header=targetagent.sources.src1.selector.mapping.ch1=ch1agent.sources.src1.selector.mapping.ch2=ch2#指定ch1类型agent.channels.ch1.type=com.cmss.chinamobile.flume.channel.MemoryChannelagent.channels.ch1.keep-alive=24agent.channels.ch1.capacity=20000agent.channels.ch1.transactionCapacity=10000#指定sink类型以及其对应的channelagent.sinks.sinkkafka1.type=com.cmss.chinamobile.flume.kafka.KafkaSinkagent.sinks.sinkkafka1.kafka.bootstrap.servers=kafka1:6667agent.sinks.sinkkafka1.channel=ch2agent.sinks.sinkhdfs1.type=com.cmss.chinamobile.flume.hdfs.HDFSEventSinkagent.sinks.sinkhdfs1.hdfs.path=hdfs://ns1/tmpagent.sinks.sinkhdfs1.channel=ch1
复制代码


问题定位

问题 1:HDFS 异常

接入侧积压,手机短信收到 ch2 满告警,同步登录监控页面,查看 channel 满:



业务出现卡顿,排查 Flume 的 jstack 信息发现有一个 create 的 rpc 请求(lock 对象 0x000000079ee2b620)时间 从 6-11_20:02:44 到 6-11_20:03:00 持续约 16 秒 怀疑 HDFS 的 rpc 请求慢:



此时同步查看 HDFS 对应时间点的 RpcQueueTime_avg_time 监控发现 ns1 的 namenode 在 20:01 和 02 分有 90 多毫秒的峰值,超过平均值:



查看 HDFS 日志无异常,考虑到做了当前集群访问是通过 router 进行转发,进一步查看 router 日志跟 Spark 的 jstack 信息,发现所有访问均是通过 165 的 router 进行的转发:

发现有如上 output error 的 warn 信息。怀疑 router 处理性能有问题,进一步查看 router handler 配置:发现 router 线程处理数目仅有 64:grep -A 2 handler.count hdfs-site.xml。根据之前压测经验,namenode 线程数建议在 20*logN(N 为集群规模,当前集群 3500,大概 12 次方的样子),故建议同步配置改为 240。更新配置后,rpc 处理能力明显增强,日志无 warn 信息,用户侧程序恢复正常。

问题 2:网卡性能问题

网管侧反馈数据发送不出来,出现流出阻塞,同步查看 Flume 接入发现 ch2 再次出现积压,查看主机侧发现流出侧受阻,进一步查看 Kafka 网络主机网络情况,发现多个节点的网卡在相同时间点的流量均达到 10Gib,由此确认网卡可能达到了上限。



单查看后台 ethtool eth0 发现网卡为 20Gib,不应出现流量瓶颈;进一步打流确认确认 Kafka 集群网卡当前配置仅能达到 10Gib,重新调整网卡策略后流量正常,积压缓解。

问题 3:接入数据异常

随着接入数据量的增加以及接入规范的变更,集群再次出现积压,表现为日志中数组越界、抓包数据符合规范并无异常;对于生产环境出现的这种数据处理问题,但线上无法 debug,同时线下无法重现的问题,一款有效的 Java 诊断工具就显得尤为重要。如何在不修改代码的情况下,对线上问题进行确认并定位出问题呢?

我们使用 Arthas 进行问题定位。Arthas 是一款线上监控诊断产品,通过全局视角实时查看应用 load、内存、gc、线程的状态信息,并能在不修改应用代码的情况下,对业务问题进行诊断,包括查看方法调用的出入参、异常,监测方法执行耗时,类加载信息等。它可以用于检查类是否被加载【对于解决多个 jar 包同一个类冲突尤为有用】、查看方法调用详情【动态更改方法参数、返回值、异常信息】、查看系统运行状况、在线诊断、在线热更新、性能热点等方面表现突出,可直接通过 tar 包部署使用,有效帮助运维或开发人员定位问题,提升应用性能。

这里仅介绍卡顿 debug 用的命令 thread,该命令用户查看当前线程信息,以及堆栈。为更好找到越界内容,采用 thread -b 找到当前阻塞其他线程的线程,如下:

多次执行:



从这里已经容易发现问题在 OutPut ,其 bufferSize 大小被截取,进一步查看代码发现 OutPut 默认 1m,处理输出时对于字段长度做了限制。故修改配置 bufferSize 为 10485760 后,问题得到解决。

总结

Flume 数据接入的影响因素主要是 sink 侧,当出现积压时可首先排查 sink 侧问题,比如到下游的网络是否打满,部分涉及防火墙流量上限,比如 HDFS 侧可以查看 rpc、网络跟写磁盘速率,比如写 Kafka 侧可以排查 Kafka 的 controller 是否切换、requests 请求是否过多、Ranger 是否达到性能瓶颈等。总而言之,一个稳定的系统离不开一个有效的监控系统,遇到问题不可怕,站在前人的肩上能有效提升问题定位的响应速度。

用户头像

移动云,5G时代你身边的智慧云 2019-02-13 加入

移动云大数据产品团队,在移动云上提供云原生大数据分析LakeHouse,消息队列Kafka/Pulsar,云数据库HBase,弹性MapReduce,数据集成与治理等PaaS服务。 微信公众号:人人都学大数据

评论

发布
暂无评论
浅谈 Apache Flume 数据接入的实现原理以及问题分析处理方式_flume_移动云大数据_InfoQ写作社区