写点什么

大数据 -18 Flume HelloWorld 实现 Source Channel Sink 控制台流式收集

作者:武子康
  • 2025-06-23
    美国
  • 本文字数:3172 字

    阅读完需:约 10 分钟

大数据-18 Flume HelloWorld 实现Source Channel Sink 控制台流式收集

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

目前 2025 年 06 月 13 日更新到:AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 06 月 11 日更新到:Java-44 深入浅出 Nginx - 底层进程机制 Master Worker 机制原理 常用指令 MyBatis 已完结,Spring 已完结,深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 13 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 13 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上一节我们完成的内容:


  • Flume 简介

  • Flume 组件的介绍

  • Flume 架构、核心组件

  • Flume 下载、安装、配置

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123




基本介绍

Apache Flume 是一个分布式、可靠、可用性高的系统,主要用于高效采集、聚合和传输日志数据到集中式存储系统(如 HDFS、HBase 等)。它主要应用于 大数据日志采集场景,常与 Hadoop 生态系统一起使用。

核心特性

  • 可扩展性:支持多级代理(Agent),可横向扩展

  • 可靠性:支持事务机制,保证数据不丢失

  • 灵活性:Source/Channel/Sink 可组合

  • 容错机制:支持 failover 与 load balancing

  • 高吞吐量:支持异步批量传输,低延迟高吞吐

核心组件

Flume 的数据流模型中,Source、Channel、Sink 是三个最关键的组成部分,构成了从数据产生 → 缓存中转 → 最终写入 的整个流水线。

Source(数据源)

负责采集数据并将其包装成 Event(包含头部和正文),常见类型有:


  • exec(监听命令行输出)

  • netcat(基于 socket)

  • syslog、http、kafka、avro、spooldir(监控目录)


详细介绍的话有如下的:


  • exec:执行 shell 命令,如 tail -F,采集系统日志、文本输出

  • spooldir:监控目录中的新文件,一次读入且不修改原文件,日志文件转储收集

  • netcat:监听 TCP 端口,简单 socket 数据采集

  • avro:接收其他 Flume agent 发送的 Avro 数据,跨 Flume Agent 转发

  • kafka:从 Kafka 消费数据,实时流式采集

  • http:接收 HTTP 请求,IoT 数据、WebHook 接收

  • syslog:接收 Syslog 协议日志,系统安全与操作日志采集


配置示例如下:


agent.sources.r1.type = execagent.sources.r1.command = tail -F /var/log/syslog
复制代码

Channel(通道)

数据的临时存储缓冲区,起到削峰填谷、流控的作用。支持以下几种类型:


  • memory(高性能,适合低丢失容忍)

  • file(数据持久化,适合可靠性要求高)

  • jdbc channel(较少使用)


详细的内容有如下的:


  • memory:存储在 JVM 堆内,速度快,易丢数据,数据可容忍丢失(测试、非核心日志)

  • file:落盘保存,数据持久化,可靠性高,核心日志、不能丢失的数据

  • jdbc:基于数据库的 Channel,不常用,复杂度高

  • kafka(自定义):Kafka 作为中转通道,高并发、高解耦场景


常见的配置如下所示:


agent.channels.c1.type = memoryagent.channels.c1.capacity = 1000            # 最大缓存事件数agent.channels.c1.transactionCapacity = 100  # 每次事务提交最多事件数
复制代码

Sink(下沉)

负责将 Event 发送到目标系统,如:


  • HDFS、HBase、Kafka、ElasticSearch、Solr、Hive 等


详细的配置如下所示:


  • hdfs:写入 Hadoop HDFS 文件系统,日志归档、数据湖落地

  • kafka:推送到 Kafka topic,实时流处理、接入 Storm/Flink

  • logger:输出到控制台,调试使用

  • avro:向下游 Flume 发送数据,多级传输场景

  • elasticsearch:写入到 ES,日志检索与分析

  • hbase:写入 HBase 表,高并发 NoSQL 存储

  • file_roll:本地滚动文件写入,简单备份、本地存档


示例的配置如下:


