写点什么

大数据 -20-Flume 采集数据双写 +HDFS 监控目录变化 Agent MemoryChannel Source

作者:武子康
  • 2025-06-24
    美国
  • 本文字数:4555 字

    阅读完需:约 15 分钟

大数据-20-Flume 采集数据双写+HDFS 监控目录变化 Agent MemoryChannel Source

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

目前 2025 年 06 月 16 日更新到:AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 06 月 23 日更新到:Java-53 深入浅出 Tomcat 性能优化 JVM 内存模型 垃圾回收 GC Tomcat 配置优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 13 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上一节完成了如下的内容:


  • 编写 Agent Conf 配置文件

  • 收集 Hive 数据

  • 汇聚到 HDFS 中

  • 测试效果

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123


文档推荐

除了官方文档以外,这里有一个写的很好的中文文档:https://flume.liyifeng.org/

组件介绍

这里会对组件进行一个基本的介绍

Source(数据源)

常用如:


  • taildir-source:高性能监听文件(推荐)

  • exec-source:一次性执行命令(不推荐用于持续采集)

  • spooldir-source:监听指定目录新文件

Channel(缓存通道)

使用 Multiplexing Channel Selector 配合多 sink 实现“双写”:


  • memory channel 用于快速缓冲实时数据

  • file channel 用于 HDFS 分支,确保落盘可靠性

Sink(目标输出)

  • Sink1:KafkaSink(将数据推入 Kafka,供实时消费)

  • Sink2:HDFSEventSink(按时间路径写入 HDFS)

监控目录

业务需求

  • 想要监控指定目录 收集信息并上传到 HDFS 中

Source

选择 spooldir,因为 spooldir 能够保证数据不丢失,且能够进行断点续传,但是延迟较高,不能实时监控。

Channel

选择 memory

Sink

选择 HDFS

需要注意

  • 拷贝到 spool 目录下的文件 不可以再打开编辑

  • 无法监控子目录的文件夹变动

  • 被监控文件夹每 500 毫秒 扫描一次文件变动

  • 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

生产环境

如果在生产环境中,我们的基本逻辑是:


  • 数据采集 一份实时入 Kafka 等消息队列或实时处理系统(如 Spark、Flink)

  • 同时 另一份可靠写入 HDFS 用于后续离线分析或长期存储


这种“双写”机制实现了实时 + 离线混合架构,满足以下需求:


  • 实时处理:秒级告警 / 监控系统

  • 离线分析:日报、月报、数据仓库


         [日志源,如Nginx、APP等]             Flume Source (taildir / exec / spooling)               Flume Channel (Memory + File)          ┌────────────┬────────────┐          ↓                         ↓  Flume Sink1 (Kafka)       Flume Sink2 (HDFS)
复制代码

配置样例

这里放一个 Demo


# Source 配置agent.sources.source1.type = TAILDIRagent.sources.source1.filegroups = f1agent.sources.source1.filegroups.f1 = /data/logs/nginx/access.log
# Channel 配置(内存和文件双通道)agent.channels.memoryChannel.type = memoryagent.channels.memoryChannel.capacity = 10000
agent.channels.fileChannel.type = fileagent.channels.fileChannel.checkpointDir = /var/log/flume/checkpointagent.channels.fileChannel.dataDirs = /var/log/flume/data
# Sink1: Kafkaagent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSinkagent.sinks.kafkaSink.topic = nginx-logsagent.sinks.kafkaSink.brokerList = localhost:9092
# Sink2: HDFSagent.sinks.hdfsSink.type = hdfsagent.sinks.hdfsSink.hdfs.path = hdfs://namenode:8020/logs/nginx/%Y-%m-%d/agent.sinks.hdfsSink.hdfs.fileType = DataStreamagent.sinks.hdfsSink.hdfs.writeFormat = Text
# Channel 绑定agent.sources.source1.channels = memoryChannel fileChannelagent.sinks.kafkaSink.channel = memoryChannelagent.sinks.hdfsSink.channel = fileChannel
复制代码


