写点什么

大数据 -19 Flume Agent 采集数据至 HDFS 集群 监听 Hive 日志 操作记录写入

作者:武子康
  • 2025-06-23
    美国
  • 本文字数:3016 字

    阅读完需:约 10 分钟

大数据-19 Flume Agent采集数据至HDFS集群 监听Hive日志 操作记录写入

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

目前 2025 年 06 月 16 日更新到:AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 06 月 23 日更新到:Java-53 深入浅出 Tomcat 性能优化 JVM 内存模型 垃圾回收 GC Tomcat 配置优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 正在更新,深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 13 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上一节我们完成了内容:


  • Flume 启动测试

  • Flume Conf 编写

  • Flume 测试发送和接收数据

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123


文档推荐

除了官方文档以外,这里有一个写的很好的中文文档:https://flume.liyifeng.org/


Apache Flume 是一个分布式、可靠、高可用的系统,专为从不同数据源收集数据并聚合、传输到集中式数据存储(如 HDFS、HBase)而设计。Flume 特别适用于日志数据采集,常见于日志分析、实时监控、大数据平台等场景。

核心架构组件

  • Source:数据源,用于接收外部数据(如日志、HTTP 请求、Kafka 等)

  • Channel:缓存通道,临时存储数据(如内存、文件)以解耦 source 和 sink 的处理速度差异

  • Sink:数据目标,将 channel 中的数据写入目标系统(如 HDFS、HBase、Kafka、ElasticSearch)


这些组件共同构成一个 Agent(代理节点),一个 Flume 进程中可以包含多个这样的 Source-Channel-Sink 流程。

数据采集流程(以 HDFS 为目标)

Source 采集数据

常见的 Source 类型包括:


  • exec: 从一个 shell 命令中读取标准输出(如 tail -F log)

  • spooldir: 从一个目录读取文件(每个文件仅读取一次,适合离线数据)

  • netcat: 监听 TCP 端口,接收文本数据

  • http: 接收 HTTP 请求


示例:


agent.sources.source1.type = execagent.sources.source1.command = tail -F /var/log/nginx/access.log
复制代码

Channel 中转缓冲

常用类型:


  • memory: 存储在内存中,速度快但易丢数据

  • file: 存储在磁盘文件中,稳定但稍慢


agent.channels.channel1.type = memoryagent.channels.channel1.capacity = 10000agent.channels.channel1.transactionCapacity = 1000
复制代码

Sink 将数据写入 HDFS

常见的配置项目如下:


agent.sinks.sink1.type = hdfsagent.sinks.sink1.hdfs.path = hdfs://namenode:9000/flume/logs/%Y-%m-%d/agent.sinks.sink1.hdfs.filePrefix = eventsagent.sinks.sink1.hdfs.fileType = DataStreamagent.sinks.sink1.hdfs.writeFormat = Textagent.sinks.sink1.hdfs.batchSize = 1000agent.sinks.sink1.hdfs.rollInterval = 60      # 每60秒滚动一次新文件agent.sinks.sink1.hdfs.rollSize = 134217728   # 文件达到128MB时滚动agent.sinks.sink1.hdfs.rollCount = 0          # 记录条数触发滚动
复制代码

完整示例

agent.sources = source1agent.channels = channel1agent.sinks = sink1
# Sourceagent.sources.source1.type = execagent.sources.source1.command = tail -F /var/log/nginx/access.logagent.sources.source1.channels = channel1
# Channelagent.channels.channel1.type = memoryagent.channels.channel1.capacity = 10000agent.channels.channel1.transactionCapacity = 1000
# Sinkagent.sinks.sink1.type = hdfsagent.sinks.sink1.channel = channel1agent.sinks.sink1.hdfs.path = hdfs://namenode:9000/flume/logs/%Y-%m-%d/agent.sinks.sink1.hdfs.filePrefix = eventsagent.sinks.sink1.hdfs.fileType = DataStreamagent.sinks.sink1.hdfs.writeFormat = Textagent.sinks.sink1.hdfs.rollInterval = 60agent.sinks.sink1.hdfs.rollSize = 134217728agent.sinks.sink1.hdfs.rollCount = 0
复制代码


