1. 案例背景介绍
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume 支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume 有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor 等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的 Flume 处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理。
2. 自定义拦截器
根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义 Flume 拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。
3. 功能实现
Ø 编写 java 代码,自定义拦截器
内容包括:
1. 定义一个类 CustomParameterInterceptor 实现 Interceptor 接口。
2. 在 CustomParameterInterceptor 类中定义变量,这些变量是需要到 Flume 的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)
3. 添加 CustomParameterInterceptor 的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的 unicode 编码进行转换为字符串。
4. 写具体的要处理的逻辑 intercept()方法,一个是单个处理的,一个是批量处理。
5. 接口中定义了一个内部接口 Builder,在 configure 方法中,进行一些参数配置。并给出,在 flume 的 conf 中没配置一些参数时,给出其默认值。通过其 builder 方法,返回一个 CustomParameterInterceptor 对象。
6. 定义一个静态类,类中封装 MD5 加密方法
7. 通过以上步骤,自定义拦截器的代码开发已完成,然后打包成 jar, 放到 Flume 的根目录下的 lib 中
代码:
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class MyInterceptor implements Interceptor {
/**
* The field_separator.指明每一行字段的分隔符
*/
private final String fields_separator;
/**
* The indexs.通过分隔符分割后,指明需要那列的字段 下标
*/
private final String indexs;
/**
* The indexs_separator. 多个下标的分隔符
*/
private final String indexs_separator;
/**
* @param indexs
* @param indexs_separator
*/
public MyInterceptor(String fields_separator,
String indexs, String indexs_separator, String encrypted_field_index) {
String f = fields_separator.trim();
String i = indexs_separator.trim();
this.indexs = indexs;
this.encrypted_field_index = encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeUtils.unicodeToString(f);
}
this.fields_separator = f;
if (!i.equals("")) {
i = UnicodeUtils.unicodeToString(i);
}
this.indexs_separator = i;
}
/**
* The encrypted_field_index. 需要加密的字段下标
*/
private final String encrypted_field_index;
/*
* @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
* 单个event拦截逻辑
*/
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
String line = new String(event.getBody(), Charset.defaultCharset());
//flume中消息的分隔符,制表符
String[] fields_spilts = line.split(fields_separator);
//索引信息的分隔符,逗号
String[] indexs_split = indexs.split(indexs_separator);
String newLine = "";
for (int i = 0; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//对加密字段进行加密
if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) {
newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]);
} else {
newLine += fields_spilts[parseInt];
}
if (i != indexs_split.length - 1) {
newLine += fields_separator;
}
}
event.setBody(newLine.getBytes(Charset.defaultCharset()));
return event;
} catch (Exception e) {
return event;
}
}
/*
* @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
* 批量event拦截逻辑
*/
public List<Event> intercept(List<Event> events) {
List<Event> out = new ArrayList<Event>();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
/*
* @see org.apache.flume.interceptor.Interceptor#initialize()
*/
public void initialize() {
// TODO Auto-generated method stub
}
/*
* @see org.apache.flume.interceptor.Interceptor#close()
*/
public void close() {
// TODO Auto-generated method stub
}
/**
* 相当于自定义Interceptor的工厂类
* 在flume采集配置文件中通过制定该Builder来创建Interceptor对象
* 可以在Builder中获取、解析flume采集配置文件中的拦截器Interceptor的自定义参数:
* 字段分隔符,字段下标,下标分隔符、加密字段下标 ...等
*
* @author
*/
public static class Builder implements Interceptor.Builder {
/**
* The fields_separator.指明每一行字段的分隔符
*/
private String fields_separator;
/**
* The indexs.通过分隔符分割后,指明需要那列的字段 下标
*/
private String indexs;
/**
* The indexs_separator. 多个下标下标的分隔符
*/
private String indexs_separator;
/**
* The encrypted_field. 需要加密的字段下标
*/
private String encrypted_field_index;
/*
* @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
*/
public void configure(Context context) {
fields_separator = context.getString(Constants.FIELD_SEPARATOR, Constants.DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(Constants.INDEXS, Constants.DEFAULT_INDEXS);
indexs_separator = context.getString(Constants.INDEXS_SEPARATOR, Constants.DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index = context.getString(Constants.ENCRYPTED_FIELD_INDEX, Constants.DEFAULT_ENCRYPTED_FIELD_INDEX);
}
/*
* @see org.apache.flume.interceptor.Interceptor.Builder#build()
*/
public Interceptor build() {
return new MyInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index);
}
}
}
复制代码
评论