这里我们需要注意


  • 多通道绑定:Source 支持绑定多个 Channel,Sink 各自绑定

  • 数据一致性:Flume 自身无强一致保证,但文件 channel 比 memory channel 更可靠

  • 容错机制:HDFS Sink 支持自动创建目录、滚动文件、断点续传

  • 性能调优:可设置 bufferSize、batchSize、rollInterval 提升性能

使用建议

  • 重要数据一定要落 HDFS 一份,避免 Kafka 异常导致数据丢失

  • Kafka 通道选 memory channel,速度更快

  • HDFS 落盘建议设置 rollInterval=60、batchSize=1000 适配大吞吐


落盘路径按天/小时分目录,例如:


hdfs.path = /logs/nginx/%Y/%m/%d/%H/
复制代码


此外我们常见的还会遇到如下的问题:


  • 数据延迟写入 HDFS:batch size 太大,rollInterval 太长,降低 batchSize 或缩短 rollInterval

  • Kafka 中数据丢失:memory channel 崩溃丢缓存,使用 file channel 或启用 Kafka ACK 机制

  • HDFS 写入失败:权限、网络、文件系统故障,开启 Flume 日志调试,检查路径和权限

配置文件

cd /opt/wzk/flume_testvim flume_spooldir-hdfs.conf
复制代码


我们需要写入如下内容


# Name the components on this agenta3.sources = r3a3.channels = c3a3.sinks = k3# Describe/configure the sourcea3.sources.r3.type = spooldir# 注意这里的文件夹 换成自己的!!!a3.sources.r3.spoolDir = /opt/wzk/uploada3.sources.r3.fileSuffix = .COMPLETEDa3.sources.r3.fileHeader = true
# 忽略以.tmp结尾的文件,不上传a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Use a channel which buffers events in memorya3.channels.c3.type = memorya3.channels.c3.capacity = 10000a3.channels.c3.transactionCapacity = 500# Describe the sinka3.sinks.k3.type = hdfs# 注意修改成你自己的IP!!!a3.sinks.k3.hdfs.path = hdfs://h121.wzk.icu:9000/flume/upload/%Y%m%d/%H%M
# 上传文件的前缀a3.sinks.k3.hdfs.filePrefix = upload-# 是否使用本地时间戳a3.sinks.k3.hdfs.useLocalTimeStamp = true# 积攒500个Event,flush到HDFS一次a3.sinks.k3.hdfs.batchSize = 500# 设置文件类型a3.sinks.k3.hdfs.fileType = DataStream# 60秒滚动一次a3.sinks.k3.hdfs.rollInterval = 60# 128M滚动一次a3.sinks.k3.hdfs.rollSize = 134217700# 文件滚动与event数量无关a3.sinks.k3.hdfs.rollCount = 0# 最小冗余数a3.sinks.k3.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3
复制代码

启动 Agent

$FLUME_HOME/bin/flume-ng agent --name a3 \--conf-file flume-spooldir-hdfs.conf \-Dflume.root.logger=INFO,console
复制代码


测试效果

Flume

cd /opt/wzk/uploadvim 1.txt
复制代码


随便向其中写入一些内容,并保存,可以看到 Flume 已经有反应了。


HDFS

查看 HDFS,也已经有内容了


采集双写

这里业务上需要:


  • Flume 将数据写入本地

  • Flume 将数据写入 HDFS

分析实现

  • 需要多个 Agent 级联实现

  • Source 选择 taildir

  • Channel 选择 memory

  • 最终的 Sink 分别选择 HDFS,file_roll


配置文件 1

配置文件包含如下内容:


  • 1 个 taildir source

  • 2 个 memory channel

  • 2 个 avro sink


新建文件


vim flume-taildir-avro.conf
复制代码


写入如下内容


