写点什么

数据采集 & 流批一体化处理使用指南

作者:大河
  • 2023-03-24
    湖南
  • 本文字数:64731 字

    阅读完需:约 212 分钟

[^]: bboss-datatran 是由 bboss 开源的轻量级数据采集ETL和流批处理工具,具备数据采集、数据清洗转换处理和数据入库以及数据指标统计计算流批一体化处理功能。

数据采集 &流批一体化处理使用指南

本文介绍如何通过 bboss-datatran 来实现数据采集 &流批一体化处理功能,实现数据采集、数据清洗转换处理和数据入库以及数据指标流批一体化计算处理功能。



通过 bboss-datatran,无需依赖第三方大数据中间件即可轻松完成数据采集、数据清洗转换处理和数据入库以及数据指标流批一体化计算处理功能,简单、便捷、高效。

在项目中集成 bboss-datatran

可以下载案例独立使用 bboss-datatran,亦可以将 bboss-datatran 集成到自己的项目中使用,只需在项目中导入 bboss-datatran 的 maven 坐标,具体参考下面的文档:


在工程中导入bboss-maven坐标


集成好 bboss-datatran,继续后续的章节和案例了解和使用 bboss-datatran。

1.流批一体化作业类型

通过 bboss 可以灵活定制具备各种功能的数据采集统计作业


  1. 只采集和处理数据作业

  2. 采集和处理数据、指标统计计算混合作业--通过 ImportBuilder 注册 ETLMetrics+其他数据源 Output 插件结合实现


importBuilder.setDataTimeField("logOpertime");importBuilder.addMetrics(keyMetrics);//通过importBuilder注册指标计算器,对采集数据进行指标计算并保存指标计算结果到各种数据库ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig();      elasticsearchOutputConfig                .setTargetElasticsearch("default")            .setIndex("dbdemo")            .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id            .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false            .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false    
importBuilder.setOutputConfig(elasticsearchOutputConfig);//设置Elasticsearch输出插件,保存加工后的原始数据
复制代码


  1. 采集数据只做指标统计计算作业--通过指标插件 MetricsOutputConfig 实现


 MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
metricsOutputConfig.setDataTimeField("logOpertime"); metricsOutputConfig.addMetrics(keyMetrics);//通过Metrics输出插件注册指标计算器,对采集数据进行指标计算并保存指标计算结果到各种数据库
importBuilder.setOutputConfig(metricsOutputConfig);//设置Metrics输出插件
复制代码


4)在应用中单独集成和使用指标统计功能

2.指标计算特点

  1. 支持时间维度和非时间维度指标计算

  2. 时间维度指标计算:支持指定统计滑动时间窗口,支持设定时间统计窗口类型,在流处理或者离线处理过程中,对于数据到来的先后顺序没有严格要求,乱序数据不影响最终指标计算结果

  3. 一个指标支持多个维度和多个度量字段计算,多个维度字段值构造成指标的唯一指标 key,支持有限基数 key 和无限基数 key 指标计算(维度字段组合形成的唯一指标 key 的个数是有限的就是有限基数,个数是无限的就是无限基数)

  4. 一个作业可以支持多种类型的指标,每种类型指标支持多个指标计算


5)支持准实时指标统计计算和离线指标统计计算


6)可以从不同的数据输入来源获取需要统计的指标数据,亦可以将指标计算结果保存到各种不同的目标数据源


目前支持的数据来源类型如下图所示,根据实际需要,可以按照 bboss 的插件规范非常方便地扩展更多的输入数据源:


3.关键对象和参数

下面说明数据采集 &流批一体化处理中涉及的常用对象,并介绍其中的属性和重要方法。


流批一体化处理涉及的两个关键对象和相应方法:


ETLMetrics 和 Metrics 对象的 persistent 方法:实现指标计算结果存储到关系数据库或者其他分布式数据库


TimeMetric 对象的 incr 方法:实现具体的指标统计计算逻辑,可以实现简单的计数、求总量、平均值,亦可以实现非常复杂的指标计算(譬如:用户访问深度、不同深度用户分布、用户留存、用户活跃、用户引流转化漏斗指标等复杂指标计算)


下面分别进行介绍说明。

3.1 ImportBuidler

采集作业配置,如果是数据采集和指标计算一体化的作业,则通过 ImportBuilder 配置 ETLMetrics 和数据采集输入/输出插件


3.2 MetricsOutputConfig

指标插件配置,提供指标计算规则配置:ETLMetrics、时间维度字段配置等,可以添加多个 ETLMetrics


3.3 Metrics

通用流批一体化数据处理指标计算器,


3.4 ETLMetrics

专用于 ETL 数据采集协同实现流批一体化数据处理指标计算器,Metrics 的子类,用于指标统计计算逻辑、指标维度字段、指标时间维度字段、指标 key 生成规则、指标对象构建器、指标数据校验、指标计算结果持久化、设置时间窗口大小、时间窗口类型、指标存储内存区 S0、S1 交换区大小



指标类型 metricsType 的使用方法,通过以下 Metrics 对象的 4 个常量指定:


public static int MetricsType_KeyTimeMetircs = 0;  //关键字+时间维度,适用于有限基数和无限基数指标key的场景,默认类型    public static int MetricsType_TimeKeyMetircs = 1;//时间维度+关键字,适用于有限基数和无限基数指标key的场景    public static int MetricsType_TimeMetircs = 2;//关键字+时间维度,适用于只有有限基数指标key的场景    public static int MetricsType_KeyMetircs = 3;//关键字维度,用于无时间维度的统计场景,适用于有限基数和无限基数指标key的场景
复制代码


构建不同类型指标构建器方法:


ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyMetircs)
复制代码


timeWindowType 几种类型:


public final static int TIME_WINDOW_TYPE_SECOND = 1;//秒时间窗口public final static int TIME_WINDOW_TYPE_MINUTE = 2;//分钟时间窗口,默认值public final static int TIME_WINDOW_TYPE_HOUR = 3;//小时时间窗口public final static int TIME_WINDOW_TYPE_DAY = 4;//天时间窗口public final static int TIME_WINDOW_TYPE_WEEK = 5;//周时间窗口public final static int TIME_WINDOW_TYPE_MONTH = 6;//月时间窗口
复制代码


提供了两种定义 ETLMetrics 的方法,一般来说方法 1 更加直观,推荐使用方法 1 创建 ETLMetrics

3.4.1 方法 1 实现 map 和 persistent

通过实现 map 和 persistent 方法,定义一个 ETLMetrics


 // 直接实现map和persistent方法,定义一个ETLMetrics        ETLMetrics keyMetrics1 = new ETLMetrics(Metrics.MetricsType_KeyTimeMetircs){            @Override            public void map(MapData mapData) {                CommonRecord data = (CommonRecord) mapData.getData();                //可以添加多个指标
//指标1 按操作模块统计模块操作次数 String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } String metricKey = operModule; metric(metricKey, mapData, new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginModuleMetric(); }
});
//指标2 按照用户统计操作次数 String logUser = (String) data.getData("logOperuser"); metricKey = logUser; metric(metricKey, mapData, new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); }
});

}
@Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginmodulemetrics", esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginusermetrics", esData); }
});
} }; // key metrics中包含两个segment(S0,S1) keyMetrics1.setSegmentBoundSize(5000000); keyMetrics1.setTimeWindows(10); keyMetrics1.init(); metricsOutputConfig.addMetrics(keyMetrics);//指标计算作业 //importBuilder.addMetrics(keyMetrics1); 采集作业和指标计算混合作业模式
复制代码

自定义 MapData

如果要自定义创建 MapData,设置 BuildMapData 即可


//如果要自定义创建MapData,设置BuildMapData即可        keyMetrics1.setBuildMapData(new BuildMapData() {            @Override            public MapData buildMapData(MetricsData metricsData) {                BuildMapDataContext buildMapDataContext = metricsData.getBuildMapDataContext();                MapData mapData = new MapData(){                    /**                     * 根据指标标识,获取指标的时间统计维度字段,默认返回dataTime字段值,不同的指标需要指定不同的时间维度统计字段                     * 分析处理作业可以覆盖本方法,自定义获取时间维度字段值                     * @param metricsKey                     * @return                     */                    public Date metricsDataTime(String metricsKey) {//                if(metricsKey.equals("xxxx") ) {//                   Date time = (Date)data.get("collectime");//                   return time;//                }                        return getDataTime();                    }
}; Date dateTime = (Date) metricsData.getCommonRecord().getData("logOpertime"); mapData.setDataTime(dateTime);//默认按照操作时间作为指标时间维度字段,上面复写了metricsDataTime方法,可以根据指标key指定不同的时间维度值 mapData.setData(metricsData.getCommonRecord()); mapData.setDayFormat(buildMapDataContext.getDayFormat()); mapData.setHourFormat(buildMapDataContext.getHourFormat()); mapData.setMinuteFormat(buildMapDataContext.getMinuteFormat()); mapData.setYearFormat(buildMapDataContext.getYearFormat()); mapData.setMonthFormat(buildMapDataContext.getMonthFormat()); mapData.setWeekFormat(buildMapDataContext.getWeekFormat()); return mapData; } });
复制代码

3.4.2 方法 2 实现 builderMetrics 和 persistent 方法

通过实现 builderMetrics 和 persistent 方法,定义一个 ETLMetrics


ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyTimeMetircs){         @Override         public void builderMetrics(){                //自定义MapData,只能设置一个BuildMapData                setBuildMapData(new BuildMapData() {                    @Override                    public MapData buildMapData(MetricsData metricsData) {                        BuildMapDataContext buildMapDataContext = metricsData.getBuildMapDataContext();                        MapData mapData = new MapData(){                            /**                             * 根据指标标识,获取指标的时间统计维度字段,默认返回dataTime字段值,不同的指标需要指定不同的时间维度统计字段                             * 分析处理作业可以覆盖本方法,自定义获取时间维度字段值                             * @param metricsKey                             * @return                             */                            public Date metricsDataTime(String metricsKey) {//                if(metricsKey.equals("xxxx") ) {//                   Date time = (Date)data.get("collectime");//                   return time;//                }                                return getDataTime();                            }
}; Date dateTime = (Date) metricsData.getCommonRecord().getData("logOpertime"); mapData.setDataTime(dateTime);//默认按照操作时间作为指标时间维度字段,上面复写了metricsDataTime方法,可以根据指标key指定不同的时间维度值 mapData.setData(metricsData.getCommonRecord()); mapData.setDayFormat(buildMapDataContext.getDayFormat()); mapData.setHourFormat(buildMapDataContext.getHourFormat()); mapData.setMinuteFormat(buildMapDataContext.getMinuteFormat()); mapData.setYearFormat(buildMapDataContext.getYearFormat()); mapData.setMonthFormat(buildMapDataContext.getMonthFormat()); mapData.setWeekFormat(buildMapDataContext.getWeekFormat()); return mapData; } }); //可以添加多个指标构建器 //指标1 按操作模块统计模块操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ //生成指标key CommonRecord data = (CommonRecord) mapData.getData(); String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } return operModule; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() {//生成指标对象 return new LoginModuleMetric(); } }; } });
//指标2 按照用户统计操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String logUser = (String) data.getData("logOperuser");// if(logUser == null || logUser.equals("")){ logUser = "未知用户"; } return logUser; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); } }; } }); // key metrics中包含两个segment(S0,S1) setSegmentBoundSize(5000000); setTimeWindows(60 );//统计时间窗口 this.setTimeWindowType(MetricsConfig.TIME_WINDOW_TYPE_MINUTE);//统计时间窗口类型 }
/** * 存储指标计算结果 * @param metrics */ @Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginmodulemetrics", esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginusermetrics", esData); }
});
} };
复制代码

