写点什么

离线批处理的咽喉——Flume 基础配置简析

  • 2022 年 7 月 12 日
  • 本文字数:2202 字

    阅读完需:约 7 分钟

大家好,我是怀瑾握瑜,一只大数据萌新,家有两只吞金兽,嘉与嘉,上能 code 下能 teach 的全能奶爸

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~



前言

Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase 等)的能力 。



Flume 的机制决定 Flume 更多是用离线数据落盘的场景中,作为数据落地的第一步,说是咽喉毫不为过。而 Flume 的配置,也决定你收集到的数据的格式、类型、大小等等。你要根据你的业务情况或者技术选型,去决定这些。

详解

1. 核心概念

  • Client:Client 生产数据,运行在一个独立的线程

  • Event: 一个数据单元,消息头和消息体组成。(Events 可以是日志记录、 avro 对象等。)


  • Flow: Event 从源点到达目的点的迁移的抽象。

  • Agent: 一个独立的 Flume 进程,包含组件 Source、 Channel、 Sink。(Agent 使用 JVM 运行 Flume。每台机器运行一个 agent,但是可以在一个 agent 中包含 多个 sources 和 sinks。)

  • Source: 数据收集组件。(source 从 Client 收集数据,传递给 Channel)

  • Channel: 中转 Event 的一个临时存储,保存由 Source 组件传递过来的 Event。(Channel 连接 sources 和 sinks ,这个有点像一个队列。)

  • Sink: 从 Channel 中读取并移除 Event, 将 Event 传递到 FlowPipeline 中的下一个 Agent(如果有的话)(Sink 从 Channel 收集数据,运行在一个独立线程。)

2. 配置快速使用讲解

我们这里以最常用的使用 Flume 接 Kafka 数据为例进行讲解

2.1 组件命名

f1.sources=s01f1.channels=c01f1.sinks=k01
复制代码


Agent 为 f1,它的 sources 是 s01,它的 channels 是 c01,它的 sinks 是 k01,如果配置为多个时用空格分隔,比如:f1.sinks=k01 k02 k03

2.2 Source 配置

注意:直接从 Kafka 接数据时 Source 可以不配置

2.2.1 netcat 的 source:bind 指定 IP,port 指定 port

f1.sources.s01.type = netcatf1.sources.s01.bind = localhostf1.sources.s01.port = 2222
复制代码

2.2.2 读文件 exec:commd 中写命令,如果用 tail 的话记得用大写的 F

f1.sources.s01.type = execf1.sources.s01.command = tail -F /var/log/hive.logf1.sources.s01.shell = /bin/bash -c
复制代码

2.2.3 读取文件夹 source:spooldir source ,tmp 记得一定要忽略

f1.sources.s01.type = spooldir# 指定文件夹f1.sources.s01.spoolDir = /var/flume/upload#指定文件上传后的后缀f1.sources.s01.fileSuffix = .COMPLETEDf1.sources.s01.fileHeader = true#忽略所有以.tmp结尾的文件,不上传f1.sources.s01.ignorePattern = ([^ ]*.tmp)
复制代码

2.2.4 arvo 模式,bind 指的是接收的主机,port 不是随意的,是看 sink 给的端口

f1.sources.s01.type = avrof1.sources.s01.bind = hadoop102f1.sources.s01.port = 1234
复制代码

2.3 Channel 配置

f1.channels.c01.type=org.apache.flume.channel.kafka.KafkaChannelf1.channels.c01.kafka.bootstrap.servers=kafka.com:9092f1.channels.c01.kafka.topic=topicf1.channels.c01.kafka.consumer.group.id=group_idf1.channels.c01.kafka.consumer.max.poll.interval.ms=500000f1.channels.c01.kafka.consumer.request.timeout.ms=505000f1.channels.c01.kafka.consumer.session.timeout.ms=100000f1.channels.c01.parseAsFlumeEvent=falsef1.channels.c01.pollTimeout=30000f1.channels.c01.transactionCapacity=10000f1.channels.c01.capacity=200000000
复制代码


该配置指定 channel 接受 kafka 的数据,配置上 servers、topic、group_id

2.4 Sink 配置

#指定sink类型为hdfs,配置hdfs路径f1.sinks.k01.type=hdfsf1.sinks.k01.hdfs.path=hdfs://cloud/flume/%y-%m-%d/#文件指定前缀f1.sinks.k01.hdfs.filePrefix=f1-k01#压缩编码f1.sinks.k01.hdfs.codeC=snappyf1.sinks.k01.hdfs.fileType=CompressedStream#序列化,需要手动开发,可以设置参数paramf1.sinks.k01.serializer=com.serializer.HDFSSerializer$Builderf1.sinks.k01.serializer.param=XXX#积攒多少个Event才flush到HDFS一次f1.sinks.k01.hdfs.batchSize=1000f1.sinks.k01.hdfs.idleTimeout=60#最小冗余数f1.sinks.k01.hdfs.minBlockReplicas=1#文件的滚动与Event数量无关f1.sinks.k01.hdfs.rollCount=0#多久生成一个新的文件,单位是秒f1.sinks.k01.hdfs.rollInterval=300#设置每个文件的滚动大小,单位是Bitf1.sinks.k01.hdfs.rollSize=256000000f1.sinks.k01.hdfs.rollTimerPoolSize=5f1.sinks.k01.hdfs.threadsPoolSize=30f1.sinks.k01.hdfs.useLocalTimeStamp=true
复制代码


该配置为接收到的数据落 Hdfs,并且配置相关参数,比较特殊的是可以指定序列化器,对于落盘的数据可以进行进一步处理,比如协议转换等。


f1.sinks.k01.type=hbase2f1.sinks.k01.zookeeperQuorum=XXXX:2181f1.sinks.k01.table=tablef1.sinks.k01.znodeParent=/hbase-unsecuref1.sinks.k01.serializer=com.serializer.HBASE2Serializerf1.sinks.k01.batchSize=5000f1.sinks.k01.columnFamily=d
复制代码


该配置为接受数据落到 hbase 中,也可以指定序列化器进行转换。

2.4 channel 和 source,sink 的绑定

f1.sources.s01.channels = c01f1.sinks.k01.channel=c01
复制代码



结束语

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2022.07.01 加入

还未添加个人简介

评论

发布
暂无评论
离线批处理的咽喉——Flume基础配置简析_flume_怀瑾握瑜的嘉与嘉_InfoQ写作社区