保存为 flume.conf,使用以下命令启动:


flume-ng agent -n agent -c conf -f flume.conf -Dflume.root.logger=INFO,console
复制代码

注意事项

数据丢失容错:


  • 推荐使用 file 类型的 channel 来保证在 agent 异常崩溃时不丢数据。

  • 配置 sink 的 hdfs.rollInterval 和 rollSize 以避免过大文件导致内存压力。


HDFS Sink 文件碎片问题:


  • 每次写入都会生成很多小文件(每个 roll 周期一个文件),可以定期使用 Hive/MapReduce 合并。


吞吐优化:


  • 增大 channel 的 capacity 和 transactionCapacity

  • 调整 sink 的 batchSize 参数


压缩写入 HDFS(推荐):


  • agent.sinks.sink1.hdfs.codeC = gzip

实践建议

  • 配合 Kafka 使用:Flume → Kafka → HDFS,更适用于大规模场景。

  • 支持多级代理(多级 Agent):可将多个 agent 收集的数据再聚合到中央 agent 并写入 HDFS。

  • 可嵌入 Spark 或 Hive:HDFS 中的数据可以直接被 Hive 查询或 Spark 消费。

环境准备

要将数据写入到 HDFS 中,我们需要一些支持库来完成。(这些支持库基本都在 Hadoop 的支持库中,没有的话,大家可以到 Maven 仓库搜索下载补充一下)


cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
复制代码


  • commons-configuration-1.6.jar

  • commons-io-2.4.jar

  • hadoop-auth-2.9.0.jar

  • hadoop-common-2.9.0.jar

  • hadoop-hdfs-2.9.0.jar

  • htrace-core4-4.1.0-incubating.jar


你需要把这些 Jar 包都拷贝到 $FLUME_HOME/lib 文件夹下:


cd $FLUME_HOME/libls
复制代码


配置文件

cd vim flume-exec-hdfs.conf
复制代码


编写如下的内容:


# Name the components on this agenta2.sources = r2a2.sinks = k2a2.channels = c2
# Describe/configure the sourcea2.sources.r2.type = execa2.sources.r2.command = tail -F /tmp/root/hive.log# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 10000a2.channels.c2.transactionCapacity = 500
# Describe the sinka2.sinks.k2.type = hdfs# 这里注意修改为服务器的IP!!!# 注意是 HDFS 的,别写错了,具体看 Hadoop 的 core-site.xml fs.defaultFSa2.sinks.k2.hdfs.path = hdfs://h121.wzk.icu:9000/flume/%Y%m%d/%H%M# 上传文件的前缀a2.sinks.k2.hdfs.filePrefix = logs-# 是否使用本地时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true# 积攒500个Event才flush到HDFS一次a2.sinks.k2.hdfs.batchSize = 500# 设置文件类型,支持压缩。DataStream没启用压缩a2.sinks.k2.hdfs.fileType = DataStream
# 1分钟滚动一次a2.sinks.k2.hdfs.rollInterval = 60# 128M滚动一次a2.sinks.k2.hdfs.rollSize = 134217700# 文件的滚动与Event数量无关a2.sinks.k2.hdfs.rollCount = 0
# 最小冗余数a2.sinks.k2.hdfs.minBlockReplicas = 1# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2
复制代码


启动 Agent

$FLUME_HOME/bin/flume-ng agent --name a2 \--conf-file flume-exec-hdfs.conf \-Dflume.root.logger=INFO,console
复制代码



如果你启动一切顺利的话,你可以看到如下的内容:


测试效果

启动集群

start-dfs.shstart-yarn.sh
复制代码

启动 Hive

hive -e "show databases;"
复制代码


查看日志

可以看到 Flume 上有了输出


查看 HDFS

观察 HDFS,发现数据已经写入了:



发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-19 Flume Agent采集数据至HDFS集群 监听Hive日志 操作记录写入_大数据_武子康_InfoQ写作社区