写点什么

Flume 自定义拦截器

发布于: 2021 年 05 月 27 日
Flume自定义拦截器

1. 案例背景介绍

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume 支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume 有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor 等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的 Flume 处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理。

2. 自定义拦截器

根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义 Flume 拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。

3. 功能实现


Ø  编写 java 代码,自定义拦截器

内容包括:

1.  定义一个类 CustomParameterInterceptor 实现 Interceptor 接口。

2.   在 CustomParameterInterceptor 类中定义变量,这些变量是需要到 Flume 的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)

3.  添加 CustomParameterInterceptor 的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的 unicode 编码进行转换为字符串。

4.  写具体的要处理的逻辑 intercept()方法,一个是单个处理的,一个是批量处理。

5.  接口中定义了一个内部接口 Builder,在 configure 方法中,进行一些参数配置。并给出,在 flume 的 conf 中没配置一些参数时,给出其默认值。通过其 builder 方法,返回一个 CustomParameterInterceptor 对象。

6.  定义一个静态类,类中封装 MD5 加密方法

7.    通过以上步骤,自定义拦截器的代码开发已完成,然后打包成 jar, 放到 Flume 的根目录下的 lib 中


代码:

import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;


public class MyInterceptor implements Interceptor { /** * The field_separator.指明每一行字段的分隔符 */ private final String fields_separator;
/** * The indexs.通过分隔符分割后,指明需要那列的字段 下标 */ private final String indexs;
/** * The indexs_separator. 多个下标的分隔符 */ private final String indexs_separator;
/** * @param indexs * @param indexs_separator */ public MyInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) { String f = fields_separator.trim(); String i = indexs_separator.trim(); this.indexs = indexs; this.encrypted_field_index = encrypted_field_index.trim(); if (!f.equals("")) { f = UnicodeUtils.unicodeToString(f); } this.fields_separator = f; if (!i.equals("")) { i = UnicodeUtils.unicodeToString(i); } this.indexs_separator = i; }


/** * The encrypted_field_index. 需要加密的字段下标 */ private final String encrypted_field_index;

/* * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event) * 单个event拦截逻辑 */ public Event intercept(Event event) { if (event == null) { return null; } try { String line = new String(event.getBody(), Charset.defaultCharset()); //flume中消息的分隔符,制表符 String[] fields_spilts = line.split(fields_separator); //索引信息的分隔符,逗号 String[] indexs_split = indexs.split(indexs_separator); String newLine = ""; for (int i = 0; i < indexs_split.length; i++) { int parseInt = Integer.parseInt(indexs_split[i]); //对加密字段进行加密 if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) { newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]); } else { newLine += fields_spilts[parseInt]; }
if (i != indexs_split.length - 1) { newLine += fields_separator; } } event.setBody(newLine.getBytes(Charset.defaultCharset())); return event; } catch (Exception e) { return event; } }
/* * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List) * 批量event拦截逻辑 */ public List<Event> intercept(List<Event> events) { List<Event> out = new ArrayList<Event>(); for (Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; }
/* * @see org.apache.flume.interceptor.Interceptor#initialize() */ public void initialize() { // TODO Auto-generated method stub
}
/* * @see org.apache.flume.interceptor.Interceptor#close() */ public void close() { // TODO Auto-generated method stub
}

/** * 相当于自定义Interceptor的工厂类 * 在flume采集配置文件中通过制定该Builder来创建Interceptor对象 * 可以在Builder中获取、解析flume采集配置文件中的拦截器Interceptor的自定义参数: * 字段分隔符,字段下标,下标分隔符、加密字段下标 ...等 * * @author */ public static class Builder implements Interceptor.Builder {
/** * The fields_separator.指明每一行字段的分隔符 */ private String fields_separator;
/** * The indexs.通过分隔符分割后,指明需要那列的字段 下标 */ private String indexs;
/** * The indexs_separator. 多个下标下标的分隔符 */ private String indexs_separator;
/** * The encrypted_field. 需要加密的字段下标 */ private String encrypted_field_index;
/* * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context) */ public void configure(Context context) { fields_separator = context.getString(Constants.FIELD_SEPARATOR, Constants.DEFAULT_FIELD_SEPARATOR); indexs = context.getString(Constants.INDEXS, Constants.DEFAULT_INDEXS); indexs_separator = context.getString(Constants.INDEXS_SEPARATOR, Constants.DEFAULT_INDEXS_SEPARATOR); encrypted_field_index = context.getString(Constants.ENCRYPTED_FIELD_INDEX, Constants.DEFAULT_ENCRYPTED_FIELD_INDEX); }
/* * @see org.apache.flume.interceptor.Interceptor.Builder#build() */ public Interceptor build() { return new MyInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index); } }

}

复制代码


发布于: 2021 年 05 月 27 日阅读数: 195
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flume自定义拦截器