3.5 TimeMetric

非常关键的一个对象,指标对象抽象类,所有的指标都需要被定义为 TimeMetric 的子类,封装指标维度字段信息、指标计算逻辑,TimeMetric 内置了时间相关的维度字段,用来标记指标对应的 year、month、week、day、hour、minitue 信息,来源于时间维度字段值,通过这些时间维度字段值可以非常方便地实现性能优异的年报、月报、周报、日报、小时、分钟级别的统计报表。dataTime 代表了指定对应的 Date 时间,可以用户时间范围统计。

3.5.1 属性和方法说明

具体属性和方法说明如下


3.5.2 简单示例

TimeMetric 的一个简单实现如下:按操作模块统计模块操作次数 LoginModuleMetric


public class LoginModuleMetric extends TimeMetric {    private String operModule ;//维度字段,操作模块    @Override    public void init(MapData firstData) {//接收MapData类型参数,初始化指标对象对应的指标维度字段以及其他指标计算依赖的信息或者第三方组件        CommonRecord data = (CommonRecord) firstData.getData();//CommonRecord对象,封装了作业采集处理后的数据记录,可以通过getData(fieldName)获取数据字段值,通过数据的加工处理,对数据进行转换、打标处理操作        operModule = (String) data.getData("operModule");//初始化操作模块维度字段值    }
@Override public void incr(MapData data) { CommonRecord data = (CommonRecord) data.getData();//CommonRecord对象,封装了作业采集处理后的数据记录,可以通过getData(fieldName)获取数据字段值,通过数据的加工处理,对数据进行转换、打标处理操作,根据数据的标签,进行相关指标的统计计算,例如代表用户行为的PV、UV指标等,这里可以实现非常复杂的指标统计计算 count ++;//一个简单的计数统计 }//返回维度字段,用于在persistent指标计算结果时调用获取指标的对应操作模块 public String getOperModule() { return operModule; }}
复制代码

3.5.3 自定义指标值示例

我们可以根据实际需要为指标对象添加新的指标度量值字段,根据实际的情况实现 incr 方法,通过下面的示例进行了解:


/** * 网页访问统计、重点页面统计指标 */public class WebpageMetric  extends BaseWebPageMetric  {

protected String postionId;
private String postionChanCode; //触点渠道
/** * 访问量独立访客次数uv */ private long uv;
/** * url对应的独立访客统计 */ private long urlUv;
/** * url对应的手机号码独立访客统计 */ private long urlPhoneUv;
/** * 访问量新独立访客数uv */ private long newUv;

/** * 访问量新独立访客数uv */ private long newPhoneUv;
/** * 访问量独立访客数uv */ private long phoneUv;



/** * 访客数浏览量 */// private long visitorViews; /** * 新访客数浏览量 */ private long newVisitorViews;
/** * 手机号码维度访客数浏览量 */// private long phoneVisitorViews; /** * 手机号码维度新访客数浏览量 */ private long phoneNewVisitorViews;

/** * 登录用户数 */ private long loginUser;
/** * 未登录用户数 */ private long unoginUser; /** * 回头客数 */ private int rv;
/** * session个数 */ private int sessions;
/** * 新访客浏览量 */ private long newPv;
/** * 老访客浏览量 */ private long oldPv;
/** * 跳出用户数 */ private long jumpOutUser;
private long normal;
private long abnormal;
private String urlConfigId;
public WebpageMetric() { }
public long getPhoneUv() { return phoneUv; } public String getRegion() { return region; }

@Override public void init(MapData firstData) { super.init(firstData); TerminalMessages data = (TerminalMessages) firstData.getData(); postionId = data.getPostionId(); postionChanCode = data.getPostionChanCode(); urlConfigId = data.getUrlConfigId(); }
@Override public void incr(MapData data) { //statics from Map data super.incr(data); this.ips++; this.pv++;
TerminalMessages message = (TerminalMessages) data.getData(); long reqt = message.getPerformance().getReqt(); this.totalElapsed += reqt; this.failed += message.getErrorFlag() == 0 ? 0 : 1; this.success += message.getErrorFlag() == 0 ? 1 : 0; this.abnormal += message.getFailFlag() == 0 ? 0 : 1; this.normal += message.getFailFlag() == 0 ? 1 : 0;

int newUvTag = message.getNewUvTag(); if(newUvTag == 1){ this.newUv++;
}
int newPhoneUvTag = message.getNewPhoneUvTag(); if(newPhoneUvTag == 1){ this.newPhoneUv++;
}
int phoneUvTag = message.getPhoneUvTag(); if(phoneUvTag == 1){ this.phoneUv++;
} int uvTag = message.getUvTag(); if(uvTag == 1){ this.uv++;
}
if(message.getUrlUvTag() == 1){ urlUv ++; }
if(message.getUrlPhoneUvTag() == 1){ urlPhoneUv ++; }
int loginTag = message.getLoginTag();
if(loginTag == 1){ this.loginUser++; }else { this.unoginUser++; } int rvTag = message.getRvTag(); if(rvTag == 1){ rv ++; } if(message.getNewSessionTag() == 1){ sessions ++; }


/** * 新访客数浏览量 */ int newUvSidTag = message.getNewUvSidTag(); if(newUvSidTag == 1){ newVisitorViews++;
}

/** * 手机号码维度新访客数浏览量 */ int newPhoneUvSidTag = message.getNewPhoneUvSidTag(); if(newPhoneUvSidTag == 1){ phoneNewVisitorViews++;
}

}
public int getSessions() { return sessions; }
public int getRv() { return rv; }
public long getUv() { return uv; }
public void setUv(long uv) { this.uv = uv; }
public long getNewUv() { return newUv; }
public void setNewUv(long newUv) { this.newUv = newUv; }
public long getNewPhoneUv() { return newPhoneUv; }
public void setNewPhoneUv(long newPhoneUv) { this.newPhoneUv = newPhoneUv; }
public long getLoginUser() { return loginUser; }
public void setLoginUser(long loginUser) { this.loginUser = loginUser; }
public long getUnoginUser() { return unoginUser; }
public void setUnoginUser(long unoginUser) { this.unoginUser = unoginUser; }



public long getNewPv() { return newPv; }
public void setNewPv(long newPv) { this.newPv = newPv; }
public long getOldPv() { return oldPv; }
public void setOldPv(long oldPv) { this.oldPv = oldPv; }


public long getNewVisitorViews() { return newVisitorViews; }
public void setNewVisitorViews(long newVisitorViews) { this.newVisitorViews = newVisitorViews; }


public long getPhoneNewVisitorViews() { return phoneNewVisitorViews; }
public void setPhoneNewVisitorViews(long phoneNewVisitorViews) { this.phoneNewVisitorViews = phoneNewVisitorViews; }
public long getUrlUv() { return urlUv; }
public long getNormal() { return normal; }
public void setNormal(long normal) { this.normal = normal; }
public long getAbnormal() { return abnormal; }
public void setAbnormal(long abnormal) { this.abnormal = abnormal; }
public long getUrlPhoneUv() { return urlPhoneUv; }
public void setUrlPhoneUv(long urlPhoneUv) { this.urlPhoneUv = urlPhoneUv; }
public String getPostionId() { return postionId; }
public void setPostionId(String postionId) { this.postionId = postionId; }
public String getPostionChanCode() { return postionChanCode; }
public void setPostionChanCode(String postionChanCode) { this.postionChanCode = postionChanCode; }
public String getUrlConfigId() { return urlConfigId; }
public void setUrlConfigId(String urlConfigId) { this.urlConfigId = urlConfigId; }}
复制代码

3.6 KeyMetricBuilder

指标对象构建器,用于构建指标对象,同时提供数据校验逻辑,校验符合条件的数据才会提交给指标对象进行计算


3.7 MetricBuilder

用户构建指标 key 和 KeyMetricBuilder ,ETLMetrics 中可以通过 addMetricBuilder 添加多个 MetricBuilder 用户统计多个指标


3.8 MapData

在数据采集作业中,MapData 默认封装了:


data 属性:原始统计数据 CommonRecord 记录,指标对象从 CommonRecord 中获取指标维度字段,并根据维度字段生成指标 key。可以通过 BuildMapData 接口自定义特定类型的 MapData,示例如下:


//自定义MapData,只能设置一个BuildMapData                setBuildMapData(new BuildMapData() {                    @Override                    public MapData buildMapData(MetricsData metricsData) {                        BuildMapDataContext buildMapDataContext = metricsData.getBuildMapDataContext();                        MapData mapData = new MapData(){                            /**                             * 根据指标标识,获取指标的时间统计维度字段,默认返回dataTime字段值,不同的指标需要指定不同的时间维度统计字段                             * 分析处理作业可以覆盖本方法,自定义获取时间维度字段值                             * @param metricsKey                             * @return                             */                            public Date metricsDataTime(String metricsKey) {//                if(metricsKey.equals("xxxx") ) {//                   Date time = (Date)data.get("collectime");//                   return time;//                }                                return getDataTime();                            }
}; Date dateTime = (Date) metricsData.getCommonRecord().getData("logOpertime"); mapData.setDataTime(dateTime);//默认按照操作时间作为指标时间维度字段,上面复写了metricsDataTime方法,可以根据指标key指定不同的时间维度值 mapData.setData(metricsData.getCommonRecord()); mapData.setDayFormat(buildMapDataContext.getDayFormat()); mapData.setHourFormat(buildMapDataContext.getHourFormat()); mapData.setMinuteFormat(buildMapDataContext.getMinuteFormat()); mapData.setYearFormat(buildMapDataContext.getYearFormat()); mapData.setMonthFormat(buildMapDataContext.getMonthFormat()); mapData.setWeekFormat(buildMapDataContext.getWeekFormat()); return mapData; } });
复制代码


从示例里面可以看到,通过自定义 mapdata,还可以为不同的指标使用不同的时间维度字段值,通过重载 MapData 的方法 metricsDataTime 实现:


 MapData mapData = new MapData(){                            /**                             * 根据指标标识,获取指标的时间统计维度字段,默认返回dataTime字段值,不同的指标需要指定不同的时间维度统计字段                             * 分析处理作业可以覆盖本方法,自定义获取时间维度字段值                             * @param metricsKey                             * @return                             */                            public Date metricsDataTime(String metricsKey) {//                if(metricsKey.equals("xxxx") ) {//                   Date time = (Date)data.get("collectime");//                   return time;//                }                                return getDataTime();                            }
};
复制代码


mapdata 对象中还封装了 dataTime 时间维度值、各种时间格式,用来处理格式化时间维度相关的字段。

4.案例介绍

案例工程下载地址:


https://gitee.com/bboss/bboss-datatran-demo

4.1 用户操作次数指标统计

基于数字类型 db-es 增量同步及指标统计计算案例,如需调试功能,直接运行 main 方法即可。本案例实时按照一定的时间间隔,从数据库用户操作日志表中增量采集用户操作日志数据,并对用户 IP 地址信息数据进行加工处理后,交给指标插件统计不同的用户每分钟操作系统次数,以及不同模块每分钟操作次数,将统计结果写入 Elasticsearch 数据库。数据采集频率每 10 秒采集一次,统计实际窗口为 1 分钟,时间维度字段为 logOpertime(操作时间)。


通过 MetricsOutputConfig 插件设置 ETLMetrics 和时间维度字段


 MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
metricsOutputConfig.setDataTimeField("logOpertime"); metricsOutputConfig.addMetrics(keyMetrics);
importBuilder.setOutputConfig(metricsOutputConfig);
复制代码


案例代码如下:https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/metrics/Db2MetricsDemo.java


public void scheduleTimestampImportData(boolean dropIndice){
ImportBuilder importBuilder = new ImportBuilder() ; //在任务数据抽取之前做一些初始化处理,例如:通过删表来做初始化操作

importBuilder.setImportStartAction(new ImportStartAction() { /** * 初始化之前执行的处理操作,比如后续初始化操作、数据处理过程中依赖的资源初始化 * @param importContext */ @Override public void startAction(ImportContext importContext) { }
/** * 所有初始化操作完成后,导出数据之前执行的操作 * @param importContext */ @Override public void afterStartAction(ImportContext importContext) { try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginmodulemetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginmodulemetrics failed:",e); } try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginusermetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginusermetrics failed:",e); }
} });
DBInputConfig dbInputConfig = new DBInputConfig(); //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑, // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如: // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id] // 需要设置setLastValueColumn信息log_id, // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test") .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true) .setDbInitSize(5) .setDbMinIdleSize(5) .setDbMaxSize(10) .setShowSql(true);//是否使用连接池; importBuilder.setInputConfig(dbInputConfig);

//bulkprocessor和Elasticsearch输出插件共用Elasticsearch数据源,因此额外进行数据源初始化定义 Map properties = new HashMap();
//default为默认的Elasitcsearch数据源名称 properties.put("elasticsearch.serverNames","default");
/** * 默认的default数据源配置,每个配置项可以加default.前缀,也可以不加 */

properties.put("default.elasticsearch.rest.hostNames","192.168.137.1:9200"); properties.put("default.elasticsearch.showTemplate","true"); properties.put("default.elasticUser","elastic"); properties.put("default.elasticPassword","changeme"); properties.put("default.elasticsearch.failAllContinue","true"); properties.put("default.http.timeoutSocket","60000"); properties.put("default.http.timeoutConnection","40000"); properties.put("default.http.connectionRequestTimeout","70000"); properties.put("default.http.maxTotal","200"); properties.put("default.http.defaultMaxPerRoute","100"); ElasticSearchBoot.boot(properties);
BulkProcessorBuilder bulkProcessorBuilder = new BulkProcessorBuilder(); bulkProcessorBuilder.setBlockedWaitTimeout(-1)//指定bulk工作线程缓冲队列已满时后续添加的bulk处理排队等待时间,如果超过指定的时候bulk将被拒绝处理,单位:毫秒,默认为0,不拒绝并一直等待成功为止
.setBulkSizes(200)//按批处理数据记录数 .setFlushInterval(5000)//强制bulk操作时间,单位毫秒,如果自上次bulk操作flushInterval毫秒后,数据量没有满足BulkSizes对应的记录数,但是有记录,那么强制进行bulk处理
.setWarnMultsRejects(1000)//由于没有空闲批量处理工作线程,导致bulk处理操作出于阻塞等待排队中,BulkProcessor会对阻塞等待排队次数进行计数统计,bulk处理操作被每被阻塞排队WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息 .setWorkThreads(10)//bulk处理工作线程数 .setWorkThreadQueue(50)//bulk处理工作线程池缓冲队列大小 .setBulkProcessorName("detail_bulkprocessor")//工作线程名称,实际名称为BulkProcessorName-+线程编号 .setBulkRejectMessage("detail bulkprocessor")//bulk处理操作被每被拒绝WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息提示前缀 .setElasticsearch("default")//指定明细Elasticsearch集群数据源名称,bboss可以支持多数据源 .setFilterPath(BulkConfig.ERROR_FILTER_PATH) .addBulkInterceptor(new BulkInterceptor() { public void beforeBulk(BulkCommand bulkCommand) {
}
public void afterBulk(BulkCommand bulkCommand, String result) { if(logger.isDebugEnabled()){ logger.debug(result); } }
public void exceptionBulk(BulkCommand bulkCommand, Throwable exception) { if(logger.isErrorEnabled()){ logger.error("exceptionBulk",exception); } } public void errorBulk(BulkCommand bulkCommand, String result) { if(logger.isWarnEnabled()){ logger.warn(result); } } })//添加批量处理执行拦截器,可以通过addBulkInterceptor方法添加多个拦截器 ; /** * 构建BulkProcessor批处理组件,一般作为单实例使用,单实例多线程安全,可放心使用 */ BulkProcessor bulkProcessor = bulkProcessorBuilder.build();//构建批处理作业组件 ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyTimeMetircs){ @Override public void builderMetrics(){ //指标1 按操作模块统计模块操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } return operModule; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginModuleMetric(); } }; } });
//指标2 按照用户统计登录次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String logUser = (String) data.getData("logOperuser");// if(logUser == null || logUser.equals("")){ logUser = "未知用户"; } return logUser; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); } }; } }); // key metrics中包含两个segment(S0,S1) setSegmentBoundSize(5000000); setTimeWindows(60 );//统计时间窗口 this.setTimeWindowType(MetricsConfig.TIME_WINDOW_TYPE_MINUTE); }
/** * 存储指标计算结果 * @param metrics */ @Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginmodulemetrics", esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginusermetrics", esData); }
});
} }; //作业结束后销毁初始化阶段自定义的http数据源 importBuilder.setImportEndAction(new ImportEndAction() { @Override public void endAction(ImportContext importContext, Exception e) {
bulkProcessor.shutDown();
} }); MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
metricsOutputConfig.setDataTimeField("logOpertime"); metricsOutputConfig.addMetrics(keyMetrics);
importBuilder.setOutputConfig(metricsOutputConfig);

importBuilder// .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId .setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false .setBatchSize(10); //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
//定时任务配置, importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次 //定时任务配置结束