agent.sinks.s1.type = hdfsagent.sinks.s1.hdfs.path = hdfs://namenode/flume/logs/%Y/%m/%d/agent.sinks.s1.hdfs.rollInterval = 60       # 每隔60秒滚动一个新文件agent.sinks.s1.hdfs.fileType = DataStream
复制代码

数据流向

数据源(WebServer、App日志)    Source(监听)   Channel(缓存)   Sink(写入 HDFS 等)
复制代码


  • Source:数据采集,支持 exec/http/kafka 等,tail、监听端口、消费 Kafka

  • Channel:中间缓冲,支持 memory/file 等,内存缓存、文件持久

  • Sink:数据下沉,写入外部系统,HDFS、Kafka、ES

使用场景

  • Web 日志采集:采集 Apache/Nginx 等日志到 HDFS

  • 应用系统日志:收集 Java 应用或服务日志到 Kafka

  • IoT 数据采集:与 Kafka 联动,实现高频数据注入

  • 数据湖构建:日志流式写入 HDFS 或 Parquet

类似对比

  • Flume:面向 Hadoop,稳定成熟,批量日志 → HDFS

  • Logstash:插件丰富,支持结构化处理,日志 → ElasticSearch

  • Filebeat:轻量,低资源占用。容器或边缘日志收集

  • Kafka Connect:与 Kafka 深度集成,数据同步


Flume 是面向日志传输领域的老牌工具,尤其适合 Hadoop 系统的数据注入,具备如下优势:


  • 架构清晰,可扩展强

  • 与 Hadoop 原生集成

  • 适合海量、批量日志传输


但在对实时性、结构化、复杂处理有需求的场景中,可以考虑 Logstash、Kafka Connect、Flink 等替代或补充。

HelloWorld

我们要实现一个简单的 conf,完成如下的目的:监听本机 8888 端口,Flume 将监听的数据实时显示在控制台


  • 使用工具可以向 8888 发送数据

  • 监听端口数据 选择 netcat source

  • Channel 选择 memory

  • 数据实时显示 选择 logger sink


lsof -i:8888
复制代码

基本示例

# agent名称agent.sources = r1agent.channels = c1agent.sinks = s1
# sourceagent.sources.r1.type = execagent.sources.r1.command = tail -F /var/log/nginx/access.logagent.sources.r1.channels = c1
# channelagent.channels.c1.type = memoryagent.channels.c1.capacity = 1000agent.channels.c1.transactionCapacity = 100
# sinkagent.sinks.s1.type = hdfsagent.sinks.s1.hdfs.path = hdfs://namenode/flume/logsagent.sinks.s1.channel = c1agent.sinks.s1.hdfs.fileType = DataStreamagent.sinks.s1.hdfs.writeFormat = Textagent.sinks.s1.hdfs.rollInterval = 60
复制代码

配置文件

我的目录都放倒了 flume_test 下,方便我后续的归档。


cd /opt/wzk/flume_testvim flume-netcat-logger.conf
复制代码


写入如下的内容:


# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1a1.sources = r1a1.channels = c1a1.sinks = k1
# sourcea1.sources.r1.type = netcat# 注意这里的IP的地址!!!!!!!!!!a1.sources.r1.bind = h122.wzk.icua1.sources.r1.port = 8888
# channela1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 100
# sinka1.sinks.k1.type = logger# source、channel、sink之间的关系a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
复制代码


这里对上述的内容进行一点点介绍:Memory Channel 是使用内存缓冲 Event 的 Channel 实现。速度比较快速,容量会受到 JVM 内存大小的限制,可靠性不高。适用于允许丢失数据,但对性能要求较高的日志采集业务。

启动服务

配置完毕后,我们启动服务。


$FLUME_HOME/bin/flume-ng agent --name a1 \--conf-file flume-netcat-logger.conf \-Dflume.root.logger=INFO,console
复制代码


上述的参数解释:


  • name 定义 Agent 的名字,要与参数文件一致

  • conf-file 指定参数文件位置

  • -D 表示 Flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO。



发布于: 2025-06-23阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-18 Flume HelloWorld 实现Source Channel Sink 控制台流式收集_Apache_武子康_InfoQ写作社区