大家好,我是怀瑾握瑜,一只大数据萌新,家有两只吞金兽,嘉与嘉,上能 code 下能 teach 的全能奶爸
如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~
前言
Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase 等)的能力 。
Flume 的机制决定 Flume 更多是用离线数据落盘的场景中,作为数据落地的第一步,说是咽喉毫不为过。而 Flume 的配置,也决定你收集到的数据的格式、类型、大小等等。你要根据你的业务情况或者技术选型,去决定这些。
详解
1. 核心概念
Flow: Event 从源点到达目的点的迁移的抽象。
Agent: 一个独立的 Flume 进程,包含组件 Source、 Channel、 Sink。(Agent 使用 JVM 运行 Flume。每台机器运行一个 agent,但是可以在一个 agent 中包含 多个 sources 和 sinks。)
Source: 数据收集组件。(source 从 Client 收集数据,传递给 Channel)
Channel: 中转 Event 的一个临时存储,保存由 Source 组件传递过来的 Event。(Channel 连接 sources 和 sinks ,这个有点像一个队列。)
Sink: 从 Channel 中读取并移除 Event, 将 Event 传递到 FlowPipeline 中的下一个 Agent(如果有的话)(Sink 从 Channel 收集数据,运行在一个独立线程。)
2. 配置快速使用讲解
我们这里以最常用的使用 Flume 接 Kafka 数据为例进行讲解
2.1 组件命名
f1.sources=s01
f1.channels=c01
f1.sinks=k01
复制代码
Agent 为 f1,它的 sources 是 s01,它的 channels 是 c01,它的 sinks 是 k01,如果配置为多个时用空格分隔,比如:f1.sinks=k01 k02 k03
2.2 Source 配置
注意:直接从 Kafka 接数据时 Source 可以不配置
2.2.1 netcat 的 source:bind 指定 IP,port 指定 port
f1.sources.s01.type = netcat
f1.sources.s01.bind = localhost
f1.sources.s01.port = 2222
复制代码
2.2.2 读文件 exec:commd 中写命令,如果用 tail 的话记得用大写的 F
f1.sources.s01.type = exec
f1.sources.s01.command = tail -F /var/log/hive.log
f1.sources.s01.shell = /bin/bash -c
复制代码
2.2.3 读取文件夹 source:spooldir source ,tmp 记得一定要忽略
f1.sources.s01.type = spooldir
# 指定文件夹
f1.sources.s01.spoolDir = /var/flume/upload
#指定文件上传后的后缀
f1.sources.s01.fileSuffix = .COMPLETED
f1.sources.s01.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
f1.sources.s01.ignorePattern = ([^ ]*.tmp)
复制代码
2.2.4 arvo 模式,bind 指的是接收的主机,port 不是随意的,是看 sink 给的端口
f1.sources.s01.type = avro
f1.sources.s01.bind = hadoop102
f1.sources.s01.port = 1234
复制代码
2.3 Channel 配置
f1.channels.c01.type=org.apache.flume.channel.kafka.KafkaChannel
f1.channels.c01.kafka.bootstrap.servers=kafka.com:9092
f1.channels.c01.kafka.topic=topic
f1.channels.c01.kafka.consumer.group.id=group_id
f1.channels.c01.kafka.consumer.max.poll.interval.ms=500000
f1.channels.c01.kafka.consumer.request.timeout.ms=505000
f1.channels.c01.kafka.consumer.session.timeout.ms=100000
f1.channels.c01.parseAsFlumeEvent=false
f1.channels.c01.pollTimeout=30000
f1.channels.c01.transactionCapacity=10000
f1.channels.c01.capacity=200000000
复制代码
该配置指定 channel 接受 kafka 的数据,配置上 servers、topic、group_id
2.4 Sink 配置
#指定sink类型为hdfs,配置hdfs路径
f1.sinks.k01.type=hdfs
f1.sinks.k01.hdfs.path=hdfs://cloud/flume/%y-%m-%d/
#文件指定前缀
f1.sinks.k01.hdfs.filePrefix=f1-k01
#压缩编码
f1.sinks.k01.hdfs.codeC=snappy
f1.sinks.k01.hdfs.fileType=CompressedStream
#序列化,需要手动开发,可以设置参数param
f1.sinks.k01.serializer=com.serializer.HDFSSerializer$Builder
f1.sinks.k01.serializer.param=XXX
#积攒多少个Event才flush到HDFS一次
f1.sinks.k01.hdfs.batchSize=1000
f1.sinks.k01.hdfs.idleTimeout=60
#最小冗余数
f1.sinks.k01.hdfs.minBlockReplicas=1
#文件的滚动与Event数量无关
f1.sinks.k01.hdfs.rollCount=0
#多久生成一个新的文件,单位是秒
f1.sinks.k01.hdfs.rollInterval=300
#设置每个文件的滚动大小,单位是Bit
f1.sinks.k01.hdfs.rollSize=256000000
f1.sinks.k01.hdfs.rollTimerPoolSize=5
f1.sinks.k01.hdfs.threadsPoolSize=30
f1.sinks.k01.hdfs.useLocalTimeStamp=true
复制代码
该配置为接收到的数据落 Hdfs,并且配置相关参数,比较特殊的是可以指定序列化器,对于落盘的数据可以进行进一步处理,比如协议转换等。
f1.sinks.k01.type=hbase2
f1.sinks.k01.zookeeperQuorum=XXXX:2181
f1.sinks.k01.table=table
f1.sinks.k01.znodeParent=/hbase-unsecure
f1.sinks.k01.serializer=com.serializer.HBASE2Serializer
f1.sinks.k01.batchSize=5000
f1.sinks.k01.columnFamily=d
复制代码
该配置为接受数据落到 hbase 中,也可以指定序列化器进行转换。
2.4 channel 和 source,sink 的绑定
f1.sources.s01.channels = c01
f1.sinks.k01.channel=c01
复制代码
结束语
如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~
评论