//增量配置开始// importBuilder.setStatusDbname("test");//设置增量状态数据源名称 importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(true);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据,// setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setLastValueStorePath("db2metrics_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样// importBuilder.setLastValueStoreTableName("logstable");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
/** * 通过DataRefactor接口,实现数据加工、清洗、转换、记录丢弃、打标等操作,将处理后的数据交给输出插件进行输出或者指标计算器进行指标统计计算 */ importBuilder.setDataRefactor(new DataRefactor() { public void refactor(Context context) throws Exception {// Date date = context.getDateValue("LOG_OPERTIME"); context.addFieldValue("collecttime",new Date()); IpInfo ipInfo = context.getIpInfoByIp("219.133.80.136"); if(ipInfo != null) context.addFieldValue("ipInfo", SimpleStringUtil.object2json(ipInfo)); } }); //映射和转换配置结束
/** * 内置线程池配置,实现多线程并行数据导入功能,作业完成退出时自动关闭该线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回

// //设置任务执行拦截器,可以添加多个,定时任务每次执行的拦截器 importBuilder.addCallInterceptor(new CallInterceptor() { @Override public void preCall(TaskContext taskContext) { System.out.println("preCall"); }
@Override public void afterCall(TaskContext taskContext) { System.out.println("afterCall"); }
@Override public void throwException(TaskContext taskContext, Throwable e) { System.out.println("throwException"); } }); // //设置任务执行拦截器结束,可以添加多个
//采集作业task执行结果回调处理接口,可以对任务执行成功、错误、异常回调处理,同时可以通过taskMetrics获取任务执行统计数据,知悉任务记录执行处理情况:成功数、失败数、忽略数量、总数量等 importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() { @Override public void success(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void exception(TaskCommand<String,String> taskCommand, Throwable exception) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.debug(taskMetrics.toString()); }

});

/** * 构建和执行数据库表数据导入es和指标统计作业 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();

}
复制代码

4.1.1 两个指标对象

指标 1 按操作模块统计模块操作次数


public class LoginModuleMetric extends TimeMetric {    private String operModule ;    @Override    public void init(MapData firstData) {        CommonRecord data = (CommonRecord) firstData.getData();        operModule = (String) data.getData("operModule");    }
@Override public void incr(MapData data) { count ++; }
public String getOperModule() { return operModule; }}
复制代码


指标 2 按照用户统计操作次数


public class LoginUserMetric extends TimeMetric {    private String logUser;    @Override    public void init(MapData firstData) {        CommonRecord data = (CommonRecord) firstData.getData();        logUser = (String) data.getData("logOperuser");    }
@Override public void incr(MapData data) { count ++; }
public String getLogUser() { return logUser; }}
复制代码

4.1.2 持久化指标结果到 Elasticsearch

通过 BulkProcessor 异步批处理组件持久化指标计算结果到 Elasticsearch:


public void persistent(Collection< KeyMetric> metrics) {   metrics.forEach(keyMetric->{      if(keyMetric instanceof LoginModuleMetric) {                     LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric;         Map esData = new HashMap();         esData.put("dataTime", testKeyMetric.getDataTime());           esData.put("year", testKeyMetric.getYear());//2018                        esData.put("month", testKeyMetric.getMonth());//201804                        esData.put("week", testKeyMetric.getWeek());//2018042           esData.put("timeMetricKey", testKeyMetric.getMetricTimeKey());//时间窗口维度key,例如,时间窗口类型为Day时,对应的值为:20230219         esData.put("hour", testKeyMetric.getDayHour());         esData.put("minute", testKeyMetric.getMinute());         esData.put("day", testKeyMetric.getDay());         esData.put("metric", testKeyMetric.getMetric());         esData.put("operModule", testKeyMetric.getOperModule());//登录用户操作模块         esData.put("count", testKeyMetric.getCount());//对应模块操作次数         bulkProcessor.insertData("vops-loginmodulemetrics", esData);      }      else if(keyMetric instanceof LoginUserMetric) {                     LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric;         Map esData = new HashMap();         esData.put("dataTime", testKeyMetric.getDataTime());//指标统计计算时间           esData.put("year", testKeyMetric.getYear());//2018                        esData.put("month", testKeyMetric.getMonth());//201804                        esData.put("week", testKeyMetric.getWeek());//2018042           esData.put("timeMetricKey", testKeyMetric.getMetricTimeKey());//时间窗口维度key,例如,时间窗口类型为Day时,对应的值为:20230219         esData.put("hour", testKeyMetric.getDayHour());//指标小时字段,例如2023021916         esData.put("minute", testKeyMetric.getMinute());//指标日期字段,例如202302191653         esData.put("day", testKeyMetric.getDay());//指标日期字段,例如20230219         esData.put("metric", testKeyMetric.getMetric());//指标key         esData.put("logUser", testKeyMetric.getLogUser());//登录用户         esData.put("count", testKeyMetric.getCount());//用户操作次数         bulkProcessor.insertData("vops-loginusermetrics", esData);//统计结果写入Elasticsearch表vops-loginusermetrics      }
});
}
复制代码


亦可以将指标数据保存到数据库:

4.1.3 定时采集数据策略配置

采用 jdk timer 执行数据采集调度策略,设置增量采集字段 log_id,通过 setFromFirst(true)控制作业重启是否重新采集数据,通过 setLastValueStorePath 设置增量状态保存文件路径(默认采用 sqlite 保存,可以切换其他数据库存储增量状态),通过 setLastValueType 设置增量状态字段类型(时间或者数字)


  //定时任务配置,      importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明//              .setScheduleDate(date) //指定任务开始执行时间:日期            .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行            .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次      //定时任务配置结束

//增量配置开始// importBuilder.setStatusDbname("test");//设置增量状态数据源名称 importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(true);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据,// setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setLastValueStorePath("db2metrics_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样// importBuilder.setLastValueStoreTableName("logstable");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
复制代码


另外还可以设置第一次增量起始值,更多资料,参考文档:定时增量导入

4.1.4 案例运行数据样本

运行指标输出的 Elasticsearch 指标 dsl 数据样本:


{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20161217","week":"2016123","month":"201612","hour":"2016121713","dataTime":"2016-12-16T16:00:00.000Z","year":"2016","metric":"认证管理","count":2,"operModule":"认证管理","day":"20161217","minute":"201612171327"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20161218","week":"2016124","month":"201612","hour":"2016121816","dataTime":"2016-12-17T16:00:00.000Z","year":"2016","metric":"认证管理","count":1,"operModule":"认证管理","day":"20161218","minute":"201612181604"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20191221","week":"2019123","month":"201912","hour":"2019122123","dataTime":"2019-12-20T16:00:00.000Z","year":"2019","metric":"认证管理","count":2,"operModule":"认证管理","day":"20191221","minute":"201912212348"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20180609","week":"2018062","month":"201806","hour":"2018060900","dataTime":"2018-06-08T16:00:00.000Z","year":"2018","metric":"认证管理","count":1,"operModule":"认证管理","day":"20180609","minute":"201806090030"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20190728","week":"2019075","month":"201907","hour":"2019072821","dataTime":"2019-07-27T16:00:00.000Z","year":"2019","metric":"认证管理","count":3,"operModule":"认证管理","day":"20190728","minute":"201907282146"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20180327","week":"2018035","month":"201803","hour":"2018032723","dataTime":"2018-03-26T16:00:00.000Z","year":"2018","metric":"认证管理","count":3,"operModule":"认证管理","day":"20180327","minute":"201803272312"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"timeMetricKey":"20180412","week":"2018042","month":"201804","hour":"2018041216","dataTime":"2018-04-11T16:00:00.000Z","year":"2018","metric":"认证管理","count":2,"operModule":"认证管理","day":"20180412","minute":"201804121640"}
复制代码

4.2 一次性离线用户操作数据统计

本案例比较简单,通过去除定时策略,一次性统计用户操作表全量操作日志,统计用户操作次数指标和模块被操作次数指标,并将统计结果写入 Elasticsearch 指标库。


数据采集作业一次性或定期采集控制策略参考文档:数据同步模式控制


通过 MetricsOutputConfig 插件设置 ETLMetrics 和时间维度字段


 MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
metricsOutputConfig.setDataTimeField("logOpertime"); metricsOutputConfig.addMetrics(keyMetrics);
importBuilder.setOutputConfig(metricsOutputConfig);
复制代码


案例代码如下:https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/metrics/Db2KeyMetricsDemo.java


ImportBuilder importBuilder = new ImportBuilder() ;      //在任务数据抽取之前做一些初始化处理,例如:通过删表来做初始化操作

importBuilder.setImportStartAction(new ImportStartAction() { /** * 初始化之前执行的处理操作,比如后续初始化操作、数据处理过程中依赖的资源初始化 * @param importContext */ @Override public void startAction(ImportContext importContext) { }
/** * 所有初始化操作完成后,导出数据之前执行的操作 * @param importContext */ @Override public void afterStartAction(ImportContext importContext) { try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginmodulemetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginmodulemetrics failed:",e); } try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginusermetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginusermetrics failed:",e); }
} });
DBInputConfig dbInputConfig = new DBInputConfig(); //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑, // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如: // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id] // 需要设置setLastValueColumn信息log_id, // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log") .setDbName("test") .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true) .setDbInitSize(5) .setDbMinIdleSize(5) .setDbMaxSize(10) .setShowSql(true);//是否使用连接池; importBuilder.setInputConfig(dbInputConfig);

//bulkprocessor和Elasticsearch输出插件共用Elasticsearch数据源,因此额外进行数据源初始化定义 Map properties = new HashMap();
//default为默认的Elasitcsearch数据源名称 properties.put("elasticsearch.serverNames","default");
/** * 默认的default数据源配置,每个配置项可以加default.前缀,也可以不加 */

properties.put("default.elasticsearch.rest.hostNames","192.168.137.1:9200"); properties.put("default.elasticsearch.showTemplate","true"); properties.put("default.elasticUser","elastic"); properties.put("default.elasticPassword","changeme"); properties.put("default.elasticsearch.failAllContinue","true"); properties.put("default.http.timeoutSocket","60000"); properties.put("default.http.timeoutConnection","40000"); properties.put("default.http.connectionRequestTimeout","70000"); properties.put("default.http.maxTotal","200"); properties.put("default.http.defaultMaxPerRoute","100"); ElasticSearchBoot.boot(properties);
BulkProcessorBuilder bulkProcessorBuilder = new BulkProcessorBuilder(); bulkProcessorBuilder.setBlockedWaitTimeout(-1)//指定bulk工作线程缓冲队列已满时后续添加的bulk处理排队等待时间,如果超过指定的时候bulk将被拒绝处理,单位:毫秒,默认为0,不拒绝并一直等待成功为止
.setBulkSizes(200)//按批处理数据记录数 .setFlushInterval(5000)//强制bulk操作时间,单位毫秒,如果自上次bulk操作flushInterval毫秒后,数据量没有满足BulkSizes对应的记录数,但是有记录,那么强制进行bulk处理
.setWarnMultsRejects(1000)//由于没有空闲批量处理工作线程,导致bulk处理操作出于阻塞等待排队中,BulkProcessor会对阻塞等待排队次数进行计数统计,bulk处理操作被每被阻塞排队WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息 .setWorkThreads(10)//bulk处理工作线程数 .setWorkThreadQueue(50)//bulk处理工作线程池缓冲队列大小 .setBulkProcessorName("detail_bulkprocessor")//工作线程名称,实际名称为BulkProcessorName-+线程编号 .setBulkRejectMessage("detail bulkprocessor")//bulk处理操作被每被拒绝WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息提示前缀 .setElasticsearch("default")//指定明细Elasticsearch集群数据源名称,bboss可以支持多数据源 .setFilterPath(BulkConfig.ERROR_FILTER_PATH) .addBulkInterceptor(new BulkInterceptor() { public void beforeBulk(BulkCommand bulkCommand) {
}
public void afterBulk(BulkCommand bulkCommand, String result) { if(logger.isDebugEnabled()){ logger.debug(result); } }
public void exceptionBulk(BulkCommand bulkCommand, Throwable exception) { if(logger.isErrorEnabled()){ logger.error("exceptionBulk",exception); } } public void errorBulk(BulkCommand bulkCommand, String result) { if(logger.isWarnEnabled()){ logger.warn(result); } } })//添加批量处理执行拦截器,可以通过addBulkInterceptor方法添加多个拦截器 ; /** * 构建BulkProcessor批处理组件,一般作为单实例使用,单实例多线程安全,可放心使用 */ BulkProcessor bulkProcessor = bulkProcessorBuilder.build();//构建批处理作业组件 ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyMetircs){ @Override public void builderMetrics(){ //指标1 按操作模块统计模块操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } return operModule; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginModuleMetric(); } }; } });
//指标2 按照用户统计操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String logUser = (String) data.getData("logOperuser");// if(logUser == null || logUser.equals("")){ logUser = "未知用户"; } return logUser; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); } }; } }); // key metrics中包含两个segment(S0,S1) setSegmentBoundSize(5000000); }
/** * 存储指标计算结果 * @param metrics */ @Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime());
esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginmodulekeymetrics", esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime());
esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginuserkeymetrics", esData); }
});
} }; //作业结束后销毁初始化阶段自定义的http数据源 importBuilder.setImportEndAction(new ImportEndAction() { @Override public void endAction(ImportContext importContext, Exception e) {
bulkProcessor.shutDown();
} }); MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
metricsOutputConfig.setDataTimeField("logOpertime"); metricsOutputConfig.addMetrics(keyMetrics);
importBuilder.setOutputConfig(metricsOutputConfig);

