1.Flume 自定义 Source
1.1.自定义 Source 说明
Source 是负责接收数据到 FlumeAgent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
如:实时监控 MySQL,从 MySQL 中获取数据传输到 HDFS 或者其他存储框架,所以此时需要我们自己实现 MySQLSource。
官方也提供了自定义 source 的接口:
官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source
1.2.自定义 Source 原理
根据官方说明自定义 mysqlsource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
getBackOffSleepIncrement() //暂不用
getMaxBackOffSleepInterval() //暂不用
configure(Context context) //初始化 context
process() //获取数据(从 mysql 获取数据,业务处理比较复杂,所以我们定义一个专门的类——QueryMysql 来处理跟 mysql 的交互),封装成 event 并写入 channel,这个方法被循环调用
stop() //关闭相关的资源
1.3.自定义 Source 具体实现
创建 mysql 数据库以及 mysql 数据库表
CREATE DATABASE `mysqlsource`;
USE `mysqlsource`;
/*Table structure for table `flume_meta` */
DROP TABLE
IF EXISTS `flume_meta`;
CREATE TABLE `flume_meta` (
`source_tab` VARCHAR (255) NOT NULL,
`currentIndex` VARCHAR (255) NOT NULL,
PRIMARY KEY (`source_tab`)
) ENGINE = INNODB DEFAULT CHARSET = utf8;
/*Data for the table `flume_meta` */
INSERT INTO `flume_meta` (
`source_tab`,
`currentIndex`
)
VALUES
('student', '4');
/*Table structure for table `student` */
DROP TABLE
IF EXISTS `student`;
CREATE TABLE `student` (
`id` INT (11) NOT NULL AUTO_INCREMENT,
`name` VARCHAR (255) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE = INNODB AUTO_INCREMENT = 5 DEFAULT CHARSET = utf8;
/*Data for the table `student` */
INSERT INTO `student` (`id`, `name`)
VALUES
(1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu'), (4, 'zhaoliu');
复制代码
2.Flume 自定义 Sink
2.1.自定义 Sink 说明
同自定义 source 类似,对于某些 sink 如果没有我们想要的,我们也可以自定义 sink 实现将数据保存到我们想要的地方去,例如 kafka,或者 mysql,或者文件等等都可以
需求:从网络端口当中发送数据,自定义 sink,使用 sink 从网络端口接收数据,然后将数据保存到本地文件当中去。
2.2.自定义 Sink 原理实现
自定义 MySink
public class MySink extends AbstractSink implements Configurable {
private Context context ;
private String filePath = "";
private String fileName = "";
private File fileDir;
//这个方法会在初始化调用,主要用于初始化我们的Context,获取我们的一些配置参数
@Override
public void configure(Context context) {
try {
this.context = context;
filePath = context.getString("filePath");
fileName = context.getString("fileName");
fileDir = new File(filePath);
if(!fileDir.exists()){
fileDir.mkdirs();
}
} catch (Exception e) {
e.printStackTrace();
}
}
//这个方法会被反复调用
@Override
public Status process() throws EventDeliveryException {
Event event = null;
Channel channel = this.getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
while(true){
event = channel.take();
if(null != event){
break;
}
}
byte[] body = event.getBody();
String line = new String(body);
try {
FileUtils.write(new File(filePath+File.separator+fileName),line,true);
transaction.commit();
} catch (IOException e) {
transaction.rollback();
e.printStackTrace();
return Status.BACKOFF;
}finally {
transaction.close();
}
return Status.READY;
}
}
复制代码
评论