浅谈 Apache Flume 数据接入的实现原理以及问题分析处理方式
导读:本文介绍了 Apache Flume 的基本概念、应用在日志实时采集场景中的业务流程,以及几个实际业务问题的分析和处理过程,供大家参考。
背景
某业务集群负责 DPI 日志实时采集,采用 Flume 组件同日志上报网关进行对接。该业务采集流程如下:上报网关将文件进行 GZip 压缩,然后通过 Avro Rpc 等方式传输到接口机集群节点,接口机集群收到数据后对数据做响应处理并将处理结果写入下游,进而完成数据的采集行程卡流程。
整个数据采集过程,为了保证数据安全,遵循敏感数据不落地原则,采集程序对于数据进行了一系列的业务逻辑处理,包括数据解压缩、数据标准化、数据加密等。Flume 在其中产生的作用是对日志进行采集聚合。
整个业务流程主要涉及 A 侧、网关、数据采集、下游。其中 A 侧负责数据接入规范制定以及数据发送,网关侧负责数据转发,我们需要负责数据处理,下游通过 Spark Streaming 进行下一步处理。
数据流向图
概念介绍
Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(比如 Kafka、HDFS、HBase、Pulsar 等)的能力。
Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。其中可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。
下面介绍下 Agent 中几个重要的概念:Source、Channel、Sink。
Source:数据的接入端,采集侧,主要用于数据的获取,常见的数据源有 http、Socket、文件、Kafka 等数据源数据。可以自己实现一个 Netty 用于采集客户要求的数据。
Channel:传输通道,用于缓存从 Source 到 Sink 的数据。常用的 Memory Channel、Kafka Channel 以及 File Channel。
Sink:数据输出端,用于向下一次存储系统传递数据也可以用于传递数据,常见的如 HDFS、Kafka、HBase、Pulsar 等,可以自定义实现。
如下是当前数据处理的一个流程图:
Flume 构成图
从图中不难看出,一旦下游 sink 阻塞,整个流程便会出现问题。下面一起来看下 Agent 的内部原理:
1.Source 接收出入的数据;
2.Channel 处理传入的事件,由于要考虑后续后续数据流出以及做数据过滤,故增加一个拦截器出初步过滤
3.拦截器根据分隔符、列数、用户自定义函数等信息将对应数据进行过了后发送给 Selector
4.Selector 根据后续流程将数据进行复制或者复用
5.Selector 进一步返回数据给 Channel
6.Channel 根据上一步处理结果将 Event 写入不同的 Channel
7.Sink 处理器根据 sink 指定的 src 获取数据并做进一步处理。
主要处理逻辑可总结为如下流程:
为更好展示上述流程,来看一个简单的 agent 配置信息:
1.定义一个 agent,指定其涉及到的 source/channel/sink。示例中 agent 的 source 仅包含一个,可指定多个,通过空格隔开;sink 包含 4 个,2 个用于写 hdfs,2 个用于写 Kafka
2.指定 source 写入的 channel 以及 source 类型
3.指定 source 写数据的类型为复制以及复制方式是通过 header
4.指定 channel 的 ch1 的类型为内存类型以及事务大小
5.指定 sink 对应的不同 channel 以及其写入的集群,此处指定 ch1 写 HDFS 集群,ch2 写如 Kafka 集群。
问题定位
问题 1:HDFS 异常
接入侧积压,手机短信收到 ch2 满告警,同步登录监控页面,查看 channel 满:
业务出现卡顿,排查 Flume 的 jstack 信息发现有一个 create 的 rpc 请求(lock 对象 0x000000079ee2b620)时间 从 6-11_20:02:44 到 6-11_20:03:00 持续约 16 秒 怀疑 HDFS 的 rpc 请求慢:
此时同步查看 HDFS 对应时间点的 RpcQueueTime_avg_time 监控发现 ns1 的 namenode 在 20:01 和 02 分有 90 多毫秒的峰值,超过平均值:
查看 HDFS 日志无异常,考虑到做了当前集群访问是通过 router 进行转发,进一步查看 router 日志跟 Spark 的 jstack 信息,发现所有访问均是通过 165 的 router 进行的转发:
发现有如上 output error 的 warn 信息。怀疑 router 处理性能有问题,进一步查看 router handler 配置:发现 router 线程处理数目仅有 64:grep -A 2 handler.count hdfs-site.xml。根据之前压测经验,namenode 线程数建议在 20*logN(N 为集群规模,当前集群 3500,大概 12 次方的样子),故建议同步配置改为 240。更新配置后,rpc 处理能力明显增强,日志无 warn 信息,用户侧程序恢复正常。
问题 2:网卡性能问题
网管侧反馈数据发送不出来,出现流出阻塞,同步查看 Flume 接入发现 ch2 再次出现积压,查看主机侧发现流出侧受阻,进一步查看 Kafka 网络主机网络情况,发现多个节点的网卡在相同时间点的流量均达到 10Gib,由此确认网卡可能达到了上限。
单查看后台 ethtool eth0 发现网卡为 20Gib,不应出现流量瓶颈;进一步打流确认确认 Kafka 集群网卡当前配置仅能达到 10Gib,重新调整网卡策略后流量正常,积压缓解。
问题 3:接入数据异常
随着接入数据量的增加以及接入规范的变更,集群再次出现积压,表现为日志中数组越界、抓包数据符合规范并无异常;对于生产环境出现的这种数据处理问题,但线上无法 debug,同时线下无法重现的问题,一款有效的 Java 诊断工具就显得尤为重要。如何在不修改代码的情况下,对线上问题进行确认并定位出问题呢?
我们使用 Arthas 进行问题定位。Arthas 是一款线上监控诊断产品,通过全局视角实时查看应用 load、内存、gc、线程的状态信息,并能在不修改应用代码的情况下,对业务问题进行诊断,包括查看方法调用的出入参、异常,监测方法执行耗时,类加载信息等。它可以用于检查类是否被加载【对于解决多个 jar 包同一个类冲突尤为有用】、查看方法调用详情【动态更改方法参数、返回值、异常信息】、查看系统运行状况、在线诊断、在线热更新、性能热点等方面表现突出,可直接通过 tar 包部署使用,有效帮助运维或开发人员定位问题,提升应用性能。
这里仅介绍卡顿 debug 用的命令 thread,该命令用户查看当前线程信息,以及堆栈。为更好找到越界内容,采用 thread -b 找到当前阻塞其他线程的线程,如下:
多次执行:
从这里已经容易发现问题在 OutPut ,其 bufferSize 大小被截取,进一步查看代码发现 OutPut 默认 1m,处理输出时对于字段长度做了限制。故修改配置 bufferSize 为 10485760 后,问题得到解决。
总结
Flume 数据接入的影响因素主要是 sink 侧,当出现积压时可首先排查 sink 侧问题,比如到下游的网络是否打满,部分涉及防火墙流量上限,比如 HDFS 侧可以查看 rpc、网络跟写磁盘速率,比如写 Kafka 侧可以排查 Kafka 的 controller 是否切换、requests 请求是否过多、Ranger 是否达到性能瓶颈等。总而言之,一个稳定的系统离不开一个有效的监控系统,遇到问题不可怕,站在前人的肩上能有效提升问题定位的响应速度。
评论