写点什么

Flume 高阶自定义组件

发布于: 2021 年 04 月 14 日
Flume高阶自定义组件

1.Flume 自定义 Source

1.1.自定义 Source 说明

Source 是负责接收数据到 FlumeAgent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。

如:实时监控 MySQL,从 MySQL 中获取数据传输到 HDFS 或者其他存储框架,所以此时需要我们自己实现 MySQLSource。

官方也提供了自定义 source 的接口:

官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source

1.2.自定义 Source 原理

根据官方说明自定义 mysqlsource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。

实现相应方法:

getBackOffSleepIncrement()    //暂不用

getMaxBackOffSleepInterval()  //暂不用

configure(Context context)    //初始化 context

process()  //获取数据(从 mysql 获取数据,业务处理比较复杂,所以我们定义一个专门的类——QueryMysql 来处理跟 mysql 的交互),封装成 event 并写入 channel,这个方法被循环调用

stop()  //关闭相关的资源

1.3.自定义 Source 具体实现

创建 mysql 数据库以及 mysql 数据库表
CREATE DATABASE `mysqlsource`;USE `mysqlsource`;/*Table structure for table `flume_meta` */DROP TABLEIF EXISTS `flume_meta`;
CREATE TABLE `flume_meta` ( `source_tab` VARCHAR (255) NOT NULL, `currentIndex` VARCHAR (255) NOT NULL, PRIMARY KEY (`source_tab`)) ENGINE = INNODB DEFAULT CHARSET = utf8;
/*Data for the table `flume_meta` */INSERT INTO `flume_meta` ( `source_tab`, `currentIndex`)VALUES ('student', '4');
/*Table structure for table `student` */DROP TABLEIF EXISTS `student`;
CREATE TABLE `student` ( `id` INT (11) NOT NULL AUTO_INCREMENT, `name` VARCHAR (255) NOT NULL, PRIMARY KEY (`id`)) ENGINE = INNODB AUTO_INCREMENT = 5 DEFAULT CHARSET = utf8;
/*Data for the table `student` */INSERT INTO `student` (`id`, `name`)VALUES (1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu'), (4, 'zhaoliu');
复制代码

2.Flume 自定义 Sink

2.1.自定义 Sink 说明

同自定义 source 类似,对于某些 sink 如果没有我们想要的,我们也可以自定义 sink 实现将数据保存到我们想要的地方去,例如 kafka,或者 mysql,或者文件等等都可以

需求:从网络端口当中发送数据,自定义 sink,使用 sink 从网络端口接收数据,然后将数据保存到本地文件当中去。

2.2.自定义 Sink 原理实现

自定义 MySink
public class MySink extends AbstractSink implements Configurable {    private Context context ;    private String filePath = "";    private String fileName = "";    private File fileDir;
//这个方法会在初始化调用,主要用于初始化我们的Context,获取我们的一些配置参数 @Override public void configure(Context context) { try { this.context = context; filePath = context.getString("filePath"); fileName = context.getString("fileName"); fileDir = new File(filePath); if(!fileDir.exists()){ fileDir.mkdirs(); } } catch (Exception e) { e.printStackTrace(); } } //这个方法会被反复调用 @Override public Status process() throws EventDeliveryException { Event event = null; Channel channel = this.getChannel(); Transaction transaction = channel.getTransaction(); transaction.begin(); while(true){ event = channel.take(); if(null != event){ break; } } byte[] body = event.getBody(); String line = new String(body); try { FileUtils.write(new File(filePath+File.separator+fileName),line,true); transaction.commit(); } catch (IOException e) { transaction.rollback(); e.printStackTrace(); return Status.BACKOFF; }finally { transaction.close(); } return Status.READY; }}
复制代码


发布于: 2021 年 04 月 14 日阅读数: 14
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flume高阶自定义组件