# Name the components on this agenta1.sources = r1a1.sinks = k1 k2a1.channels = c1 c2# 将数据流复制给所有channela1.sources.r1.selector.type = replicating# sourcea1.sources.r1.type = taildir# 记录每个文件最新消费位置a1.sources.r1.positionFile = /root/flume/taildir_position.jsona1.sources.r1.filegroups = f1# 备注:.*log 是正则表达式;这里写成 *.log 是错误的a1.sources.r1.filegroups.f1 = /tmp/root/.*log# sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = linux123a1.sinks.k1.port = 9091a1.sinks.k2.type = avroa1.sinks.k2.hostname = linux123a1.sinks.k2.port = 9092# channela1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 500a1.channels.c2.type = memorya1.channels.c2.capacity = 10000a1.channels.c2.transactionCapacity = 500# Bind the source and sink to the channela1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1a1.sinks.k2.channel = c2
复制代码

配置文件 2

配置文件包含如下内容:


  • 1 个 avro source

  • 1 个 memory channel

  • 1 个 hdfs sink


新建配置文件


vim flume-avro-hdfs.conf
复制代码


写入如下的内容:


# Name the components on this agenta2.sources = r1a2.sinks = k1a2.channels = c1# Describe/configure the sourcea2.sources.r1.type = avroa2.sources.r1.bind = linux123a2.sources.r1.port = 9091# Describe the channela2.channels.c1.type = memorya2.channels.c1.capacity = 10000a2.channels.c1.transactionCapacity = 500# Describe the sinka2.sinks.k1.type = hdfsa2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H# 上传文件的前缀a2.sinks.k1.hdfs.filePrefix = flume2-# 是否使用本地时间戳a2.sinks.k1.hdfs.useLocalTimeStamp = true# 500个Event才flush到HDFS一次a2.sinks.k1.hdfs.batchSize = 500# 设置文件类型,可支持压缩a2.sinks.k1.hdfs.fileType = DataStream# 60秒生成一个新的文件a2.sinks.k1.hdfs.rollInterval = 60a2.sinks.k1.hdfs.rollSize = 0a2.sinks.k1.hdfs.rollCount = 0a2.sinks.k1.hdfs.minBlockReplicas = 1# Bind the source and sink to the channela2.sources.r1.channels = c1a2.sinks.k1.channel = c1
复制代码

配置文件 3

配置文件包含如下内容:


  • 1 个 avro source

  • 1 个 memory channel

  • 1 个 file_roll sink


新建配置文件


vim flume-avro-file.conf
复制代码


写入如下的内容


# Name the components on this agenta3.sources = r1a3.sinks = k1a3.channels = c2# Describe/configure the sourcea3.sources.r1.type = avroa3.sources.r1.bind = linux123a3.sources.r1.port = 9092# Describe the sinka3.sinks.k1.type = file_roll# 目录需要提前创建好a3.sinks.k1.sink.directory = /root/flume/output# Describe the channela3.channels.c2.type = memorya3.channels.c2.capacity = 10000a3.channels.c2.transactionCapacity = 500# Bind the source and sink to the channela3.sources.r1.channels = c2a3.sinks.k1.channel = c2
复制代码

启动 Agent1

$FLUME_HOME/bin/flume-ng agent --name a3 \--conf-file ~/conf/flume-avro-file.conf \-Dflume.root.logger=INFO,console &
复制代码

启动 Agent2

$FLUME_HOME/bin/flume-ng agent --name a2 \--conf-file ~/conf/flume-avro-hdfs.conf \-Dflume.root.logger=INFO,console &
复制代码

启动 Agent3

$FLUME_HOME/bin/flume-ng agent --name a1 \--conf-file ~/conf/flume-taildir-avro.conf \-Dflume.root.logger=INFO,console &
复制代码

Hive 测试

hive -e "show databases;"
复制代码


发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-20-Flume 采集数据双写+HDFS 监控目录变化 Agent MemoryChannel Source_大数据_武子康_InfoQ写作社区