importBuilder// .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId .setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false .setBatchSize(10); //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理


/** * 内置线程池配置,实现多线程并行数据导入功能,作业完成退出时自动关闭该线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
//采集作业task执行结果回调处理接口,可以对任务执行成功、错误、异常回调处理,同时可以通过taskMetrics获取任务执行统计数据,知悉任务记录执行处理情况:成功数、失败数、忽略数量、总数量等 importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() { @Override public void success(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void exception(TaskCommand<String,String> taskCommand, Throwable exception) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.debug(taskMetrics.toString()); }

});

/** * 构建和执行数据库表数据导入es和指标统计作业 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();
复制代码


运行指标输出的 Elasticsearch 指标 dsl 数据样本:


{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2016-11-02T15:54:09.000Z","metric":"认证管理","count":417,"operModule":"认证管理"}{ "index" : { "_index" : "vops-loginuserkeymetrics" } }{"logUser":"admin","dataTime":"2016-11-02T15:54:09.000Z","metric":"admin","count":468}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2016-11-02T12:30:11.000Z","metric":"机构管理","count":3,"operModule":"机构管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2016-11-02T15:21:39.000Z","metric":"用户管理","count":6,"operModule":"用户管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2016-12-06T16:09:35.000Z","metric":"岗位管理","count":1,"operModule":"岗位管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2016-12-20T08:17:56.000Z","metric":"资源管理","count":1,"operModule":"资源管理"}{ "index" : { "_index" : "vops-loginuserkeymetrics" } }{"logUser":"fffff","dataTime":"2017-02-03T05:52:01.000Z","metric":"fffff","count":3}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2017-02-03T05:52:58.000Z","metric":"权限管理","count":4,"operModule":"权限管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2018-03-12T15:37:34.000Z","metric":"站点管理","count":3,"operModule":"站点管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2018-03-12T15:40:53.000Z","metric":"频道管理","count":3,"operModule":"频道管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2018-03-12T15:59:20.000Z","metric":"文档管理","count":26,"operModule":"文档管理"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2018-03-12T17:00:14.000Z","metric":"璁よ瘉绠$悊","count":7,"operModule":"璁よ瘉绠$悊"}{ "index" : { "_index" : "vops-loginmodulekeymetrics" } }{"dataTime":"2016-11-02T12:57:04.000Z","metric":"认证-管理","count":26,"operModule":"认证-管理"}{ "index" : { "_index" : "vops-loginuserkeymetrics" } }{"logUser":"|admin","dataTime":"2016-11-02T12:57:04.000Z","metric":"|admin","count":26}
复制代码

4.3 增量采集用户操作数据并对用户操作数据进行统计

基于数字类型 db-es 增量同步及指标统计计算案例,如需调试功能,直接运行 main 方法即可。本案例实时按照一定的时间间隔,从数据库用户操作日志表中增量采集用户操作日志数据,并对用户 IP 地址信息数据进行加工处理后写入 Elasticsearch 数据库,同时将采集的数据交给指标插件,统计不同的用户每分钟操作系统次数,以及不同模块每分钟操作次数,将统计结果写入 Elasticsearch 数据库。数据采集频率每 10 秒采集一次,统计实际窗口为 1 分钟,时间维度字段为 logOpertime(操作时间)。


本案例直接通过 importBuilder 添加 ETLMetrics,并设置时间维度字段:


importBuilder.setDataTimeField("logOpertime");importBuilder.addMetrics(keyMetrics);ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig();      elasticsearchOutputConfig                .setTargetElasticsearch("default")            .setIndex("dbdemo")            .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id            .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false            .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false    
importBuilder.setOutputConfig(elasticsearchOutputConfig);
复制代码


同时结合 ElasticsearchOutputConfig 输出加工后的原始数据到 Elasticsearch


案例代码如下:


https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/metrics/Db2EleasticsearchMetricsDemo.java


ImportBuilder importBuilder = new ImportBuilder() ;      //在任务数据抽取之前做一些初始化处理,例如:通过删表来做初始化操作

importBuilder.setImportStartAction(new ImportStartAction() { /** * 初始化之前执行的处理操作,比如后续初始化操作、数据处理过程中依赖的资源初始化 * @param importContext */ @Override public void startAction(ImportContext importContext) {

importContext.addResourceStart(new ResourceStart() { @Override public ResourceStartResult startResource() { DBConf tempConf = new DBConf(); tempConf.setPoolname("testStatus"); tempConf.setDriver("com.mysql.cj.jdbc.Driver"); tempConf.setJdbcurl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true");
tempConf.setUsername("root"); tempConf.setPassword("123456"); tempConf.setValidationQuery("select 1");
tempConf.setInitialConnections(5); tempConf.setMinimumSize(10); tempConf.setMaximumSize(10); tempConf.setUsepool(true); tempConf.setShowsql(true); tempConf.setJndiName("testStatus-jndi"); //# 控制map中的列名采用小写,默认为大写 tempConf.setColumnLableUpperCase(false); //启动数据源 boolean result = SQLManager.startPool(tempConf); ResourceStartResult resourceStartResult = null; //记录启动的数据源信息,用户作业停止时释放数据源 if(result){ resourceStartResult = new DBStartResult(); resourceStartResult.addResourceStartResult("testStatus"); } return resourceStartResult; } });
}
/** * 所有初始化操作完成后,导出数据之前执行的操作 * @param importContext */ @Override public void afterStartAction(ImportContext importContext) { if(dropIndice) { try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("dbdemo"); } catch (Exception e) { logger.error("Drop indice dbdemo failed:",e); } } } });

DBInputConfig dbInputConfig = new DBInputConfig(); //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑, // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如: // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id] // 需要设置setLastValueColumn信息log_id, // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test") .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true) .setDbInitSize(5) .setDbMinIdleSize(5) .setDbMaxSize(10) .setShowSql(true);//是否使用连接池; importBuilder.setInputConfig(dbInputConfig);

//bulkprocessor和Elasticsearch输出插件共用Elasticsearch数据源,因此额外进行数据源初始化定义 Map properties = new HashMap();
//default为默认的Elasitcsearch数据源名称 properties.put("elasticsearch.serverNames","default");
/** * 默认的default数据源配置,每个配置项可以加default.前缀,也可以不加 */

