写点什么

一文读懂 Apache Flume 概念、使用和原理

发布于: 2021 年 03 月 10 日
一文读懂Apache Flume概念、使用和原理

Apache Flume


1. 概述


flume 是一个日志采集传输聚合的软件;分布式,高可靠高性能;


日志采集:


1:从无到有的类型;传感器等设备把客观事实记录为数据;


2:数据已经存在,只是进行搬移;flume(把数据从一个地方迁移到另一个地方(大数据分析平台))


flume 中的概念:


source:


channel:


sink:

使用的是 cdh5.14flume 是 1.6 版本


2. 运行机制


flume 中的概念:1:source:flume 中负责读取数据的组件,每种类型的数据源‘都有一个专门的 source;mysql->mysqlsource;文件--》专门的文件 source;channel:对数据进行一个缓冲;sink:针对写出数据也定义了一个组件,针对不同类型的有专门的 sink event:header,body;封装了我们读取的数据


Flume 安装部署


解压到/export/servers 目录下,配置 JAVA_HOME 即可。


简单案例监听本机端口 44444


source 详解:


flume 的开发步骤:


1:确定需求之后,明确 flume 各个组件使用哪个类型的?具体配置方式去官网摘抄,必填属性必须给;


2:使用 flume-ng 命令启动这个 agent 进程。


Flume 简单案例


1. 采集目录到 HDFS


source:spooldir;监听目录,如果目录下有新的文件生成则把文件内容读取然后上传;


a.txt-->a.txt.COMPLETED;


向 a.txt.COMPLETED 文件添加内容;spooldirsource 不会再读取其中新增内容;

2. 采集文件到 HDFS


source:exec source;不是监听文件 source;可以执行 unix 命令的一个 source;帮助我们执行一个 tail -F 的命令来监听文件内容的变化;


a.txt-->有新增内容就会被读取到;


channel:memory;


sink:hdfssink;


tail -f 与 tail -F 的区别:


有区别,tail -F 比 tail -f 更强大,tail -f 基于文件名监听,


这两种 source 类型要考虑业务需求,如果实时性有要求那么只能采用 exec 类型;实时动态监听文件内容变化;


如果有多个文件需要动态实时监听 exec 一个 source 无法满足;


1:多配置几个 source,每个 source 都是 exec 类型;


2:flume1.7 之后新增的一个 source 类型叫做 tailDirSource;可以实时动态监听整个目录甚至包括子目录中的文件变化;修改源码.


Flume 的 load-balance、failover


loadbanlance:负载均衡;就是提高 flume 吞吐量;就是把一件事情的工作量分给多个人;


failover:失败切换;高可用

1 高可用 Flum-NG 配置案例 failover


保证数据写出时有多个 sink 可以支持写出,但是正常工作时只有一个 sink 会接收数据;只有这个 sink 工作出现问题的时候才会尝试把数据发送给其它 sink;其它 sink 是作为 standby 角色使用。

a1.sinkgroups = g1 #sink组a1.sinkgroups.g1.sinks = k1 k2 k3a1.sinkgroups.g1.processor.type = failovera1.sinkgroups.g1.processor.priority.k1 = 5  #优先级值, 绝对值越大表示优先级越高a1.sinkgroups.g1.processor.priority.k2 = 7a1.sinkgroups.g1.processor.priority.k3 = 6a1.sinkgroups.g1.processor.maxpenalty = 20000  #失败的Sink的最大回退期(millis)
复制代码


failover:搭建多层 flume 实现对第一层 flume 的 sink 进行 failover 设置,可以保证某个 sink 出现问题的时候有可用的 sink 顶替继续传输数据;

2、flume 的负载均衡 load balancer


loadbanlance 的配置重点:


1:搞清楚我们多层 flume 的对接的 sink,souce 的类型是 avro;


2 : sinkprocessor 的配置属性,会有 sink 组的概念;

a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2 k3a1.sinkgroups.g1.processor.type = load_balancea1.sinkgroups.g1.processor.backoff = true  #如果开启,则将失败的sink放入黑名单a1.sinkgroups.g1.processor.selector = round_robin  # 另外还支持randoma1.sinkgroups.g1.processor.selector.maxTimeOut=10000 #在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
复制代码


两层 flume;第一层 sink 组的 processor 是 load_banlance;然后可以指定发送数据的方式:round_robin,random;

Flume 拦截器实战案例


1. 日志的采集和汇总


日志采集汇总的案例:


1:需求:是我们有多个日志数据,最终写入 hdfs 的时候要根据之前读取的文件名来确定写入的目录;


2:source 的读取时候把文件信息通过静态拦截器加入到 event 的 header 中;然后在写入 hdfs 的时候通过


%{type};type 要与之前存入的 key 保持一致;


1:静态拦截器;2:使用了多个 source;3:拦截器如何获取存取的数据;


拦截器帮助我们解决了什么问题?


1:可能需要对数据进行分类处理;要操作 event 数据;


2:静态拦截器只能对 event 的 header 进行操作,往里 追加写入一些信息;不能操作 body 里面的数据。这是一个局限性。


2. Flume 自定义拦截器


自定义拦截器的业务分析:

13601249301	100	200	300	400	500	600	70013601249302	100	200	300	400	500	600	70013601249303	100	200	300	400	500	600	70013601249304	100	200	300	400	500	600	70013601249305	100	200	300	400	500	600	70013601249306	100	200	300	400	500	600	70013601249307	100	200	300	400	500	600	70013601249308	100	200	300	400	500	600	70013601249309	100	200	300	400	500	600	70013601249310	100	200	300	400	500	600	70013601249311	100	200	300	400	500	600	70013601249312	100	200	300	400	500	600	700
复制代码


希望经过 flume 的自定义拦截器,把我们第一个字段进行加密,只保留 0,1,3,5,6,其余数据全部丢弃,最终再 hdfs 上显示的数据只有加密后的数据以及我们保留的字段。


业务分析:


拦截方法需要获取到每一条数据,event 对象,数据是在 event 中 body 中;获取到 body 的数据之后;其实是一行文本数据,所以需要先进行切分;按照制表符切分;切分之后怎么获取要保留的信息(这些信息放置在 conf 配置文件中,分隔符也放到配置文件中;加密的索引字段;索引的分隔符);


flume 自定义拦截器在静态拦截器不能满足需求的时候才想办法自定义的拦截器;


有两个方面:


1:通过这种案例锻炼自己的业务思维,抽象能力;


2:api 使用,模仿已经存在的模板代码定义自己逻辑;


工作中其实用的不是特别多。


发布于: 2021 年 03 月 10 日阅读数: 26
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
一文读懂Apache Flume概念、使用和原理