properties.put("default.elasticsearch.rest.hostNames","192.168.137.1:9200"); properties.put("default.elasticsearch.showTemplate","true"); properties.put("default.elasticUser","elastic"); properties.put("default.elasticPassword","changeme"); properties.put("default.elasticsearch.failAllContinue","true"); properties.put("default.http.timeoutSocket","60000"); properties.put("default.http.timeoutConnection","40000"); properties.put("default.http.connectionRequestTimeout","70000"); properties.put("default.http.maxTotal","200"); properties.put("default.http.defaultMaxPerRoute","100"); ElasticSearchBoot.boot(properties);
BulkProcessorBuilder bulkProcessorBuilder = new BulkProcessorBuilder(); bulkProcessorBuilder.setBlockedWaitTimeout(-1)//指定bulk工作线程缓冲队列已满时后续添加的bulk处理排队等待时间,如果超过指定的时候bulk将被拒绝处理,单位:毫秒,默认为0,不拒绝并一直等待成功为止
.setBulkSizes(200)//按批处理数据记录数 .setFlushInterval(5000)//强制bulk操作时间,单位毫秒,如果自上次bulk操作flushInterval毫秒后,数据量没有满足BulkSizes对应的记录数,但是有记录,那么强制进行bulk处理
.setWarnMultsRejects(1000)//由于没有空闲批量处理工作线程,导致bulk处理操作出于阻塞等待排队中,BulkProcessor会对阻塞等待排队次数进行计数统计,bulk处理操作被每被阻塞排队WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息 .setWorkThreads(10)//bulk处理工作线程数 .setWorkThreadQueue(50)//bulk处理工作线程池缓冲队列大小 .setBulkProcessorName("detail_bulkprocessor")//工作线程名称,实际名称为BulkProcessorName-+线程编号 .setBulkRejectMessage("detail bulkprocessor")//bulk处理操作被每被拒绝WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息提示前缀 .setElasticsearch("default")//指定明细Elasticsearch集群数据源名称,bboss可以支持多数据源 .setFilterPath(BulkConfig.ERROR_FILTER_PATH) .addBulkInterceptor(new BulkInterceptor() { public void beforeBulk(BulkCommand bulkCommand) {
}
public void afterBulk(BulkCommand bulkCommand, String result) { if(logger.isDebugEnabled()){ logger.debug(result); } }
public void exceptionBulk(BulkCommand bulkCommand, Throwable exception) { if(logger.isErrorEnabled()){ logger.error("exceptionBulk",exception); } } public void errorBulk(BulkCommand bulkCommand, String result) { if(logger.isWarnEnabled()){ logger.warn(result); } } })//添加批量处理执行拦截器,可以通过addBulkInterceptor方法添加多个拦截器 ; /** * 构建BulkProcessor批处理组件,一般作为单实例使用,单实例多线程安全,可放心使用 */ BulkProcessor bulkProcessor = bulkProcessorBuilder.build();//构建批处理作业组件 ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyTimeMetircs){ @Override public void builderMetrics(){ //指标1 按操作模块统计模块操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } return operModule; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginModuleMetric(); } }; } });
//指标2 按照用户统计操作次数 addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ CommonRecord data = (CommonRecord) mapData.getData(); String logUser = (String) data.getData("logOperuser");// if(logUser == null || logUser.equals("")){ logUser = "未知用户"; } return logUser; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); } }; } }); // key metrics中包含两个segment(S0,S1) setSegmentBoundSize(5000000); setTimeWindows(60 );//统计时间窗口 this.setTimeWindowType(MetricsConfig.TIME_WINDOW_TYPE_MINUTE); }
/** * 存储指标计算结果 * @param metrics */ @Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginmodulemetrics", esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginusermetrics", esData); }
});
} }; //作业结束后销毁初始化阶段自定义的http数据源 importBuilder.setImportEndAction(new ImportEndAction() { @Override public void endAction(ImportContext importContext, Exception e) { //销毁初始化阶段自定义的数据源 importContext.destroyResources(new ResourceEnd() { @Override public void endResource(ResourceStartResult resourceStartResult) { if(resourceStartResult instanceof DBStartResult) { //作业停止时,释放db数据源 DataTranPluginImpl.stopDatasources((DBStartResult) resourceStartResult); } } }); bulkProcessor.shutDown();
} });
importBuilder.setDataTimeField("logOpertime"); importBuilder.setUseDefaultMapData(false); importBuilder.addMetrics(keyMetrics);
ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig(); elasticsearchOutputConfig .setTargetElasticsearch("default") .setIndex("dbdemo") .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false
importBuilder.setOutputConfig(elasticsearchOutputConfig);
/** * 设置IP地址信息库 */ importBuilder.setGeoipDatabase("E:/workspace/hnai/terminal/geolite2/GeoLite2-City.mmdb"); importBuilder.setGeoipAsnDatabase("E:/workspace/hnai/terminal/geolite2/GeoLite2-ASN.mmdb"); importBuilder.setGeoip2regionDatabase("E:/workspace/hnai/terminal/geolite2/ip2region.db");
importBuilder// .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId .setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false .setBatchSize(10); //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
//定时任务配置, importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次 //定时任务配置结束//// //设置任务执行拦截器,可以添加多个,定时任务每次执行的拦截器 importBuilder.addCallInterceptor(new CallInterceptor() { @Override public void preCall(TaskContext taskContext) { System.out.println("preCall"); }
@Override public void afterCall(TaskContext taskContext) { System.out.println("afterCall"); }
@Override public void throwException(TaskContext taskContext, Throwable e) { System.out.println("throwException"); } });// //设置任务执行拦截器结束,可以添加多个 //增量配置开始// importBuilder.setStatusDbname("test");//设置增量状态数据源名称 importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(false);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据,// setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setStatusDbname("testStatus");//指定增量状态数据源名称// importBuilder.setLastValueStorePath("logtable_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样 importBuilder.setLastValueStoreTableName("logstable");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
// /** * 通过DataRefactor接口,实现数据加工、清洗、转换、记录丢弃、打标等操作,将处理后的数据交给输出插件进行输出或者指标计算器进行指标统计计算 */ importBuilder.setDataRefactor(new DataRefactor() { public void refactor(Context context) throws Exception {// Date date = context.getDateValue("LOG_OPERTIME"); context.addFieldValue("collecttime",new Date()); IpInfo ipInfo = context.getIpInfoByIp("219.133.80.136"); if(ipInfo != null) context.addFieldValue("ipInfo", SimpleStringUtil.object2json(ipInfo)); } }); //映射和转换配置结束
/** * 内置线程池配置,实现多线程并行数据导入功能,作业完成退出时自动关闭该线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
//采集作业task执行结果回调处理接口,可以对任务执行成功、错误、异常回调处理,同时可以通过taskMetrics获取任务执行统计数据,知悉任务记录执行处理情况:成功数、失败数、忽略数量、总数量等 importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() { @Override public void success(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void exception(TaskCommand<String,String> taskCommand, Throwable exception) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.debug(taskMetrics.toString()); }

});

/** * 构建和执行数据库表数据导入es和指标统计作业 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();
复制代码


定时统计用户操作次数

4.4 定时采集统计指标并 flush 指标统计结果

定时增量采集统计指标并 flush 指标统计结果案例,构建 KeyMetrics 类型的指标计算器,不设置指标时间维度相关字段。

4.4.1 任务调度后强制 flush 指标计算结果

在采用 KeyMetrics 类型,实现定时统计计算场景下,没有按照时间滑动窗口自动 flush 指标计算结果功能,因此需要在每次定时调度完毕时强制 flush 指标计算结果到指标数据库,通过以下代码控制每次任务调度完成后,强制 flush 指标计算结果存储到 Elasticsearch 指标数据库:


 importBuilder.setFlushMetricsOnScheduleTaskCompleted(true);//强制flush指标计算结果存储到Elasticsearch指标数据库        importBuilder.setWaitCompleteWhenflushMetricsOnScheduleTaskCompleted(true);//等待flush处理完毕后才返回        importBuilder.setCleanKeysWhenflushMetricsOnScheduleTaskCompleted(true);//flush完成后清理掉缓存区中的所有指标key对象
复制代码


案例代码:https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/metrics/Db2KeyMetricsScheduleDemo.java


ImportBuilder importBuilder = new ImportBuilder() ;      //在任务数据抽取之前做一些初始化处理,例如:通过删表来做初始化操作

importBuilder.setImportStartAction(new ImportStartAction() { /** * 初始化之前执行的处理操作,比如后续初始化操作、数据处理过程中依赖的资源初始化 * @param importContext */ @Override public void startAction(ImportContext importContext) { }
/** * 所有初始化操作完成后,导出数据之前执行的操作 * @param importContext */ @Override public void afterStartAction(ImportContext importContext) { try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginmodulemetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginmodulemetrics failed:",e); } try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginusermetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginusermetrics failed:",e); }
} });
DBInputConfig dbInputConfig = new DBInputConfig(); //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑, // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如: // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id] // 需要设置setLastValueColumn信息log_id, // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test") .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true) .setDbInitSize(5) .setDbMinIdleSize(5) .setDbMaxSize(10) .setShowSql(true);//是否使用连接池; importBuilder.setInputConfig(dbInputConfig);

//定义bulkprocessor依赖的Elasticsearch数据源 Map properties = new HashMap();
//default为默认的Elasitcsearch数据源名称 properties.put("elasticsearch.serverNames","default");
/** * 默认的default数据源配置,每个配置项可以加default.前缀,也可以不加 */

properties.put("default.elasticsearch.rest.hostNames","192.168.137.1:9200"); properties.put("default.elasticsearch.showTemplate","true"); properties.put("default.elasticUser","elastic"); properties.put("default.elasticPassword","changeme"); properties.put("default.elasticsearch.failAllContinue","true"); properties.put("default.http.timeoutSocket","60000"); properties.put("default.http.timeoutConnection","40000"); properties.put("default.http.connectionRequestTimeout","70000"); properties.put("default.http.maxTotal","200"); properties.put("default.http.defaultMaxPerRoute","100"); ElasticSearchBoot.boot(properties);
BulkProcessorBuilder bulkProcessorBuilder = new BulkProcessorBuilder(); bulkProcessorBuilder.setBlockedWaitTimeout(-1)//指定bulk工作线程缓冲队列已满时后续添加的bulk处理排队等待时间,如果超过指定的时候bulk将被拒绝处理,单位:毫秒,默认为0,不拒绝并一直等待成功为止
.setBulkSizes(200)//按批处理数据记录数 .setFlushInterval(5000)//强制bulk操作时间,单位毫秒,如果自上次bulk操作flushInterval毫秒后,数据量没有满足BulkSizes对应的记录数,但是有记录,那么强制进行bulk处理
.setWarnMultsRejects(1000)//由于没有空闲批量处理工作线程,导致bulk处理操作出于阻塞等待排队中,BulkProcessor会对阻塞等待排队次数进行计数统计,bulk处理操作被每被阻塞排队WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息 .setWorkThreads(10)//bulk处理工作线程数 .setWorkThreadQueue(50)//bulk处理工作线程池缓冲队列大小 .setBulkProcessorName("detail_bulkprocessor")//工作线程名称,实际名称为BulkProcessorName-+线程编号 .setBulkRejectMessage("detail bulkprocessor")//bulk处理操作被每被拒绝WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息提示前缀 .setElasticsearch("default")//指定明细Elasticsearch集群数据源名称,bboss可以支持多数据源 .setFilterPath(BulkConfig.ERROR_FILTER_PATH) .addBulkInterceptor(new BulkInterceptor() { public void beforeBulk(BulkCommand bulkCommand) {
}
public void afterBulk(BulkCommand bulkCommand, String result) { if(logger.isDebugEnabled()){ logger.debug(result); } }
public void exceptionBulk(BulkCommand bulkCommand, Throwable exception) { if(logger.isErrorEnabled()){ logger.error("exceptionBulk",exception); } } public void errorBulk(BulkCommand bulkCommand, String result) { if(logger.isWarnEnabled()){ logger.warn(result); } } })//添加批量处理执行拦截器,可以通过addBulkInterceptor方法添加多个拦截器 ; /** * 构建BulkProcessor批处理组件,一般作为单实例使用,单实例多线程安全,可放心使用 */ BulkProcessor bulkProcessor = bulkProcessorBuilder.build();//构建批处理作业组件 ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyMetircs){ @Override public void map(MapData mapData) { CommonRecord data = (CommonRecord) mapData.getData(); //可以添加多个指标
//指标1 按操作模块统计模块操作次数 String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } String metricKey = operModule; metric(metricKey, mapData, new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginModuleMetric(); }
});
//指标2 按照用户统计操作次数 String logUser = (String) data.getData("logOperuser"); metricKey = logUser; metric(metricKey, mapData, new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); }
});

}
@Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginmodulemetrics", esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData("vops-loginusermetrics", esData); }
});
} }; //作业结束后销毁初始化阶段自定义的http数据源 importBuilder.setImportEndAction(new ImportEndAction() { @Override public void endAction(ImportContext importContext, Exception e) {
bulkProcessor.shutDown();
} }); MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
metricsOutputConfig.setDataTimeField("logOpertime"); metricsOutputConfig.addMetrics(keyMetrics);
importBuilder.setFlushMetricsOnScheduleTaskCompleted(true); importBuilder.setWaitCompleteWhenflushMetricsOnScheduleTaskCompleted(true); importBuilder.setCleanKeysWhenflushMetricsOnScheduleTaskCompleted(true);
importBuilder.setOutputConfig(metricsOutputConfig);

importBuilder// .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId .setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false .setBatchSize(10); //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
//定时任务配置, importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(30000L); //每隔period毫秒执行,如果不设置,只执行一次 //定时任务配置结束


//增量配置开始//
importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(false);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据,// setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setLastValueStorePath("logtablemetrics_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样 importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型

/** * 内置线程池配置,实现多线程并行数据导入功能,作业完成退出时自动关闭该线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
//采集作业task执行结果回调处理接口,可以对任务执行成功、错误、异常回调处理,同时可以通过taskMetrics获取任务执行统计数据,知悉任务记录执行处理情况:成功数、失败数、忽略数量、总数量等 importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() { @Override public void success(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void exception(TaskCommand<String,String> taskCommand, Throwable exception) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.debug(taskMetrics.toString()); }

});

/** * 构建和执行数据库表数据导入es和指标统计作业 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();
复制代码

4.5 存储指标计算结果到数据库

增量采集数据并进行计算,存储指标计算结果到数据库,可以直接使用 dbinput 插件配置的数据源,也可以参考文档:自定义数据库数据源


案例代码如下:


ImportBuilder importBuilder = new ImportBuilder() ;      //在任务数据抽取之前做一些初始化处理,例如:通过删表来做初始化操作

importBuilder.setImportStartAction(new ImportStartAction() { /** * 初始化之前执行的处理操作,比如后续初始化操作、数据处理过程中依赖的资源初始化 * @param importContext */ @Override public void startAction(ImportContext importContext) { }
/** * 所有初始化操作完成后,导出数据之前执行的操作 * @param importContext */ @Override public void afterStartAction(ImportContext importContext) {
try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginmodulemetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginmodulemetrics failed:",e); } try { //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 ElasticSearchHelper.getRestClientUtil().dropIndice("vops-loginusermetrics"); } catch (Exception e) { logger.error("Drop indice vops-loginusermetrics failed:",e); }
} });
DBInputConfig dbInputConfig = new DBInputConfig(); //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑, // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如: // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id] // 需要设置setLastValueColumn信息log_id, // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test") .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true) .setDbInitSize(5) .setDbMinIdleSize(5) .setDbMaxSize(10) .setShowSql(true);//是否使用连接池; importBuilder.setInputConfig(dbInputConfig);

//bulkprocessor和Elasticsearch输出插件共用Elasticsearch数据源,因此额外进行数据源初始化定义 Map properties = new HashMap();
//default为默认的Elasitcsearch数据源名称 properties.put("elasticsearch.serverNames","default");
/** * 默认的default数据源配置,每个配置项可以加default.前缀,也可以不加 */

properties.put("default.elasticsearch.rest.hostNames","192.168.137.1:9200"); properties.put("default.elasticsearch.showTemplate","true"); properties.put("default.elasticUser","elastic"); properties.put("default.elasticPassword","changeme"); properties.put("default.elasticsearch.failAllContinue","true"); properties.put("default.http.timeoutSocket","60000"); properties.put("default.http.timeoutConnection","40000"); properties.put("default.http.connectionRequestTimeout","70000"); properties.put("default.http.maxTotal","200"); properties.put("default.http.defaultMaxPerRoute","100"); ElasticSearchBoot.boot(properties);
int bulkSize = 150; int workThreads = 5; int workThreadQueue = 100; //定义BulkProcessor批处理组件构建器 CommonBulkProcessorBuilder bulkProcessorBuilder = new CommonBulkProcessorBuilder(); bulkProcessorBuilder.setBlockedWaitTimeout(-1)//指定bulk工作线程缓冲队列已满时后续添加的bulk处理排队等待时间,如果超过指定的时候bulk将被拒绝处理,单位:毫秒,默认为0,不拒绝并一直等待成功为止
.setBulkSizes(bulkSize)//按批处理数据记录数 .setFlushInterval(5000)//强制bulk操作时间,单位毫秒,如果自上次bulk操作flushInterval毫秒后,数据量没有满足BulkSizes对应的记录数,但是有记录,那么强制进行bulk处理
.setWarnMultsRejects(1000)//由于没有空闲批量处理工作线程,导致bulk处理操作出于阻塞等待排队中,BulkProcessor会对阻塞等待排队次数进行计数统计,bulk处理操作被每被阻塞排队WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息 .setWorkThreads(workThreads)//bulk处理工作线程数 .setWorkThreadQueue(workThreadQueue)//bulk处理工作线程池缓冲队列大小 .setBulkProcessorName("db_bulkprocessor")//工作线程名称,实际名称为BulkProcessorName-+线程编号 .setBulkRejectMessage("db bulkprocessor ")//bulk处理操作被每被拒绝WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息提示前缀 .addBulkInterceptor(new CommonBulkInterceptor() {// 添加异步处理结果回调函数 /** * 执行前回调方法 * @param bulkCommand */ public void beforeBulk(CommonBulkCommand bulkCommand) {
}
/** * 执行成功回调方法 * @param bulkCommand * @param result */ public void afterBulk(CommonBulkCommand bulkCommand, BulkResult result) { if(logger.isDebugEnabled()){// logger.debug(result.getResult()); } }
/** * 执行异常回调方法 * @param bulkCommand * @param exception */ public void exceptionBulk(CommonBulkCommand bulkCommand, Throwable exception) { if(logger.isErrorEnabled()){ logger.error("exceptionBulk",exception); } }
/** * 执行过程中部分数据有问题回调方法 * @param bulkCommand * @param result */ public void errorBulk(CommonBulkCommand bulkCommand, BulkResult result) { if(logger.isWarnEnabled()){// logger.warn(result); } } })//添加批量处理执行拦截器,可以通过addBulkInterceptor方法添加多个拦截器 /** * 设置执行数据批处理接口,实现对数据的异步批处理功能逻辑 */ .setBulkAction(new BulkAction() { public BulkResult execute(CommonBulkCommand command) { List<CommonBulkData> bulkDataList = command.getBatchBulkDatas();//拿出要进行批处理操作的数据 List<Map> logUserMetrics = new ArrayList<>(); List<Map> logModuleMetrics = new ArrayList<>(); for(int i = 0; i < bulkDataList.size(); i ++){ CommonBulkData commonBulkData = bulkDataList.get(i); Map data = (Map)commonBulkData.getData(); if(data.get("operModule") != null){ logModuleMetrics.add(data); } else{ logUserMetrics.add(data); } /** * 可以根据操作类型,对数据进行相应处理 */// if(commonBulkData.getType() == CommonBulkData.INSERT) 新增记录// if(commonBulkData.getType() == CommonBulkData.UPDATE) 修改记录// if(commonBulkData.getType() == CommonBulkData.DELETE) 删除记录
} BulkResult bulkResult = new BulkResult();//构建批处理操作结果对象 try { //调用数据库dao executor,将数据批量写入数据库,对应的sql语句addPositionUrl在xml配置文件dbbulktest.xml中定义

if(logUserMetrics.size() > 0) { //直接使用dbinput插件配置的数据源test,可以参考文档自定义数据库数据源 SQLExecutor.executeBatch("test","insert into logusermetrics(dataTime,hour,minute,day,metric,logUser,count) values(?,?,?,?,?,?,?)", logUserMetrics, 150, new BatchHandler<Map>() { public void handler(PreparedStatement stmt, Map record, int i) throws SQLException { stmt.setDate(1, new java.sql.Date(((Date)record.get("dataTime")).getTime())); stmt.setString(2, (String)record.get("hour")); stmt.setString(3, (String)record.get("minute")); stmt.setString(4, (String)record.get("day")); stmt.setString(5, (String)record.get("metric")); stmt.setString(6, (String)record.get("logUser")); stmt.setLong(6, (Long)record.get("count")); } }); }
if(logModuleMetrics.size() > 0) { //直接使用dbinput插件配置的数据源test SQLExecutor.executeBatch("test","insert into logmodulermetrics(dataTime,hour,minute,day,metric,operModule,count) values(?,?,?,?,?,?,?)", logModuleMetrics, 150, new BatchHandler<Map>() { public void handler(PreparedStatement stmt, Map record, int i) throws SQLException { stmt.setDate(1, new java.sql.Date(((Date)record.get("dataTime")).getTime())); stmt.setString(2, (String)record.get("hour")); stmt.setString(3, (String)record.get("minute")); stmt.setString(4, (String)record.get("day")); stmt.setString(5, (String)record.get("metric")); stmt.setString(6, (String)record.get("operModule")); stmt.setLong(6, (Long)record.get("count")); } }); }
} catch (Exception e){ //如果执行出错,则将错误信息设置到结果中 logger.error("",e); bulkResult.setError(true); bulkResult.setErrorInfo(e.getMessage()); } return bulkResult; } }) ; /** * 构建BulkProcessor批处理组件,一般作为单实例使用,单实例多线程安全,可放心使用 */ final CommonBulkProcessor bulkProcessor = bulkProcessorBuilder.build();//构建批处理作业组件
//作业结束后销毁初始化阶段自定义的http数据源 importBuilder.setImportEndAction(new ImportEndAction() { @Override public void endAction(ImportContext importContext, Exception e) {
bulkProcessor.shutDown();
} });
// 直接实现map和persistent方法,定义一个ETLMetrics ETLMetrics keyMetrics = new ETLMetrics(Metrics.MetricsType_KeyTimeMetircs){ @Override public void map(MapData mapData) { CommonRecord data = (CommonRecord) mapData.getData(); //可以添加多个指标
//指标1 按操作模块统计模块操作次数 String operModule = (String) data.getData("operModule"); if(operModule == null || operModule.equals("")){ operModule = "未知模块"; } String metricKey = operModule; metric(metricKey, mapData, new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginModuleMetric(); }
});
//指标2 按照用户统计操作次数 String logUser = (String) data.getData("logOperuser"); metricKey = logUser; metric(metricKey, mapData, new KeyMetricBuilder() { @Override public KeyMetric build() { return new LoginUserMetric(); }
});

}
@Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ if(keyMetric instanceof LoginModuleMetric) { LoginModuleMetric testKeyMetric = (LoginModuleMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("operModule", testKeyMetric.getOperModule()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData( esData); } else if(keyMetric instanceof LoginUserMetric) { LoginUserMetric testKeyMetric = (LoginUserMetric) keyMetric; Map esData = new HashMap(); esData.put("dataTime", testKeyMetric.getDataTime()); esData.put("hour", testKeyMetric.getDayHour()); esData.put("minute", testKeyMetric.getMinute()); esData.put("day", testKeyMetric.getDay()); esData.put("metric", testKeyMetric.getMetric()); esData.put("logUser", testKeyMetric.getLogUser()); esData.put("count", testKeyMetric.getCount()); bulkProcessor.insertData( esData); }
});
} }; //如果要自定义创建MapData,设置BuildMapData即可 keyMetrics.setBuildMapData(new BuildMapData() { @Override public MapData buildMapData(MetricsData metricsData) { BuildMapDataContext buildMapDataContext = metricsData.getBuildMapDataContext(); MapData mapData = new MapData(){ /** * 根据指标标识,获取指标的时间统计维度字段,默认返回dataTime字段值,不同的指标需要指定不同的时间维度统计字段 * 分析处理作业可以覆盖本方法,自定义获取时间维度字段值 * @param metricsKey * @return */ public Date metricsDataTime(String metricsKey) {// if(metricsKey.equals("xxxx") ) {// Date time = (Date)data.get("collectime");// return time;// } return getDataTime(); }
}; Date dateTime = (Date) metricsData.getCommonRecord().getData("logOpertime"); mapData.setDataTime(dateTime);//默认按照操作时间作为指标时间维度字段,上面复写了metricsDataTime方法,可以根据指标key指定不同的时间维度值 mapData.setData(metricsData.getCommonRecord()); mapData.setDayFormat(buildMapDataContext.getDayFormat()); mapData.setHourFormat(buildMapDataContext.getHourFormat()); mapData.setMinuteFormat(buildMapDataContext.getMinuteFormat()); mapData.setYearFormat(buildMapDataContext.getYearFormat()); mapData.setMonthFormat(buildMapDataContext.getMonthFormat()); mapData.setWeekFormat(buildMapDataContext.getWeekFormat()); return mapData; } }); // key metrics中包含两个segment(S0,S1) keyMetrics.setSegmentBoundSize(5000000); keyMetrics.setTimeWindows(10); keyMetrics.init(); MetricsOutputConfig metricsOutputConfig = new MetricsOutputConfig();
//添加2个指标计算器 metricsOutputConfig.addMetrics(keyMetrics);
importBuilder.setOutputConfig(metricsOutputConfig);

importBuilder// .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId .setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false .setBatchSize(10); //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
//定时任务配置, importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次 //定时任务配置结束//// //设置任务执行拦截器,可以添加多个,定时任务每次执行的拦截器 importBuilder.addCallInterceptor(new CallInterceptor() { @Override public void preCall(TaskContext taskContext) { System.out.println("preCall"); }
@Override public void afterCall(TaskContext taskContext) { System.out.println("afterCall"); }
@Override public void throwException(TaskContext taskContext, Throwable e) { System.out.println("throwException"); } });// //设置任务执行拦截器结束,可以添加多个 //增量配置开始// importBuilder.setStatusDbname("test");//设置增量状态数据源名称 importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(true);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据,// setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setLastValueStorePath("db2metricsmapdata_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样// importBuilder.setLastValueStoreTableName("logstable");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
/** * 通过DataRefactor接口,实现数据加工、清洗、转换、记录丢弃、打标等操作,将处理后的数据交给输出插件进行输出或者指标计算器进行指标统计计算 */ importBuilder.setDataRefactor(new DataRefactor() { public void refactor(Context context) throws Exception {// Date date = context.getDateValue("LOG_OPERTIME"); context.addFieldValue("collecttime",new Date()); IpInfo ipInfo = context.getIpInfoByIp("219.133.80.136"); if(ipInfo != null) context.addFieldValue("ipInfo", SimpleStringUtil.object2json(ipInfo)); } }); //映射和转换配置结束
/** * 内置线程池配置,实现多线程并行数据导入功能,作业完成退出时自动关闭该线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回 //采集作业task执行结果回调处理接口,可以对任务执行成功、错误、异常回调处理,同时可以通过taskMetrics获取任务执行统计数据,知悉任务记录执行处理情况:成功数、失败数、忽略数量、总数量等 importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() { @Override public void success(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); logger.debug(result); }
@Override public void exception(TaskCommand<String,String> taskCommand, Throwable exception) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.debug(taskMetrics.toString()); }

});

/** * 构建和执行数据库表数据导入es和指标统计作业 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();
复制代码

4.6 独立指标计算统计案例

同时我们也可以在应用中直接使用 Metrics 组件实现更加灵活的指标统计计算功能


一个典型的通用指标计算案例:https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/tran/metrics/TestKeyTimeMetrics.java


//1. 定义Elasticsearch数据入库BulkProcessor批处理组件构建器BulkProcessorBuilder bulkProcessorBuilder = new BulkProcessorBuilder();bulkProcessorBuilder.setBlockedWaitTimeout(-1)//指定bulk工作线程缓冲队列已满时后续添加的bulk处理排队等待时间,如果超过指定的时候bulk将被拒绝处理,单位:毫秒,默认为0,不拒绝并一直等待成功为止
.setBulkSizes(200)//按批处理数据记录数 .setFlushInterval(5000)//强制bulk操作时间,单位毫秒,如果自上次bulk操作flushInterval毫秒后,数据量没有满足BulkSizes对应的记录数,但是有记录,那么强制进行bulk处理
.setWarnMultsRejects(1000)//由于没有空闲批量处理工作线程,导致bulk处理操作出于阻塞等待排队中,BulkProcessor会对阻塞等待排队次数进行计数统计,bulk处理操作被每被阻塞排队WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息 .setWorkThreads(10)//bulk处理工作线程数 .setWorkThreadQueue(50)//bulk处理工作线程池缓冲队列大小 .setBulkProcessorName("detail_bulkprocessor")//工作线程名称,实际名称为BulkProcessorName-+线程编号 .setBulkRejectMessage("detail bulkprocessor")//bulk处理操作被每被拒绝WarnMultsRejects次(1000次),在日志文件中输出拒绝告警信息提示前缀 .setElasticsearch("default")//指定明细Elasticsearch集群数据源名称,bboss可以支持多数据源 .setFilterPath(BulkConfig.ERROR_FILTER_PATH) .addBulkInterceptor(new BulkInterceptor() { public void beforeBulk(BulkCommand bulkCommand) {
}
public void afterBulk(BulkCommand bulkCommand, String result) { if(logger.isDebugEnabled()){ logger.debug(result); } }
public void exceptionBulk(BulkCommand bulkCommand, Throwable exception) { if(logger.isErrorEnabled()){ logger.error("exceptionBulk",exception); } } public void errorBulk(BulkCommand bulkCommand, String result) { if(logger.isWarnEnabled()){ logger.warn(result); } } })//添加批量处理执行拦截器,可以通过addBulkInterceptor方法添加多个拦截器;/** * 2. 构建BulkProcessor批处理组件,一般作为单实例使用,单实例多线程安全,可放心使用 */BulkProcessor bulkProcessor = bulkProcessorBuilder.build();//构建批处理作业组件 //3. 定义KeyTimeMetircs类型指标计算器MetricsMetrics keyMetrics = new Metrics(Metrics.MetricsType_KeyTimeMetircs){
//4. 添加具体的指标对象及对应的指标key到Metrics @Override public void builderMetrics(){ addMetricBuilder(new MetricBuilder() { @Override public String buildMetricKey(MapData mapData){ Map data = (Map) mapData.getData(); String name = (String) data.get("name"); return name; } @Override public KeyMetricBuilder metricBuilder(){ return new KeyMetricBuilder() { @Override public KeyMetric build() { return new TestTimeMetric(); } }; } }); // 5. key metrics中包含两个segment(S0,S1),设置每个分区的大小(元素个数) setSegmentBoundSize(5000000); // 6. 定义时间窗口 setTimeWindows(60); }
/** * 6. 定义指标持久化机制,将计算好的指标结果通过异步批处理组件存储到Elasticsearch中的指标表 * @param metrics */ @Override public void persistent(Collection< KeyMetric> metrics) { metrics.forEach(keyMetric->{ TestTimeMetric testKeyMetric = (TestTimeMetric)keyMetric; Map esData = new HashMap();//封装指标统计结果 esData.put("dataTime", testKeyMetric.getDataTime());//指标统计计算时间 esData.put("hour", testKeyMetric.getDayHour());//指标小时字段,例如2023-02-19 16 esData.put("minute", testKeyMetric.getMinute());//指标日期字段,例如2023-02-19 16:53 esData.put("day", testKeyMetric.getDay());//指标日期字段,例如2023-02-19 esData.put("metric", testKeyMetric.getMetric());//指标key esData.put("name", testKeyMetric.getName());//维度字段 esData.put("count", testKeyMetric.getCount());//统计指标 bulkProcessor.insertData("vops-testkeytimemetrics",esData);//将指标结果保存到Elasticsearch指标表vops-testkeytimemetrics
});
}}; // 7. 初始化指标计算器keyMetrics.init();
// 8. 模拟并发(10线程)产生数据,并持续运行10分钟long startTime = System.currentTimeMillis();long times = 10l * 60l * 1000l;
Runnable runnable = new Runnable() { @Override public void run() { DateFormat yearFormat = MetricsConfig.getYearFormat(); DateFormat monthFormat = MetricsConfig.getMonthFormat(); DateFormat weekFormat = MetricsConfig.getWeekFormat(); DateFormat dayFormat = MetricsConfig.getDayFormat(); DateFormat hourFormat = MetricsConfig.getHourFormat(); DateFormat minuteFormat = MetricsConfig.getMinuteFormat(); while(true) { for (int i = 0; i < 1590; i++) { // 9. 通过MapData封装待统计的原始数据(一个Map类型的对象)、时间维度值、各种时间格式化对象 MapData<Map> mapData = new MapData<>(); mapData.setDataTime(new Date()); Map<String, String> data = new LinkedHashMap<>(); data.put("name", "Jack_" + i); mapData.setData(data); mapData.setDayFormat(dayFormat); mapData.setHourFormat(hourFormat); mapData.setMinuteFormat(minuteFormat); mapData.setYearFormat(yearFormat); mapData.setMonthFormat(monthFormat); mapData.setWeekFormat(weekFormat); // 10. 将mapData交给keyMetrics,进行指标统计计算 keyMetrics.map(mapData); } try { sleep(1000l); } catch (InterruptedException e) { e.printStackTrace(); } long now = System.currentTimeMillis(); long eTime = now - startTime; if(eTime >= times)//达到10分钟,终止生产数据 break; } }};List<Thread> ts = new ArrayList<>();for(int i = 0; i < 10; i ++){ Thread thread = new Thread(runnable,"run-"+i); thread.start(); ts.add(thread);}//等待所有线程运行结束ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); }});
//强制刷指标数据keyMetrics.forceFlush(true,//清楚指标key true );//等待数据处理完成后再返回
复制代码


篇幅关系,相关案例直接访问 git 源码托管地址下载:


https://gitee.com/bboss/bboss-datatran-demo


对应的案例目录


https://gitee.com/bboss/bboss-datatran-demo/tree/main/src/main/java/org/frameworkset/tran/metrics

4.7 典型应用案例---互联网用户行为分析监控

互联网电子渠道用户行为分析监控系统:采用 bboss 流批一体化计算框架,构建起一整套用户行为监控指标体系,完美地实现互联网用户行为实时分析和监控功能。



用户头像

大河

关注

开源项目bboss作者 2022-08-15 加入

https://www.bbossgroups.com

评论

发布
暂无评论
数据采集&流批一体化处理使用指南_批处理_大河_InfoQ写作社区