写点什么

bboss 数据同步 ETL 工具介绍

作者:大河
  • 2022 年 8 月 29 日
    湖南
  • 本文字数:52939 字

    阅读完需:约 174 分钟

bboss数据同步ETL工具介绍

bboss 数据同步 ETL 工具介绍


The best elasticsearch highlevel java rest api-----bboss


数据同步作业开发视频教程:http服务数据采集作业发布和构建运行教程


数据同步作业开发调试工程源码地址:https://git.oschina.net/bboss/bboss-datatran-demo


数据同步案例大全:bboss数据采集ETL案例大全

工具特性

​ bboss-datatran 由 bboss 开源的数据采集同步 ETL 工具,提供数据采集、数据清洗转换处理和数据入库功能。 bboss-datatran 数据同步作业直接采用 java 语言开发,小巧而精致,同时又可以采用 java 提供的所有功能和现有组件框架,随心所欲地处理和加工海量存量数据、实时增量数据;可以根据数据规模及同步性能要求,按需配置和调整数据采集同步作业所需内存、工作线程、线程队列大小;可以将作业独立运行,亦可以将作业嵌入基于 java 开发的各种应用一起运行;提供了作业任务控制 API、作业监控 api,支持作业启动、暂停(pause)、继续(resume)、停止控制机制,可轻松定制一款属于自己的 ETL 管理工具。


如果您还在:


  • 苦于 logstash、flume、filebeat 之类的开源工具无法满足复杂的、海量数据自定义加工处理场景;

  • 苦于无法调用企业现有服务和库来处理加工数据;

  • 苦于因项目投入有限、进度紧,急需一款功能强大、上手快、实施简单的数据交换工具


那么 bboss-datatran 将是一个不错的选择。



采用标准的输入输出异步管道来处理数据



1)数据导入的方式


  • 支持逐条数据导入

  • 批量数据导入

  • 批量数据多线程并行导入

  • 定时全量(串行/并行)数据导入

  • 定时增量(串行/并行)数据导入

  • 支持记录切割功能


2)支持各种主流数据库、各种 es 版本以及本地/Ftp 日志文件数据采集和同步、加工处理


支持在 Elasticsearch、关系数据库、Mongodb、HBase、Hive、Kafka、文本文件、excel 文件、SFTP/FTP、http/https 多种数据源之间进行海量数据采集同步;支持数据实时增量采集和全量采集;支持根据字段进行数据记录切割;支持多级文件路径(本地和 FTP/SFTP)下不同文件数据采集写入不同的数据库表和其他数据源。


支持各种数据库: mysql,maridb,postgress,oracle ,sqlserver,db2,tidb,hive,mongodb、HBase 等


支持各种 Elasticsearch 版本: 1.x,2.x,5.x,6.x,7.x,8.x,+


3)提供自定义处理采集数据功能,可以按照自己的要求将采集的数据处理到目的地,如需定制化将数据保存到特定的地方,可自行实现 CustomOutPut 接口处理即可。


4)支持从 kafka 接收数据;经过加工处理的数据亦可以发送到 kafka;


5)支持将单条记录切割为多条记录;


6)可以将加工后的数据写入 File 并上传到 ftp/sftp 服务器;


7)支持备份采集完毕日志文件功能,可以指定备份文件保存时长,定期清理超过时长文件;


8)支持自动清理下载完毕后 ftp 服务器上的文件;


9)支持 excel、csv 文件采集(本地和 ftp/sftp)


10)支持导出数据到 excel 和 csv 文件,并支持上传到 ftp/sftp 服务器


11)支持海量 PB 级数据同步导入功能


12)支持将 ip 转换为对应的运营商和城市地理坐标位置信息


13)支持设置数据 bulk 导入任务结果处理回调函数,对每次 bulk 任务的结果进行成功和失败反馈,然后针对失败的 bulk 任务通过 error 和 exception 方法进行相应处理


14)支持以下三种作业调度机制:


  • jdk timer (内置)

  • quartz

  • xxl-job 分布式调度引擎,基于分片调度机制实现海量数据快速同步能力


  1. 提供灵活的作业启动、暂停(pause)、继续(resume)、停止控制机制


​ 下面通过案例来介绍 ETL 工具的使用方法,本文案例工程地址


https://github.com/bbossgroups/db-elasticsearch-tool


https://gitee.com/bboss/db-elasticsearch-tool

插件清单

输入插件

输出插件

作业基础配置

创建一个作业构建器ImportBuilder importBuilder = new ImportBuilder() ;
复制代码



本文主要以关系数据库表同步到 Elasticsearch 为案例介绍 bboss datatran 的功能.

1.准备工作

1.1 在工程中导入 bboss maven 坐标

Elasticsearch/Database/Http/Custom(自定义处理器)/Dummy 插件坐标


<dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-datatran-jdbc</artifactId><version>6.7.2</version></dependency>
复制代码


kafka 插件 maven 坐标


<dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-datatran-kafka2x</artifactId><version>6.7.2</version></dependency>
复制代码


日志文件/excel/csv/ftp/sftp 插件 maven 坐标


<dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-datatran-fileftp</artifactId><version>6.7.2</version></dependency>
复制代码


hbase 插件 maven 坐标


<dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-datatran-hbase</artifactId><version>6.7.2</version></dependency>
复制代码


mongodb 插件 maven 坐标


<dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-datatran-mongodb</artifactId><version>6.7.2</version></dependency>
复制代码


如果需要增量导入,还需要导入 sqlite 驱动:


<dependency>      <groupId>org.xerial</groupId>      <artifactId>sqlite-jdbc</artifactId>      <version>3.36.0.3</version>      <scope>compile</scope> </dependency>
复制代码


如果需要使用 xxjob 来调度作业任务,需要导入坐标:


<dependency>      <groupId>com.xuxueli</groupId>      <artifactId>xxl-job-core</artifactId>      <version>2.0.2</version>      <scope>compile</scope> </dependency>
复制代码


本文从 mysql 数据库表 td_cms_document 导入数据到 es 中,除了导入上述 maven 坐标,还需要额外导入 mysql 驱动坐标(其他数据库驱动程序自行导入):mysql 5.x 驱动依赖包


<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.40</version></dependency>
复制代码


mysql 8.x 驱动依赖包(mysql 8 必须采用相应版本的驱动,否则不能正确运行)


<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.16</version></dependency>
复制代码

1.2 提前创建索引结构

一般情况下 elasticsearch 会根据 bboss 导入数据的类型自动创建索引 mapping 结构,但是默认创建的索引 mapping 往往不能满足实际要求,这时就需提前建立好自定义的索引 mapping 结构或者与索引名称匹配的索引 mapping 模板,具体定义和创建方法参考文档: Elasticsearch索引表和索引表模板管理

2.数据库表数据导入到 Elasticsearch

2.1.案例对应的源码

一次性批量导入:https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/Db2EleasticsearchOnceScheduleDateDemo.java


定时增量导入:https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/Db2EleasticsearchDemo.java

2.2.索引表结构定义

Elasticsearch 会在我们导入数据的情况下自动创建索引 mapping 结构,如果对 mapping 结构有特定需求或者自动创建的结构不能满足要求,可以自定义索引 mapping 结构,在导入数据之前创建好自定义的 mapping 结构或者 mapping 模板即可,具体定义和创建方法参考文档: Elasticsearch索引表和索引表模板管理

2.3 定义作业配置

作业需要通过 ImportBuilder 来进行配置和构建,大致的流程如下:


//创建作业构建器ImportBuilder importBuilder = new ImportBuilder() ;
//输入插件配置importBuilder.setInputConfig(dbInputConfig);
//输出插件配置importBuilder.setOutputConfig(elasticsearchOutputConfig);
//作业基础配置importBuilder.setBatchSize(5000);...... //作业执行/** * 创建执行数据库表数据导入es作业DataStream对象 */DataStream dataStream = importBuilder.builder();dataStream.execute();//启动运行作业
//停止作业dataStream.destroy();
//暂停作业dataStream.pauseSchedule();
//继续作业dataStream.resumeSchedule();
复制代码


针对数据库和 Elasticsearch 插件的配置,bboss 支持可以在 application.properties 文件中配置相关数据源,亦可以在插件上面直接配置数据源,下面文档中都有介绍。


更多的作业调度控制说明,可以参考文档:https://esdoc.bbossgroups.com/#/bboss-datasyn-control

2.4.配置 DBInput 输入参数

DBInputConfig dbInputConfig = new DBInputConfig();        //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,        // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如:        // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]        // 需要设置setLastValueColumn信息log_id,        // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test") .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true) .setDbInitSize(5) .setDbMinIdleSize(5) .setDbMaxSize(10) .setShowSql(true);//是否使用连接池; importBuilder.setInputConfig(dbInputConfig);
复制代码

2.5.配置 Elasticsearch 输出参数

新建 ElasticsearchOutputConfig 配置对象:


ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig();        elasticsearchOutputConfig                .addTargetElasticsearch("elasticsearch.serverNames","default")                .addElasticsearchProperty("default.elasticsearch.rest.hostNames","192.168.137.1:9200")                .addElasticsearchProperty("default.elasticsearch.showTemplate","true")                .addElasticsearchProperty("default.elasticUser","elastic")                .addElasticsearchProperty("default.elasticPassword","changeme")                .addElasticsearchProperty("default.elasticsearch.failAllContinue","true")                .addElasticsearchProperty("default.http.timeoutSocket","60000")                .addElasticsearchProperty("default.http.timeoutConnection","40000")                .addElasticsearchProperty("default.http.connectionRequestTimeout","70000")                .addElasticsearchProperty("default.http.maxTotal","200")                .addElasticsearchProperty("default.http.defaultMaxPerRoute","100")                .setIndex("dbdemo")                .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id                .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false                .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false        /**         elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() {         //如果指定EsIdGenerator,则根据下面的方法生成文档id,         // 否则根据setEsIdField方法设置的字段值作为文档id,         // 如果默认没有配置EsIdField和如果指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } }); */// .setIndexType("dbdemo") ;//es 7以后的版本不需要设置indexType,es7以前的版本必需设置indexType;// .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 /** * es相关配置 */// elasticsearchOutputConfig.setTargetElasticsearch("default,test");//同步数据到两个es集群
importBuilder.setOutputConfig(elasticsearchOutputConfig);
复制代码

2.6 作业基本配置

批量写入es记录数importBuilder.setBatchSize(5000);importBuilder//                .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId                .setUseLowcase(true)  //可选项,true 列名称转小写,false列名称不转换小写,默认false,只要在UseJavaName为false的情况下,配置才起作用                .setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false                .setBatchSize(10);  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
//定时任务配置, importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次 //定时任务配置结束//增量配置开始// importBuilder.setStatusDbname("test");//设置增量状态数据源名称 importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(false);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据,// setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setStatusDbname("testStatus");//指定增量状态数据源名称// importBuilder.setLastValueStorePath("logtable_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样 importBuilder.setLastValueStoreTableName("logstable");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型// SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");// try {// Date date = format.parse("2000-01-01");// importBuilder.setLastValue(date);//增量起始值配置// }// catch (Exception e){// e.printStackTrace();// } // 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型 //增量配置结束/** * 内置线程池配置,实现多线程并行数据导入功能,作业完成退出时自动关闭该线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
复制代码

2.7 作业执行

/** * 执行数据库表数据导入es操作 */DataStream dataStream = importBuilder.builder();dataStream.execute();//执行导入操作
复制代码

2.8 作业详解

2.8.1 同步批量导入

批量导入 Elasicsearch 记录数配置:


importBuilder.setBatchSize(5000)


数据库 jdbcFetchSize 设置 mysql 提供两种处理机制支持海量数据的导入,一种机制是在 application.properties 文件配置连接串和指定 fetch 相关的 useCursorFetch 和 jdbcFetchSize 参数:


db.url = jdbc:mysql://192.168.137.1:3306/bboss?useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=false
##mysql 8 url配置样例#db.url = jdbc:mysql://192.168.0.188:3308/braineex?useCursorFetch=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=truedb.jdbcFetchSize = 10000
复制代码


另外一种机制可以参考文档章节:


2.8.14 Mysql ResultSet Stream机制说明


根据设置的 SQL 语句,同步批量一次性导入数据到 Elasticsearch 中,非定时执行。


    public void testSimpleImportBuilder(){        ImportBuilder importBuilder = new ImportBuilder() ;        DBInputConfig dbInputConfig = new DBInputConfig();        //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,        // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如:        // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]        // 需要设置setLastValueColumn信息log_id,        // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test"); importBuilder.setInputConfig(dbInputConfig);

// importBuilder.addFieldMapping("LOG_CONTENT","message");// importBuilder.addIgnoreFieldMapping("remark1");// importBuilder.setSql("select * from td_sm_log "); ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig(); elasticsearchOutputConfig.setTargetElasticsearch("default") .setIndex("dbdemo") .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false /** elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() { //如果指定EsIdGenerator,则根据下面的方法生成文档id, // 否则根据setEsIdField方法设置的字段值作为文档id, // 如果默认没有配置EsIdField和如果指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } }); */// .setIndexType("dbdemo") ;//es 7以后的版本不需要设置indexType,es7以前的版本必需设置indexType;// .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 /** * es相关配置 */// elasticsearchOutputConfig.setTargetElasticsearch("default,test");//同步数据到两个es集群
importBuilder.setOutputConfig(elasticsearchOutputConfig);
/** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.execute(); dataStream.destroy();//执行完毕后释放资源 }
复制代码


可以直接运行上述代码,查看数据导入效果。

2.8.2 异步批量导入

根据设置的 SQL 语句,异步批量一次性导入数据到 Elasticsearch 中,非定时执行。异步批量导入关键配置:


        importBuilder.setParallel(true);//设置为多线程异步并行批量导入        importBuilder.setQueue(100);//设置批量导入线程池等待队列长度        importBuilder.setThreadCount(200);//设置批量导入线程池工作线程数量
复制代码


示例代码如下:


    public void testSimpleLogImportBuilderFromExternalDBConfig(){        ImportBuilder importBuilder = new ImportBuilder() ;        DBInputConfig dbInputConfig = new DBInputConfig();        //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,        // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如:        // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]        // 需要设置setLastValueColumn信息log_id,        // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test"); importBuilder.setInputConfig(dbInputConfig);

// importBuilder.addFieldMapping("LOG_CONTENT","message");// importBuilder.addIgnoreFieldMapping("remark1");// importBuilder.setSql("select * from td_sm_log "); ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig(); elasticsearchOutputConfig.setTargetElasticsearch("default") .setIndex("dbdemo") .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false /** elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() { //如果指定EsIdGenerator,则根据下面的方法生成文档id, // 否则根据setEsIdField方法设置的字段值作为文档id, // 如果默认没有配置EsIdField和如果指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } }); */// .setIndexType("dbdemo") ;//es 7以后的版本不需要设置indexType,es7以前的版本必需设置indexType;// .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 /** * es相关配置 */// elasticsearchOutputConfig.setTargetElasticsearch("default,test");//同步数据到两个es集群
importBuilder.setOutputConfig(elasticsearchOutputConfig); /** * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入 importBuilder.setQueue(100);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(200);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回 importBuilder.setRefreshOption("refresh"); // 为了实时验证数据导入的效果,强制刷新数据,生产环境请设置为null或者不指定 /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.execute(); dataStream.destroy();//执行完毕后释放资源 //获取索引表dbdemo中的数据总量,如果没有设置refreshOption,es插入数据会有几秒的延迟(具体的延迟多久取决于index refresh interval配置),所以countAll统计出来的结果不一定准确 long count = ElasticSearchHelper.getRestClientUtil().countAll("dbdemo"); System.out.println("数据导入完毕后索引表dbdemo中的文档数量:"+count); }
复制代码


说明:从数据库检索数据放入批处理列表,到达 batchsize 就提交一次作业,最多 threadcount 个工作线程并行处理作业,如果线程都在忙,没有空闲的工作线程,那么作业就会放到队列里面排队,如果队列也满了,则会阻塞等待释放的队列位置,每等待 100 次打印一次等待次数的日志。


batchsize,queue,threadcount 的配置要结合服务器的内存和 cpu 配置来设置,设置大了容易内存溢出,设置小了影响处理速度,所以要权衡考虑。



导入的时候需要观察 elasticsearch 服务端的 write 线程池的状态,如果出现 reject 任务的情况,就需要调优 elasticsearch 配置参数:


thread_pool.bulk.queue_size: 1000 es 线程等待队列长度


thread_pool.bulk.size: 10 线程数量,与 cpu 的核数对应

2.8.2.1 异步 ResultSet 缓冲队列大小配置

//数据异步同步通道缓存队列设置,默认为10importBuilder.setTranDataBufferQueue(20);
复制代码


bboss 会将采集数据先放入异步结果迭代器 resultset 缓冲队列,队列长度对应的参数为 tranDataBufferQueue;


如果目标写入比较慢,通过调整数据采集异步结果迭代器 resultset 数据缓冲队列 tranDataBufferQueue 大小,可以得到更好的数据采集性能,如果调大该参数会占用更多的 jvm 内存。

2.8.3 一个有字段属性映射的稍微复杂案例实现

    public void testImportBuilder(){        ImportBuilder importBuilder = new ImportBuilder() ;        DBInputConfig dbInputConfig = new DBInputConfig();        //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,        // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如:        // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]        // 需要设置setLastValueColumn信息log_id,        // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test"); importBuilder.setInputConfig(dbInputConfig);

// importBuilder.addFieldMapping("LOG_CONTENT","message");// importBuilder.addIgnoreFieldMapping("remark1");// importBuilder.setSql("select * from td_sm_log "); ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig(); elasticsearchOutputConfig.setTargetElasticsearch("default") .setIndex("dbdemo") .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false /** elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() { //如果指定EsIdGenerator,则根据下面的方法生成文档id, // 否则根据setEsIdField方法设置的字段值作为文档id, // 如果默认没有配置EsIdField和如果指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } }); */// .setIndexType("dbdemo") ;//es 7以后的版本不需要设置indexType,es7以前的版本必需设置indexType;// .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 /** * es相关配置 */// elasticsearchOutputConfig.setTargetElasticsearch("default,test");//同步数据到两个es集群
importBuilder.setOutputConfig(elasticsearchOutputConfig); /** * db-es mapping 表字段名称到es 文档字段的映射:比如document_id -> docId * 可以配置mapping,也可以不配置,默认基于java 驼峰规则进行db field-es field的映射和转换 */ importBuilder.addFieldMapping("document_id","docId") .addFieldMapping("docwtime","docwTime") .addIgnoreFieldMapping("channel_id");//添加忽略字段 /** * 为每条记录添加额外的字段和值 * 可以为基本数据类型,也可以是复杂的对象 */ importBuilder.addFieldValue("testF1","f1value"); importBuilder.addFieldValue("testInt",0); importBuilder.addFieldValue("testDate",new Date()); importBuilder.addFieldValue("testFormateDate","yyyy-MM-dd HH",new Date()); TestObject testObject = new TestObject(); testObject.setId("testid"); testObject.setName("jackson"); importBuilder.addFieldValue("testObject",testObject);
/** * 重新设置es数据结构 */ importBuilder.setDataRefactor(new DataRefactor() { public void refactor(Context context) throws Exception { //可以根据条件定义是否丢弃当前记录 //if(something is true) { // context.setDrop(true); // return; //} CustomObject customObject = new CustomObject(); customObject.setAuthor((String)context.getValue("author")); customObject.setTitle((String)context.getValue("title")); customObject.setSubtitle((String)context.getValue("subtitle")); context.addFieldValue("docInfo",customObject);//如果还需要构建更多的内部对象,可以继续构建
//上述三个属性已经放置到docInfo中,如果无需再放置到索引文档中,可以忽略掉这些属性 context.addIgnoreFieldMapping("author"); context.addIgnoreFieldMapping("title"); context.addIgnoreFieldMapping("subtitle"); } });
/** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.execute(); }
复制代码

2.8.4 设置文档 id 机制

bboss 充分利用 elasticsearch 的文档 id 生成机制,同步数据的时候提供了以下 3 种生成文档 Id 的机制:


  1. 不指定文档 ID 机制:直接使用 Elasticsearch 自动生成文档 ID

  2. 指定表字段,对应的字段值作为 Elasticsearch 文档 ID

  3. importBuilder.setEsIdField("log_id");//设置文档主键,不设置,则自动产生文档 id

  4. 自定义文档 id 机制配置


elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() {            //如果指定EsIdGenerator,则根据下面的方法生成文档id,            // 否则根据setEsIdField方法设置的字段值作为文档id,            // 如果既没有配置EsIdField也没有指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } });
复制代码

2.8.5 定时增量导入

定时机制配置


//定时任务配置,        importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明//           .setScheduleDate(date) //指定任务开始执行时间:日期                .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行                .setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次        //定时任务配置结束
复制代码

2.8.5.1 数字增量同步

支持按照数字字段和时间字段进行增量导入,增量导入 sql 的语法格式:


select * from td_sm_log where log_id > #[log_id]
复制代码


通过 #[xxx],指定变量,变量可以在 sql 中出现多次:


select * from td_sm_log where log_id > #[log_id] and other_id = #[log_id]
复制代码


数字类型增量导入配置:


importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
复制代码

2.8.5.2 日期时间戳增量同步

sql 语句格式:


select * from td_sm_log where collecttime > #[collecttime]
复制代码


日期类型增量导入配置


importBuilder.setLastValueColumn("collecttime");//手动指定日期增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
importBuilder.setLastValueType(ImportIncreamentConfig.TIMESTAMP_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.TIMESTAMP_TYPE数字类型
复制代码

2.8.5.3 日期类型增量字段日期格式配置

可以指定日期增量字段日期格式,当增量字段为日期类型且日期格式不是默认的


yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
复制代码


时,需要设置字段相对应的日期格式,例如:


yyyy-MM-dd HH:mm:ss
复制代码


如果是默认 utc 格式,则不需要手动设置指定


  importBuilder.setLastValueDateformat("yyyy-MM-dd HH:mm:ss");
复制代码


lastValueDateformat 只对从 elasticsearch 增量采集数据起作用,因为 elasticsearch 返回非 UTC 格式日期字符串时,需要通过指定对应的日期格式,才能将字符串形式的日期转换为增量字段状态管理需要的 Date 类型。

2.8.5.4 时间戳增量导出截止时间偏移量配置

日期类型增量导入,还可以设置一个导入截止时间偏移量。引入 IncreamentEndOffset 配置,主要是增量导出时,考虑到 elasticsearch、mongodb 这种存在写入数据的延迟性的数据库,设置一个相对于当前时间偏移量导出的截止时间,避免增量导出时遗漏数据。


importBuilder.setIncreamentEndOffset(300);//单位秒,同步从上次同步截止时间当前时间前5分钟的数据,下次继续从上次截止时间开始同步数据
复制代码


bboss 会自动增加一个内部变量 collecttime__endTime(增量字段名称后面加上__endTime 后缀),这样我们增量同步 sql 就可以写成如下方式:


select * from td_sm_log where collecttime > #[collecttime] and collecttime <= #[collecttime__endTime] 
复制代码


看一个增量时间戳同步的 elasticsearch dsl 用法


<?xml version="1.0" encoding='UTF-8'?><properties>    <description>        <![CDATA[            配置数据导入的dsl         ]]>    </description>    <!--          条件片段     -->    <property name="queryCondition">        <![CDATA[         "query": {                "bool": {                    "filter": [                                                 {   ## 增量检索范围,可以是时间范围,也可以是数字范围,这里采用的是数字增量字段                            "range": {                                                               "collecttime": { ## 时间增量检索字段                                    "gt": #[collecttime],                                    "lte": #[collecttime__endTime]                                }                                                           }                        }
] } } ]]> </property>
<!-- 简单的scroll query案例,复杂的条件修改query dsl即可 --> <property name="scrollQuery"> <![CDATA[ { "size":#[size], @{queryCondition} } ]]> </property> <!-- 简单的slice scroll query案例,复杂的条件修改query dsl即可 --> <property name="scrollSliceQuery"> <![CDATA[ { "slice": { "id": #[sliceId], ## 必须使用sliceId作为变量名称 "max": #[sliceMax] ## 必须使用sliceMax作为变量名称 }, "size":#[size], @{queryCondition} } ]]> </property>

</properties>
复制代码

2.8.5.5 控制重启作业是否重新开始同步数据

setFromFirst 的使用


setFromfirst(false),如果作业停了,作业重启后从上次停止的位置开始采集数据,setFromfirst(true) 如果作业停了,作业重启后,重新开始位置开始采集数据
复制代码


详细的增量导入案例:


源码文件 https://github.com/bbossgroups/db-elasticsearch-tool/blob/master/src/main/java/org/frameworkset/elasticsearch/imp/Dbdemo.java


    public void testSimpleLogImportBuilderFromExternalDBConfig(){        ImportBuilder importBuilder = new ImportBuilder() ;        DBInputConfig dbInputConfig = new DBInputConfig();        //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,        // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如:        // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]        // 需要设置setLastValueColumn信息log_id,        // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test"); importBuilder.setInputConfig(dbInputConfig);

// importBuilder.addFieldMapping("LOG_CONTENT","message");// importBuilder.addIgnoreFieldMapping("remark1");// importBuilder.setSql("select * from td_sm_log "); ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig(); elasticsearchOutputConfig.setTargetElasticsearch("default") .setIndex("dbdemo") .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false /** elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() { //如果指定EsIdGenerator,则根据下面的方法生成文档id, // 否则根据setEsIdField方法设置的字段值作为文档id, // 如果默认没有配置EsIdField和如果指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } }); */// .setIndexType("dbdemo") ;//es 7以后的版本不需要设置indexType,es7以前的版本必需设置indexType;// .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 /** * es相关配置 */// elasticsearchOutputConfig.setTargetElasticsearch("default,test");//同步数据到两个es集群
importBuilder.setOutputConfig(elasticsearchOutputConfig) .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId .setBatchSize(5000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理 .setJdbcFetchSize(10000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效 importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次// importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(true);//setFromfirst(false),如果作业停了,作业重启后从上次截止位置开始采集数据, //setFromfirst(true) 如果作业停了,作业重启后,重新开始采集数据 importBuilder.setLastValueStorePath("testdb");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点// importBuilder.setLastValueStoreTableName("logs");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型 // 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型
// importBuilder.
/** * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回 /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();//执行导入操作

}
复制代码

2.8.6 定时全量导入

定时机制配置


//定时任务配置,        importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明//           .setScheduleDate(date) //指定任务开始执行时间:日期                .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行                .setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次        //定时任务配置结束
复制代码


    public void testSimpleLogImportBuilderFromExternalDBConfig(){        ImportBuilder importBuilder = new ImportBuilder() ;        DBInputConfig dbInputConfig = new DBInputConfig();        //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,        // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如:        // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]        // 需要设置setLastValueColumn信息log_id,        // 通过setLastValueType方法告诉工具增量字段的类型,默认是数字类型
// importBuilder.setSql("select * from td_sm_log where LOG_OPERTIME > #[LOG_OPERTIME]"); dbInputConfig.setSql("select * from td_sm_log where log_id > #[log_id]") .setDbName("test"); importBuilder.setInputConfig(dbInputConfig);

// importBuilder.addFieldMapping("LOG_CONTENT","message");// importBuilder.addIgnoreFieldMapping("remark1");// importBuilder.setSql("select * from td_sm_log "); ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig(); elasticsearchOutputConfig.setTargetElasticsearch("default") .setIndex("dbdemo") .setEsIdField("log_id")//设置文档主键,不设置,则自动产生文档id .setDebugResponse(false)//设置是否将每次处理的reponse打印到日志文件中,默认false .setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认false /** elasticsearchOutputConfig.setEsIdGenerator(new EsIdGenerator() { //如果指定EsIdGenerator,则根据下面的方法生成文档id, // 否则根据setEsIdField方法设置的字段值作为文档id, // 如果默认没有配置EsIdField和如果指定EsIdGenerator,则由es自动生成文档id
@Override public Object genId(Context context) throws Exception { return SimpleStringUtil.getUUID();//返回null,则由es自动生成文档id } }); */// .setIndexType("dbdemo") ;//es 7以后的版本不需要设置indexType,es7以前的版本必需设置indexType;// .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 /** * es相关配置 */// elasticsearchOutputConfig.setTargetElasticsearch("default,test");//同步数据到两个es集群
importBuilder.setOutputConfig(elasticsearchOutputConfig) .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId .setBatchSize(5000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理 .setJdbcFetchSize(10000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效 importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明// .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次



/** * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回 /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.execute();//执行导入操作

}
复制代码

2.8.7 定时任务指定执行拦截器使用

可以为同步定时任务指定执行拦截器,示例如下:


        //设置任务执行拦截器,可以添加多个        importBuilder.addCallInterceptor(new CallInterceptor() {            @Override            public void preCall(TaskContext taskContext) {                System.out.println("preCall");                //可以在这里做一些重置初始化操作,比如删mapping之类的            }
@Override public void afterCall(TaskContext taskContext) { System.out.println("afterCall"); }
@Override public void throwException(TaskContext taskContext, Exception e) { System.out.println("throwException"); } }).addCallInterceptor(new CallInterceptor() { @Override public void preCall(TaskContext taskContext) { System.out.println("preCall 1"); }
@Override public void afterCall(TaskContext taskContext) { System.out.println("afterCall 1"); }
@Override public void throwException(TaskContext taskContext, Exception e) { System.out.println("throwException 1"); } });
复制代码


拦截器常被应用于任务上下文数据的定义和获取,任务拦截器对 kafka-to-target 类型的数据同步作业不起作用。

2.8.7.1 任务上下文数据定义和获取

在一些特定场景下,避免任务执行过程中重复加载数据,需要在任务每次调度执行前加载一些任务执行过程中不会变化的数据,放入任务上下文 TaskContext;任务执行过程中,直接从任务上下文中获取数据即可。例如:将每次任务执行的时间戳放入任务执行上下文。


通过 TaskContext 对象的 addTaskData 方法来添加上下文数据,通过 TaskContext 对象的 getTaskData 方法来获取任务上下文数据.

2.8.7.1.1 定义任务上下文数据

任务上下文数据定义-通过 CallInterceptor 接口的 preCall 的来往 TaskContext 对象来添加 任务上下文数据


@Overridepublic void preCall(TaskContext taskContext) {   String formate = "yyyyMMddHHmmss";   //HN_BOSS_TRADE00001_YYYYMMDDHHMM_000001.txt   SimpleDateFormat dateFormat = new SimpleDateFormat(formate);   String time = dateFormat.format(new Date());   taskContext.addTaskData("time",time);//定义任务执行时时间戳参数time}
复制代码


完整代码- 任务上下文数据定义


       //设置任务执行拦截器,可以添加多个      importBuilder.addCallInterceptor(new CallInterceptor() {         @Override         public void preCall(TaskContext taskContext) {            String formate = "yyyyMMddHHmmss";            //HN_BOSS_TRADE00001_YYYYMMDDHHMM_000001.txt            SimpleDateFormat dateFormat = new SimpleDateFormat(formate);            String time = dateFormat.format(new Date());            taskContext.addTaskData("time",time);//定义任务执行时时间戳参数time         }
@Override public void afterCall(TaskContext taskContext) { System.out.println("afterCall 1"); }
@Override public void throwException(TaskContext taskContext, Exception e) { System.out.println("throwException 1"); } }); //设置任务执行拦截器结束,可以添加多个
复制代码
2.8.7.1.2 获取任务上下文数据

在生成文件名称的接口方法中获取任务上下文数据


fileFtpOupputConfig.setFilenameGenerator(new FilenameGenerator() {   @Override   public String genName( TaskContext taskContext,int fileSeq) {
String time = (String)taskContext.getTaskData("time");//获取任务执行时间戳参数time String _fileSeq = fileSeq+""; int t = 6 - _fileSeq.length(); if(t > 0){ String tmp = ""; for(int i = 0; i < t; i ++){ tmp += "0"; } _fileSeq = tmp+_fileSeq; }


return "HN_BOSS_TRADE"+_fileSeq + "_"+time +"_" + _fileSeq+".txt"; }});
复制代码


在生成文件中的记录内容时获取任务上下文数据


fileFtpOupputConfig.setReocordGenerator(new ReocordGenerator() {         @Override         public void buildRecord(Context context, CommonRecord record, Writer builder) {            //SerialUtil.normalObject2json(record.getDatas(),builder);            String data = (String)context.getTaskContext().getTaskData("data");//获取全局参数//          System.out.println(data);
} });
复制代码


在 datarefactor 方法中获取任务上下文数据


/**       * 重新设置es数据结构       */      importBuilder.setDataRefactor(new DataRefactor() {         public void refactor(Context context) throws Exception  {
String data = (String)context.getTaskContext().getTaskData("data");
} });
复制代码


2.8.8 定时任务调度说明

定时增量导入的关键配置:


sql 语句指定增量字段


//指定导入数据的 sql 语句,必填项,可以设置自己的提取逻辑,设置增量变量 log_id importBuilder.setSql("select * from td_sm_log where log_id > #[log_id]");


bboss 自动提取 log_id 作为增量字段,目前支持 number 和 timestamp 两种类型,如果是时间戳,还需要指定一下类型:


importBuilder.setLastValueType(ImportIncreamentConfig.TIMESTAMP_TYPE );//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型        // 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型
复制代码


对于修改增量的同步,一般用修改时间戳来作为增量同步字段,同时将数据库记录主键作为文档 ID:


elasticsearchOutputConfig.setEsIdField("log_id");//设置文档主键,不设置,则自动产生文档id
复制代码


指定定时 timer


importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明//           .setScheduleDate(date) //指定任务开始执行时间:日期                     .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行                     .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次
复制代码


上面说明的是基于 jdk timer 组件的定时调度,bboss 还可以通过quartzxx-job、elastic-job(目前未支持)来定时调度同步作业,进行以下配置指示 bboss 采用外部任务调度器:


//采用外部定时任务importBuilder.setExternalTimer(true);
复制代码

2.8.9 增量导入注意事项

2.8.9.1 排序设置

bboss 5.9.3 及之前的版本需要注意:如果增量字段默认自带排序功能(比如采用主键 id 作为增量字段),则 sql 语句不需要显式对查询的数据进行排序,否则需要在 sql 语句中显式基于增量字段升序排序:


importBuilder.setSql("select * from td_sm_log where update_date > #[log_id] order by update_date asc");
复制代码


bboss 5.9.3 及后续的版本已经内置了对增量字段值的排序功能,所以在 sql 或者 dsl 中不需要额外进行排序设置,可以提升导入性能。

2.8.9.2 增量状态存储数据库

bboss 默认采用 sqlite 保存增量状态,通过 setLastValueStorePath 方法设置 sqlite 数据库文件路径


importBuilder.setLastValueStorePath("/app/data/testdb");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点
复制代码


/app/data/testdb 代表/app/data/目录下的 sqlite 数据库文件 testdb,如果在同一个进程中运行多个数据采集作业,并且采用 sqlite 作为增量状态管理,由于 sqlite 的单线程数据库限制,必须每个作业一个独立的 sqlite 数据库,因此除了设置不同的 sqlite 数据库文件路径,还需指定不同的 statusDBname,例如:


作业 1


importBuilder.setStatusDbname("job1");importBuilder.setLastValueStorePath("/app/data/job1");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点
复制代码


作业 2


importBuilder.setStatusDbname("job2");importBuilder.setLastValueStorePath("/app/data/job2");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点
复制代码


sqlite 作为一个本地单线程文件数据库,可能在一些场景下无法满足要求,譬如要做监控界面实时查看作业数据采集状态,尤其是在采用分布式作业调度引擎时,定时增量导入需要指定 mysql 等关系型增量状态存储数据库。


bboss 支持将增量状态保存到其他关系数据库中(譬如 mysql),具体的配置方法如下:


保存增量状态的数据源配置

2.8.9.3 设置增量同步增量字段起始值

可以指定增量字段的起始值,不指定的情况下数字默认起始值 0,日期默认起始值:1970-01-01


指定日期字段增量同步起始值:


        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");        try {            Date date = format.parse("2000-01-01");            importBuilder.setLastValue(date);//增量起始值配置        }        catch (Exception e){            e.printStackTrace();        }
复制代码


指定数字字段增量同步起始值:


                 try {                         importBuilder.setLastValue(100);//增量起始值配置        }        catch (Exception e){            e.printStackTrace();        }
复制代码

2.8.10 灵活控制文档数据结构

2.8.10.1 全局处理

可以通过 importBuilder 全局扩展添加字段到 es 索引中:


        importBuilder.addFieldValue("testF1","f1value");        importBuilder.addFieldValue("testInt",0);        importBuilder.addFieldValue("testDate",new Date());        importBuilder.addFieldValue("testFormateDate","yyyy-MM-dd HH",new Date());        importBuilder.addIgnoreFieldMapping("subtitle");//全局忽略字段        importBuilder.addFieldMapping("dbcolumn","esFieldColumn");//全局添加字段名称映射
复制代码

2.8.10.2 记录级别处理

如果需要针对单条记录,bboss 提供 org.frameworkset.tran.DataRefactor 接口和 Context 接口像结合来提供对数据记录的自定义处理功能,这样就可以灵活控制文档数据结构,通过 context 可以对当前记录做以下调整:



全局数据处理配置:打 tag,标识数据来源于 jdk timer 还是 xxl-job


importBuilder.addFieldValue("fromTag","jdk timer");  //jdk timer调度作业设置
importBuilder.addFieldValue("fromTag","xxl-jobr"); //xxl-job调度作业设置
复制代码


记录级别的转换处理参考下面的代码,举例说明如下:


final AtomicInteger s = new AtomicInteger(0);      /**       * 重新设置es数据结构       */      importBuilder.setDataRefactor(new DataRefactor() {         public void refactor(Context context) throws Exception  {            //可以根据条件定义是否丢弃当前记录            //context.setDrop(true);return;            if(s.incrementAndGet() % 2 == 0) {               context.setDrop(true);               return;            }            //空值处理,判断字段content的值是否为空            if(context.getValue("content") == null){               context.addFieldValue("content","");//将content设置为""            }
CustomObject customObject = new CustomObject(); customObject.setAuthor((String)context.getValue("author")); customObject.setTitle((String)context.getValue("title")); customObject.setSubtitle((String)context.getValue("subtitle"));
customObject.setIds(new int[]{1,2,3}); context.addFieldValue("author",customObject); long testtimestamp = context.getLongValue("testtimestamp");//将long类型的时间戳转换为Date类型 context.addFieldValue("testtimestamp",new Date(testtimestamp));//将long类型的时间戳转换为Date类型//修改字段名称title为新名称newTitle,并且修改字段的值 context.newName2ndData("title","newTitle",(String)context.getValue("title")+" append new Value");
//上述属性已经放置到docInfo中,如果无需再放置到索引文档中,可以忽略掉这些属性
context.addIgnoreFieldMapping("subtitle");
//关联查询数据,单值查询 //sql中有多个条件用逗号分隔追加 Map headdata = SQLExecutor.queryObjectWithDBName(Map.class,context.getEsjdbc().getDbConfig().getDbName(), "select * from head where billid = ? and othercondition= ?", context.getIntegerValue("billid"),"otherconditionvalue"); //将headdata中的数据,调用addFieldValue方法将数据加入当前es文档,具体如何构建文档数据结构根据需求定 context.addFieldValue("headdata",headdata); //关联查询数据,多值查询 List<Map> facedatas = SQLExecutor.queryListWithDBName(Map.class,context.getEsjdbc().getDbConfig().getDbName(), "select * from facedata where billid = ?", context.getIntegerValue("billid")); //将facedatas中的数据,调用addFieldValue方法将数据加入当前es文档,具体如何构建文档数据结构根据需求定 context.addFieldValue("facedatas",facedatas); } }); //映射和转换配置结束
复制代码


注意:


1.内嵌的数据库查询会有性能损耗,在保证性能的前提下,尽量将内嵌的 sql 合并的外部查询数据的整体的 sql 中,或者采用缓存技术消除内部 sql 查询。


2.一定要注意全局级和记录级调整区别:在 DataRefactor 接口中只能用 Context 来调整数据字段映射和字段添加修改和移除操作


2.8.10.3 过滤记录

如果需要根据情况过滤特定的记录,可以通过以下方法将记录标记为过滤记录即可:


 context.setDrop(true);
复制代码


例如


String id = context.getStringValue("_id");//根据字段值忽略对应的记录,这条记录将不会被同步到elasticsearch中 if(id.equals("5dcaa59e9832797f100c6806")){    context.setDrop(true);}
复制代码

2.8.10.4 获取记录元数据

可以通过 context.getMetaValue(metaName)获取记录的元数据,比如文件信息、elasticsearch 文档元数据、hbase 元数据,使用示例:


获取 elasticsearch 文档 id


String docId = (String)context.getMetaValue("_id");
复制代码


elasticsearch 元数据清单



/**文档id信息*/ private String _id; /**文档对应索引类型信息*/ private String type; /**文档对应索引字段信息*/ private Map<String, List<Object>> fields; /**文档对应版本信息*/ private long version; /**文档对应的索引名称*/ private String index; /**文档对应的高亮检索信息*/ private Map<String,List<Object>> highlight; /**文档对应的排序信息*/ private Object[] sort; /**文档对应的评分信息*/ private Double score; /**文档对应的父id*/ private Object parent; /**文档对应的路由信息*/ private Object routing; /**文档对应的是否命中信息*/ private boolean found; /**文档对应的nested检索信息*/ private Map<String,Object> nested; /**文档对应的innerhits信息*/ private Map<String,Map<String, InnerSearchHits>> innerHits; /**文档对应的索引分片号*/ private String shard; /**文档对应的elasticsearch集群节点名称*/ private String node; /**文档对应的打分规则信息*/ private Explanation explanation;
private long seqNo; private long primaryTerm;
复制代码


hbase 元数据清单


byte[] rowkeyDate timestamp
复制代码


文件元数据清单


Date @timestampMap @filemeta    filemeta的数据字段定义如下:
common.put("hostIp", BaseSimpleStringUtil.getIp());common.put("hostName",BaseSimpleStringUtil.getHostName());common.put("filePath",FileInodeHandler.change(file.getAbsolutePath()));
common.put("pointer",pointer);common.put("fileId",fileInfo.getFileId());FtpConfig ftpConfig = this.fileConfig.getFtpConfig();if(ftpConfig != null){ common.put("ftpDir",ftpConfig.getRemoteFileDir()); common.put("ftpIp",ftpConfig.getFtpIP()); common.put("ftpPort",ftpConfig.getFtpPort()); common.put("ftpUser",ftpConfig.getFtpUser() != null?ftpConfig.getFtpUser():"-"); common.put("ftpProtocol",ftpConfig.getTransferProtocolName());}
复制代码

2.8.11 IP-地区运营商经纬度坐标转换

与 geolite2 和 ip2region 相结合,bboss 支持将 ip 地址转换为国家-省份-城市-运营商-经纬度坐标信息,我们在 DataRefactor 中,可以获取 ip 对应的运营商和地区信息,举例说明:


/**       * 重新设置es数据结构,获取ip对应的运营商和区域信息案例       */      importBuilder.setDataRefactor(new DataRefactor() {         public void refactor(Context context) throws Exception  {                /**             * 获取ip对应的运营商和区域信息,remoteAddr字段存放ip信息             */            IpInfo ipInfo = context.getIpInfo("remoteAddr");            context.addFieldValue("ipInfo",ipInfo);            context.addFieldValue("collectTime",new Date());                     }      });
复制代码


首先需下载最新的开源信息库


geolite2 ip 库文件:


https://dev.maxmind.com/geoip/geoip2/geolite2/#Downloads


ip2region 库文件:


https://github.com/lionsoul2014/ip2region/blob/master/data/ip2region.db


ip 地址库设置方式有两种:


  • 方式 1 在 appliction.properties 文件中配置 ip 地址信息库(可选,如果有 elasticsearch 数据源时有效,否则需采用方式 2)


在 application.properties 文件中配置对应的 ip 信息库文件地址


ip.cachesize = 10000# geoip的ip地址信息库下载地址https://dev.maxmind.com/geoip/geoip2/geolite2/ip.database = E:/workspace/geolite2/GeoLite2-City.mmdbip.asnDatabase = E:/workspace/geolite2/GeoLite2-ASN.mmdbip.ip2regionDatabase=E:/workspace/ipdb/ip2region.db
复制代码


  • 方式 2 代码中直接设置 ip 地址信息库

2.8.12 设置任务执行结果回调处理函数

我们通过 importBuilder 的 setExportResultHandler 方法设置任务执行结果以及异常回调处理函数,函数实现接口即可:


org.frameworkset.tran.ExportResultHandler


//设置数据bulk导入任务结果处理回调函数,对每次bulk任务的结果进行成功和失败反馈,然后针对失败的bulk任务通过error方法进行相应处理importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() {   @Override   public void success(TaskCommand<String,String> taskCommand, String result) {      String datas = taskCommand.getDatas();//执行的批量数据      System.out.println(result);//打印成功结果   }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { //具体怎么处理失败数据可以自行决定,下面的示例显示重新导入失败数据的逻辑: // 从result中分析出导入失败的记录,然后重新构建data,设置到taskCommand中,重新导入, // 支持的导入次数由getMaxRetry方法返回的数字决定 // String failDatas = ...; //taskCommand.setDatas(failDatas); //taskCommand.execute(); String datas = taskCommand.getDatas();//执行的批量数据 System.out.println(result);//打印失败结果 }@Override public void exception(TaskCommand<String, String> taskCommand, Exception exception) { //任务执行抛出异常,失败处理方法,特殊的异常可以调用taskCommand的execute方法重试 if(need retry) taskCommand.execute(); } /** * 如果对于执行有错误的任务,可以进行修正后重新执行,通过本方法 * 返回允许的最大重试次数 * @return */ @Override public int getMaxRetry() { return -1; }});
复制代码

2.8.13 灵活指定索引名称和索引类型

可以全局通过 importBuilder 组件设置索引类型和索引名称,也可以通过 Context 接口为相关的数据记录指定索引类型和索引名称:


  • 如果没有在记录级别指定索引名称则采用全局指定索引名称,如果在记录级别指定了索引名称则采用记录级别指定的索引名称

  • 如果没有在记录级别指定索引类型则采用全局指定索引类型,如果在记录级别指定了索引类型则采用记录级别指定的索引类型

2.8.13.1 importBuilder 组件全局设置索引类型和索引名称

importBuilder                .setIndex("dbclobdemo") //必填项                .setIndexType("dbclobdemo") //elasticsearch7之前必填项,之后的版本不需要指定
复制代码

2.8.13.2 通过 Context 接口设置记录索引类型和索引名称

final Random random = new Random();        importBuilder.setDataRefactor(new DataRefactor() {            @Override            public void refactor(Context context) throws Exception {                int r = random.nextInt(3);                if(r == 1) {                                        context.setIndex("contextdbdemo-{dateformat=yyyy.MM.dd}");                }                else if(r == 0) {                                        context.setIndex("contextxxx-{dateformat=yyyy.MM.dd}");                                    }                else if(r == 2){                                        context.setIndex("contextbbbbb-{dateformat=yyyy.MM.dd}");                }
} });
复制代码

2.8.13.3 index 和 type 可以有以下几种动态生成方法

索引名称由demowithesindex和日期类型字段agentStarttime通过yyyy.MM.dd格式化后的值拼接而成=dbclobdemo-{agentStarttime,yyyy.MM.dd}= 索引名称由demowithesindex和当前日期通过yyyy.MM.dd格式化后的值拼接而成=demowithesindex-{dateformat=yyyy.MM.dd}
索引名称由demowithesindex和日期类型字段agentStarttime通过yyyy.MM.dd格式化后的值拼接而成=demowithesindex-{field=agentStarttime,dateformat=yyyy.MM.dd}
索引类型为typeFieldName字段对应的值={field=typeFieldName}或者{typeFieldName}=
复制代码


示例如下:


importBuilder                .setIndex("demo-{dateformat=yyyy.MM.dd}") //必填项                .setIndexType("dbclobdemo") //elasticsearch7之前必填项,之后的版本不需要指定
复制代码


importBuilder                .setIndex("demo-{agentStarttime,yyyy.MM.dd}") //必填项                .setIndexType("dbclobdemo") //elasticsearch7之前必填项,之后的版本不需要指定
复制代码

2.8.13.4 设置 routing 的方法

在 DataRefactor 中指定 routing 值


importBuilder.setDataRefactor(new DataRefactor() {            public void refactor(Context context) throws Exception  {                                 org.frameworkset.tran.config.ClientOptions clientOptions = new org.frameworkset.tran.config.ClientOptions();                clientOptions.setRouting("1");                context.setClientOptions(clientOptions);                            }        });
复制代码


通过 importBuilder 全局指定 routing field,将对应字段的值作为 routing:


org.frameworkset.tran.config.ClientOptions clientOptions = new org.frameworkset.tran.config.ClientOptions();clientOptions.setRoutingField(new ESField("parentid"));elasticsearchOutputConfig.setClientOptions(clientOptions);
复制代码

2.8.14 Mysql ResultSet Stream 机制说明

同步 Mysql 大数据表到 Elasticsearch 时,针对 jdbc fetchsize(ResultSet Stream)的使用比较特殊,mysql 提供了两种机制来处理:


机制一 mysql 5 以后的版本采用 jdbc url 串参数 useCursorFetch=true 以及配置 fetchsize 属性来实现,bboss 在 application.properties 中做如下配置即可:


db.url = jdbc:mysql://192.168.137.1:3306/bboss?useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=falsedb.jdbcFetchSize = 10000
复制代码


机制二 配置 jdbcFetchSize 为最小整数来采用 mysql 的默认实现机制,db url 中不要带 useCursorFetch 参数(适用 mysql 各版本)


# 注意:url中不要带useCursorFetch参数db.url = jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false# Integer.MIN_VALUEdb.jdbcFetchSize = -2147483648
复制代码


在代码中使用机制二:


        //数据源相关配置,可选项,可以在外部启动数据源        importBuilder.setDbName("test")                .setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包                .setDbUrl("jdbc:mysql://localhost:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false")//没有带useCursorFetch=true参数,jdbcFetchSize参数配置为-2147483648,否则不会生效                   .setJdbcFetchSize(-2147483648);                .setDbUser("root")                .setDbPassword("123456")                .setValidateSQL("select 1")                .setUsePool(true);//是否使用连接池
复制代码


机制二需要 bboss elasticsearch 5.7.2以后的版本才支持。

2.8.15 用配置文件来管理同步 sql

如果同步的 sql 很长,那么可以在配置文件中管理同步的 sql


首先定义一个 xml sql 配置文件


在工程 resources 目录下创建一个名称为 sql.xml 的配置文件(路径可以自己设定,如果有子目录,那么在 setSqlFilepath 方法中带上相对路径即可),内容如下:


<?xml version="1.0" encoding='UTF-8'?><properties>    <description>        <![CDATA[   配置数据导入的sql ]]>    </description>    <!--增量导入sql-->    <!--<property name="demoexport"><![CDATA[select * from td_sm_log where log_id > #[log_id]]]></property>-->    <!--全量导入sql-->    <property name="demoexportFull"><![CDATA[select * from td_sm_log ]]></property>
</properties>
复制代码


然后,利用 api 指定配置文件相对 classpath 路径和对应 sql 配置名称即可


importBuilder.setSqlFilepath("sql.xml")           .setSqlName("demoexportFull");
复制代码

2.8.16 设置 ES 数据导入控制参数

数据同步工具可以全局设置 Elasticsearch 请求控制参数(基于 importBuilder 组件设置),也可以在记录级别设置 Elasticsearch 请求控制参数(基于 Context 接口设置),这里举例进行说明:

2.8.16.1 全局设置 Elasticsearch 请求控制参数

可以通过 elasticsearchOutputConfig 直接提供的方法设置数据导入 Elasticsearch 的各种控制参数,例如 routing,esid,parentid,refresh 策略,版本信息等等:


elasticsearchOutputConfig.setEsIdField("documentId")//可选项,es自动为文档产生id                .setEsParentIdField("documentParentid") //可选项,如果不指定,文档父子关系父id对应的字段                .setRoutingField("routingId") //可选项    importBuilder.setRoutingValue("1");                .setEsDocAsUpsert(true)//可选项                .setEsRetryOnConflict(3)//可选项                .setEsReturnSource(false)//可选项                .setEsVersionField(“versionNo”)//可选项                .setEsVersionType("internal")//可选项                .setRefreshOption("refresh=true&version=1");//可选项,通过RefreshOption可以通过url参数的方式任意组合各种控制参数
复制代码


还可以通过 ClientOptions 对象来指定控制参数,使用示例:


        elasticsearchOutputConfig.setEsIdField("log_id");//设置文档主键,不设置,则自动产生文档id        ClientOptions clientOptions = new ClientOptions();//    clientOptions.setPipeline("1");        clientOptions.setRefresh("true");//    routing//        (Optional, string) Target the specified primary shard.        clientOptions.setRouting("2");        clientOptions.setTimeout("50s");        clientOptions.setWaitForActiveShards(2);        elasticsearchOutputConfig.setClientOptions(clientOptions);
复制代码

2.8.16.2 记录级别设置 Elasticsearch 请求控制参数

基于 Context 接口,可以在记录级别设置 Elasticsearch 请求控制参数,记录级别会继承 importBuilder 设置的控制参数设置的控制参数,但是会覆盖通过 elasticsearchOutputConfig 设置的同名控制参数,记录级别控制参数使用示例:


final Random random = new Random();        importBuilder.setDataRefactor(new DataRefactor() {            @Override            public void refactor(Context context) throws Exception {                int r = random.nextInt(3);                if(r == 1) {                    ClientOptions clientOptions = new ClientOptions();                    clientOptions                            .setEsRetryOnConflict(1)//              .setPipeline("1")
.setOpType("index") .setIfPrimaryTerm(2l) .setIfSeqNo(3l) ;//create or index context.setClientOptions(clientOptions); //context.setIndex("contextdbdemo-{dateformat=yyyy.MM.dd}"); } else if(r == 0) { ClientOptions clientOptions = new ClientOptions();
clientOptions.setDetectNoop(false) .setDocasupsert(false) .setReturnSource(true) .setEsRetryOnConflict(3) ;//设置文档主键,不设置,则自动产生文档id; context.setClientOptions(clientOptions); //context.setIndex("contextdbdemo-{dateformat=yyyy.MM.dd}"); context.markRecoredUpdate(); } else if(r == 2){ ClientOptions clientOptions = new ClientOptions(); clientOptions.setEsRetryOnConflict(2);// .setPipeline("1"); context.setClientOptions(clientOptions); context.markRecoredDelete(); //context.setIndex("contextdbdemo-{dateformat=yyyy.MM.dd}"); }
} });
复制代码


参考文档:


基于refreshoption参数指定添加修改文档控制参数


Elasticsearch 控制参数参考文档:


https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html

2.8.17 数据同步任务执行统计信息获取

任务日志相关设置

 importBuilder.setPrintTaskLog(true);//true 打印作业任务执行统计日志,false 不打印作业任务统计信息
复制代码


对于 Elasticsearch 写入和查询 dsl 日志的控制,可以参考文档进行关闭和打开


DSL 脚本调试日志开关,将 showTemplate 设置为 true,同时将日志级别设置为 INFO,则会将 query dsl 脚本输出到日志文件中:


    elasticsearch.showTemplate=true  ## true 打印dsl(logger必须设置info级别) false 不打印dsl
复制代码


spring boot 配置项


    spring.elasticsearch.bboss.elasticsearch.showTemplate=true  ## true 打印dsl(logger必须设置为info级别) false 不打印dsl
复制代码

任务级别统计信息

通过数据同步任务执行结果回调处理函数,可以获取到每个任务的详细执行统计信息:


importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() {            @Override            public void success(TaskCommand<String,String> taskCommand, String result) {                TaskMetrics taskMetrics = taskCommand.getTaskMetrics();                logger.info(taskMetrics.toString());            }
@Override public void error(TaskCommand<String,String> taskCommand, String result) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); }
@Override public void exception(TaskCommand<String,String> taskCommand, Exception exception) { TaskMetrics taskMetrics = taskCommand.getTaskMetrics(); logger.info(taskMetrics.toString()); }
@Override public int getMaxRetry() { return 0; } });
复制代码


输出的结果如下:


{    "jobStartTime": 2022-03-24 14:46:52, //作业开始时间,date类型,输出json时被转换为long值    "taskStartTime": 2022-03-24 14:46:53,//当前任务开始时间,date类型,输出json时被转换为long值    "taskEndTime": 2022-03-24 14:46:53,//当前任务结束时间,date类型,输出json时被转换为long值    "totalRecords": 4, //作业处理总记录数    "totalFailedRecords": 0,//作业处理总失败记录数    "totalIgnoreRecords": 0,//作业处理总忽略记录数    "totalSuccessRecords": 4,//作业处理总成功记录数    "successRecords": 2,//当前任务处理总成功记录数    "failedRecords": 0,//当前任务处理总失败记录数    "ignoreRecords": 0,//当前任务处理总忽略记录数        "taskNo": 3,//当前任务编号    "lastValue": 1998,//任务截止增量字段值或者增量时间戳        "jobNo": "eece3d34320b490a980d3f501cb7ae8c" //任务对应的作业编号,一个作业会被拆分为多个任务执行}
复制代码

作业级别统计信息

通过 CallInterceptor 拦截器接口,在作业调度执行完成的时候,可以在方法 afterCall 和 throwException 中,通过 taskContext.getJobTaskMetrics()获取作业任务统计信息,示例如下:


//设置任务执行拦截器,可以添加多个importBuilder.addCallInterceptor(new CallInterceptor() {   @Override   public void preCall(TaskContext taskContext) {
String formate = "yyyyMMddHHmmss"; //HN_BOSS_TRADE00001_YYYYMMDDHHMM_000001.txt SimpleDateFormat dateFormat = new SimpleDateFormat(formate); String time = dateFormat.format(new Date()); //可以在preCall方法中设置任务级别全局变量,然后在其他任务级别和记录级别接口中通过taskContext.getTaskData("time");方法获取time参数 taskContext.addTaskData("time",time);
}
@Override public void afterCall(TaskContext taskContext) { logger.info(taskContext.getJobTaskMetrics().toString()); }
@Override public void throwException(TaskContext taskContext, Exception e) { logger.info(taskContext.getJobTaskMetrics().toString(),e); }});
复制代码


打印的统计信息格式如下:


JobNo:558e370ae01041c4baf4835882fc6a77,JobStartTime:2022-03-24 14:46:52,JobEndTime:2022-03-24 14:46:53,Total Records:497,Total Success Records:497,Total Failed Records:0,Total Ignore Records:0,Total Tasks:50,lastValue: 1998//任务截止增量字段值或者增量时间戳
复制代码


往 kafka 推送数据时,异步特性,因为任务全部提交完成后,数据还未发送完毕,回调 afterCall 方法时,作业级别统计信息可能不完整,如果需要完整的统计信息,可以调用方法来等待统计完成,例如:


        @Override            public void afterCall(TaskContext taskContext) {                taskContext.await();                //taskContext.await(100000l); //指定一个最长等待时间                logger.info("afterCall ----------"+taskContext.getJobTaskMetrics().toString());            }
复制代码

2.8.18 设置并行导入参数

代码里面加上下面参数,可以并行导入,导入速度会更快


importBuilder.setParallel(true);//设置为多线程并行批量导入importBuilder.setQueue(10);//设置批量导入线程池等待队列长度importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
复制代码

2.8.19 同步增删改数据到 ES

数据同步工具可以非常方便地将各种数据源(Elasticsearch、DB、Mongodb 等)的增删改操作同步到 Elasticsearch 中。在 DataRefactor 接口中,通过 Context 接口提供的三个方法来标注记录的增、删、改数据状态,同步工具根据记录状态的来实现对 Elasticsearch 的新增、修改、删除同步操作:


context.markRecoredInsert();//添加,默认值,如果不显示标注记录状态则默认为添加操作,对应Elasticsearch的index操作
context.markRecoredUpdate();//修改,对应Elasticsearch的update操作
context.markRecoredDelete();//删除,对应Elasticsearch的delete操作
复制代码


使用示例:


final Random random = new Random();        importBuilder.setDataRefactor(new DataRefactor() {            @Override            public void refactor(Context context) throws Exception {                int r = random.nextInt(3);                if(r == 1) {                    context.markRecoredInsert();                    //context.setIndex("contextdbdemo-{dateformat=yyyy.MM.dd}");                }                else if(r == 0) {                                       //context.setIndex("contextdbdemo1-{dateformat=yyyy.MM.dd}");                    context.markRecoredUpdate();                }                else if(r == 2){                                     context.markRecoredDelete();                    //context.setIndex("contextdbdemo2-{dateformat=yyyy.MM.dd}");                }
} });
复制代码


可以从数据源直接获取增删改的数据:



也可以先将需要增删改的数据推送到 kafka,同步工具从 kafka 接收增删改数据,再进行相应的处理:


2.8.20 同步数据到多个 ES 集群

bboss 可以非常方便地将数据同步到多个 ES 集群,本小节介绍使用方法。


importBuilder 组件指定多 ES 集群的方法如下:


elasticsearchOutputConfig.setTargetElasticsearch("default,test");
复制代码


多个集群数据源名称用逗号分隔,多 ES 集群数据源配置参考文档:


5.2 多elasticsearch服务器集群支持

2.8.21 导入日期类型数据少 8 小时问题

在数据导入时,如果是时间类型,Elasticsearch 默认采用 UTC 时区(而不是东八区时区)保存日期数据,如果通过 json 文本查看数据,会发现少 8 小时,这个是正常现象,通过 bboss orm 检索数据,日期类型数据就会自动将 UTC 时区转换为东八区时间(也就是中国时区,自动加 8 小时)

2.8.22 记录切割

在数据导入时,有时需将单条记录切割为多条记录,通过设置切割字段以及 SplitHandler 接口来实现,可以将指定的字段拆分为多条新记录,新产生的记录会自动继承原记录其他字段数据,亦可以指定覆盖原记录字段值,示例代码如下:


      importBuilder.setSplitFieldName("@message");        importBuilder.setSplitHandler(new SplitHandler() {            /**             * 将记录字段值splitValue切割为多条记录,如果方法返回null,则继续将原记录写入目标库             * @param taskContext             * @param record             * @param fieldValue             * @return List<KeyMap<String, Object>> KeyMap是LinkedHashMap的子类,添加key字段,如果是往kafka推送数据,可以设置推送的key             */            @Override            public List<KeyMap> splitField(TaskContext taskContext,//调度任务上下文                                           Record record,//原始记录对象                                           Object fieldValue) {//待切割的字段值                //如果@message不是一个数组格式的json,那么就不要拆分原来的记录,直接返回null就可以了                String data =  String.valueOf(fieldValue);                if(!data.startsWith("["))                    return null;                //把@message字段进行切割为一个List<Map>对象                List<Map> datas = SimpleStringUtil.json2ListObject(data, Map.class);                List<KeyMap> splitDatas =  new ArrayList<>(datas.size());                for(int i = 0; i < datas.size(); i ++){                    Map map = datas.get(i);                    KeyMap keyMap = new KeyMap();                    keyMap.put("@message",map);//然后循环将map再放回新记录,作为新记录字段@message的值                    splitDatas.add(keyMap);                }                return splitDatas;            }        });
复制代码


上面的列子是把 @message 字段进行切割为一个 List 对象,然后循环将 map 再放回新记录,作为新记录字段 @message 的值,需要注意的是如果 @message 不是一个数组格式的 json,那么就不要拆分原来的记录,直接返回 null 就可以了:


String data =  String.valueOf(fieldValue);if(!data.startsWith("["))   return null;//保留原记录,不切割List<Map> datas = SimpleStringUtil.json2ListObject(data, Map.class);
复制代码


最后一个注意事项,如果我们在最终的输出字段中,需要将 @message 名称变成名称 message,那么只需加上以下代码即可:


//将 @message 名称映射转换为 message


​ importBuilder.addFieldMapping("@message","message");


完整的代码:


importBuilder.setSplitFieldName("@message");        importBuilder.setSplitHandler(new SplitHandler() {            /**             * 将记录字段值splitValue切割为多条记录,如果方法返回null,则继续将原记录写入目标库             * @param taskContext             * @param record             * @param fieldValue             * @return List<KeyMap<String, Object>> KeyMap是LinkedHashMap的子类,添加key字段,如果是往kafka推送数据,可以设置推送的key             */            @Override            public List<KeyMap> splitField(TaskContext taskContext,//调度任务上下文                                           Record record,//原始记录对象                                           Object fieldValue) {//待切割的字段值                //如果@message不是一个数组格式的json,那么就不要拆分原来的记录,直接返回null就可以了                String data =  String.valueOf(fieldValue);                if(!data.startsWith("["))                    return null;                //把@message字段进行切割为一个List<Map>对象                List<Map> datas = SimpleStringUtil.json2ListObject(data, Map.class);                List<KeyMap> splitDatas =  new ArrayList<>(datas.size());                for(int i = 0; i < datas.size(); i ++){                    Map map = datas.get(i);                    KeyMap keyMap = new KeyMap();                    keyMap.put("@message",map);//然后循环将map再放回新记录,作为新记录字段@message的值                    splitDatas.add(keyMap);                }                return splitDatas;            }        });        //将@message名称映射转换为message        importBuilder.addFieldMapping("@message","message");
复制代码


生成的正确记录如下:


{"uuid":"7af4eee7-61d7-4ab8-8678-117fd6f37e24","message":{"userId":"123457","userName":"李四3","yearMonth":"202104","readTime":"20210401","payTime":"20210501","waterNum":"100","waterType":"工业用水"},"@timestamp":"2021-10-12T02:45:06.419Z","@filemeta":{"hostName":"DESKTOP-U3V5C85","pointer":1354,"hostIp":"169.254.252.194","filePath":"D:/workspace/bbossesdemo/kafka2x-elasticsearch/data/waterinfo_20210811211501009.json","fileId":"D:/workspace/bbossesdemo/kafka2x-elasticsearch/data/waterinfo_20210811211501009.json"}}
复制代码

2.8.23 自定义处理器

通过自定义处理采集数据功能,可以自行将采集的数据按照自己的要求进行处理到目的地,支持数据来源包括:database,elasticsearch,kafka,mongodb,hbase,file,ftp 等,想把采集的数据保存到什么地方,有自己实现 CustomOutPut 接口处理即可,例如:


ImportBuilder importBuilder = new ImportBuilder();        importBuilder.setBatchSize(10)//设置批量入库的记录数                .setFetchSize(1000);//设置按批读取文件行数        /**         * es相关配置         */        ElasticsearchInputConfig elasticsearchInputConfig = new ElasticsearchInputConfig();        elasticsearchInputConfig                .setDslFile("dsl2ndSqlFile.xml")                .setDslName("scrollQuery")                .setScrollLiveTime("10m")//        .setSliceQuery(true)//        .setSliceSize(5)                .setQueryUrl("dbdemo/_search");        importBuilder.setInputConfig(elasticsearchInputConfig)                .setIncreamentEndOffset(5);
//自己处理数据 CustomOupputConfig customOupputConfig = new CustomOupputConfig(); customOupputConfig.setCustomOutPut(new CustomOutPut() { @Override public void handleData(TaskContext taskContext, List<CommonRecord> datas) {
//You can do any thing here for datas //单笔记录处理 RedisHelper redisHelper = null; RedisHelper redisHelper1 = null; try { redisHelper = RedisFactory.getRedisHelper(); redisHelper1 = RedisFactory.getRedisHelper("redis1");
for (CommonRecord record : datas) { Map<String, Object> data = record.getDatas(); String LOG_ID =String.valueOf(data.get("LOG_ID"));// logger.info(SimpleStringUtil.object2json(data)); String valuedata = SimpleStringUtil.object2json(data); logger.debug("LOG_ID:{}",LOG_ID);// logger.info(SimpleStringUtil.object2json(data)); redisHelper.hset("xingchenma", LOG_ID, valuedata); redisHelper.hset("xingchenma", LOG_ID, valuedata); } } finally { if(redisHelper != null) redisHelper.release(); if(redisHelper1 != null) redisHelper1.release(); } } }); importBuilder.setOutputConfig(customOupputConfig);
复制代码


自定义处理采集数据功能典型的应用场景就是对接大数据流处理,直接将采集的数据交给一些流处理框架,譬如与我们内部自己开发的大数据流处理框架对接,效果简直不要不要的,哈哈。


采集日志文件自定义处理案例

2.9 DB-ES 数据同步工具使用方法

上面介绍了数据库数据同步到数据库的各种用法,bboss 还提供了一个样板 demo 工程:db-elasticsearch-tool,用来将写好的同步代码打包发布成可以运行的二进制包上传到服务器运行,db-elasticsearch-tool提供了现成的运行指令和 jvm 配置文件。


工具详细的使用文档参考:DB-ES数据同步工具使用方法

2.10 作业参数配置

在使用db-elasticsearch-tool时,为了避免调试过程中不断打包发布数据同步工具,可以将需要调整的参数配置到启动配置文件 src\test\resources\application.properties 中,然后在代码中通过以下方法获取配置的参数:


#工具主程序mainclass=org.frameworkset.elasticsearch.imp.Dbdemo
# 参数配置# 在代码中获取方法:propertiesContainer.getBooleanSystemEnvProperty("dropIndice",false);//同时指定了默认值falsedropIndice=false
复制代码


在代码中获取参数 dropIndice 方法:


import org.frameworkset.spi.assemble.PropertiesUtil
复制代码


PropertiesContainer propertiesContainer = PropertiesUtil.getPropertiesContainer();boolean dropIndice = propertiesContainer.getBooleanSystemEnvProperty("dropIndice",false);//同时指定了默认值false
复制代码


另外可以在 src\test\resources\application.properties 配置控制作业执行的一些参数,例如工作线程数,等待队列数,批处理 size 等等:


queueSize=50workThreads=10batchSize=20
复制代码


在作业执行方法中获取并使用上述参数:


PropertiesContainer propertiesContainer = PropertiesUtil.getPropertiesContainer();int batchSize = propertiesContainer.getIntSystemEnvProperty("batchSize",10);//同时指定了默认值int queueSize = propertiesContainer.getIntSystemEnvProperty("queueSize",50);//同时指定了默认值int workThreads = propertiesContainer.getIntSystemEnvProperty("workThreads",10);//同时指定了默认值importBuilder.setBatchSize(batchSize);importBuilder.setQueue(queueSize);//设置批量导入线程池等待队列长度importBuilder.setThreadCount(workThreads);//设置批量导入线程池工作线程数量
复制代码

2.11 基于 xxjob 同步 DB-Elasticsearch 数据

bboss 结合 xxjob 分布式定时任务调度引擎,可以非常方便地实现强大的 shard 分片分布式同步数据库数据到 Elasticsearch 功能,比如从一个 10 亿的数据表中同步数据,拆分为 10 个任务分片节点执行,每个节点同步 1 个亿,速度会提升 10 倍左右;同时提供了同步作业的故障迁移容灾能力。


参考文档:


基于xxl-job数据同步作业调度

2.12 spring boot 中使用数据同步功能

可以在 spring boot 中使用数据同步功能,这里以 db-elasticsearch 定时增量数据同步为例进行说明,其他数据源方法类似。


参考文档:https://esdoc.bbossgroups.com/#/usedatatran-in-spring-boot

2.13 数据导入不完整原因分析及处理

如果在任务执行完毕后,发现 es 中的数据与数据库源表的数据不匹配,可能的原因如下:

2.13.1.并行执行的过程中存在失败的任务

并行执行的过程中存在失败的任务(比如服务端超时),这种情况通过 setExportResultHandler 设置的 exception 监听方法进行定位分析


参考章节【设置任务执行结果回调处理函数


 public void exception(TaskCommand<String, String> taskCommand, Exception exception) {//任务执行抛出异常,失败处理方法,特殊的异常可以调用taskCommand的execute方法重试     if(need retry)         taskCommand.execute();}
复制代码


解决办法:


a) 优化 elasticsearch 服务器配置(加节点,加内存和 cpu 等运算资源,调优网络性能等)


b) 调整同步程序导入线程数、批处理 batchSize 参数,降低并行度。


importBuilder.setBatchSize(10000);//每次bulk批处理的记录条数importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行importBuilder.setQueue(100);//设置批量导入线程池等待队列长度importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
复制代码


c) 对于 read 或者等待超时的异常,亦可以调整配置文件 src\test\resources\application.properties 中的 http timeout 时间参数


http.timeoutConnection = 50000


http.timeoutSocket = 50000

2.13.2 存在 es 的 bulk 拒绝记录或者数据内容不合规

任务执行完毕,但是存在 es 的 bulk 拒绝记录或者数据内容不合规的情况,这种情况就通过 setExportResultHandler 设置的 error 监听方法进行定位分析


参考章节【设置任务执行结果回调处理函数


bulk 拒绝记录解决办法:


a) 优化 elasticsearch 服务器配置(加节点,加内存和 cpu 等运算资源,调优网络性能等)


调整 elasticsearch 的相关线程和队列:调优 elasticsearch 配置参数


thread_pool.bulk.queue_size: 1000 es 线程等待队列长度


thread_pool.bulk.size: 10 线程数量,与 cpu 的核数对应


b) 调整同步程序导入线程数、批处理 batchSize 参数,降低并行度。


数据内容不合规解决办法:拿到执行的原始批量数据,分析错误信息对应的数据记录,进行修改,然后重新导入失败的记录即可


@Override         public void error(TaskCommand<String,String> taskCommand, String result) {            //任务执行完毕,但是结果中包含错误信息            //具体怎么处理失败数据可以自行决定,下面的示例显示重新导入失败数据的逻辑:            // 从result中分析出导入失败的记录,然后重新构建data,设置到taskCommand中,重新导入,            // 支持的导入次数由getMaxRetry方法返回的数字决定              String datas = taskCommand.getDatas();//拿到执行的原始批量数据,分析错误信息对应的数据记录,进行修改,然后重新导入失败的记录即可            // String failDatas = ...;            //taskCommand.setDatas(failDatas);            //taskCommand.execute();           //          System.out.println(result);//打印成功结果         }
复制代码

2.13.3 elasticsearch 或者 mongodb 写入数据延迟性

从 elasticsearch、mongodb 这种存在写入数据的延迟性的数据库导出数据时,不设置截止时间戳偏移量时会存在遗漏数据的情况,解决方法参考文档:


时间戳增量导出截止时间偏移量配置

2.14 跨库跨表数据同步

在同步数据库中数据到 elasticsearch 时,会存在支持跨多个数据库跨多张表同步的情况,bboss 通过以下方式进行处理。


首先在 application.properties 文件中配置三个 db 数据源:db1,db2,db3


## 在数据导入过程可能需要使用的其他数据名称,需要在配置文件中定义相关名称的db配置thirdDatasources = db1,db2,db3
db1.db.user = rootdb1.db.password = 123456db1.db.driver = com.mysql.jdbc.Driver##db.url = jdbc:mysql://192.168.137.1:3306/bboss?useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=falsedb1.db.url = jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=falsedb1.db.usePool = truedb1.db.validateSQL = select 1##db.jdbcFetchSize = 10000db1.db.jdbcFetchSize = -2147483648db1.db.showsql = true##db1.db.dbtype = mysql -2147483648##db1.db.dbAdaptor = org.frameworkset.elasticsearch.imp.TestMysqlAdaptor
db2.db.user = rootdb2.db.password = 123456db2.db.driver = com.mysql.jdbc.Driver##db.url = jdbc:mysql://192.168.137.1:3306/bboss?useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=falsedb2.db.url = jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=falsedb2.db.usePool = truedb2.db.validateSQL = select 1##db.jdbcFetchSize = 10000db2.db.jdbcFetchSize = -2147483648db2.db.showsql = true##db2.db.dbtype = mysql -2147483648##db2.db.dbAdaptor = org.frameworkset.elasticsearch.imp.TestMysqlAdaptor
db3.db.user = rootdb3.db.password = 123456db3.db.driver = com.mysql.jdbc.Driver##db.url = jdbc:mysql://192.168.137.1:3306/bboss?useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=falsedb3.db.url = jdbc:mysql://192.168.137.1:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=falsedb3.db.usePool = truedb3.db.validateSQL = select 1##db.jdbcFetchSize = 10000db3.db.jdbcFetchSize = -2147483648db3.db.showsql = true##db3.db.dbtype = mysql -2147483648##db3.db.dbAdaptor = org.frameworkset.elasticsearch.imp.TestMysqlAdaptor
复制代码


定义好三个数据源后,下面看看同步的代码


//设置同步数据源db1,对应主表数据库      importBuilder.setDbName("db1");
//指定导入数据的sql语句,必填项,可以设置自己的提取逻辑, // 设置增量变量log_id,增量变量名称#[log_id]可以多次出现在sql语句的不同位置中,例如: // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id] // log_id和数据库对应的字段一致,就不需要设置setLastValueColumn信息, // 但是需要设置setLastValueType告诉工具增量字段的类型 importBuilder.setSql("select * from td_cms_document ");
/** * 重新设置es数据结构 */ importBuilder.setDataRefactor(new DataRefactor() { public void refactor(Context context) throws Exception { //可以根据条件定义是否丢弃当前记录 //context.setDrop(true);return;// if(s.incrementAndGet() % 2 == 0) {// context.setDrop(true);// return;// } //空值处理,判断字段content的值是否为空 if(context.getValue("content") == null){ context.addFieldValue("content","");//将content设置为"" } context.addFieldValue("content","");//将content设置为"" CustomObject customObject = new CustomObject(); customObject.setAuthor((String)context.getValue("author")); customObject.setTitle((String)context.getValue("title")); customObject.setSubtitle((String)context.getValue("subtitle"));
customObject.setIds(new int[]{1,2,3}); context.addFieldValue("author",customObject);// org.frameworkset.tran.config.ClientOptions clientOptions = new org.frameworkset.tran.config.ClientOptions();// clientOptions.setRouting("1");// context.setClientOptions(clientOptions); long testtimestamp = context.getLongValue("testtimestamp");//将long类型的时间戳转换为Date类型 context.addFieldValue("testtimestamp",new Date(testtimestamp));//将long类型的时间戳转换为Date类型 /** Date create_time = context.getDateValue("create_time"); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); context.addFieldValue("createTime",simpleDateFormat.format(create_time)); context.addIgnoreFieldMapping("create_time"); */// context.addIgnoreFieldMapping("title"); //上述三个属性已经放置到docInfo中,如果无需再放置到索引文档中,可以忽略掉这些属性// context.addIgnoreFieldMapping("author");
//修改字段名称title为新名称newTitle,并且修改字段的值 context.newName2ndData("title","newTitle",(String)context.getValue("title")+" append new Value"); context.addIgnoreFieldMapping("subtitle"); /** * 获取ip对应的运营商和区域信息 */ IpInfo ipInfo = context.getIpInfo("remoteAddr"); context.addFieldValue("ipInfo",ipInfo); context.addFieldValue("collectTime",new Date()); //关联查询数据,单值查询,指定要查询的数据库为数据源db2 Map headdata = SQLExecutor.queryObjectWithDBName(Map.class,"db2", "select * from head where billid = ? and othercondition= ?", context.getIntegerValue("billid"),"otherconditionvalue");//多个条件用逗号分隔追加 //将headdata中的数据,调用addFieldValue方法将数据加入当前es文档,具体如何构建文档数据结构根据需求定 context.addFieldValue("headdata",headdata); //关联查询数据,多值查询,指定要查询的数据库为数据源db3 List<Map> facedatas = SQLExecutor.queryListWithDBName(Map.class,"db3", "select * from facedata where billid = ?", context.getIntegerValue("billid")); //将facedatas中的数据,调用addFieldValue方法将数据加入当前es文档,具体如何构建文档数据结构根据需求定 context.addFieldValue("facedatas",facedatas); } });
复制代码


关键点说明:


1.首先需要指定主表对应的数据源


importBuilder.setDbName("db1");


2.然后在 DataRefactor 中跨库检索其他关联表的的数据封装到对象中


 //关联查询数据,单值查询,指定要查询的数据库为数据源db2             Map headdata = SQLExecutor.queryObjectWithDBName(Map.class,"db2",             "select * from head where billid = ? and othercondition= ?",             context.getIntegerValue("billid"),"otherconditionvalue");//多个条件用逗号分隔追加             //将headdata中的数据,调用addFieldValue方法将数据加入当前es文档,具体如何构建文档数据结构根据需求定             context.addFieldValue("headdata",headdata);             //关联查询数据,多值查询,指定要查询的数据库为数据源db3             List<Map> facedatas = SQLExecutor.queryListWithDBName(Map.class,"db3",             "select * from facedata where billid = ?",             context.getIntegerValue("billid"));             //将facedatas中的数据,调用addFieldValue方法将数据加入当前es文档,具体如何构建文档数据结构根据需求定             context.addFieldValue("facedatas",facedatas);
复制代码

2.15 自定义启动 db 数据源案例

如果在 application.properties 中配置了数据库数据源连接池 visualops:


db.name = visualopsdb.user = rootdb.password = 123456db.driver = com.mysql.jdbc.Driverdb.url = jdbc:mysql://100.13.11.5:3306/visualops?useUnicode=true&characterEncoding=utf-8&useSSL=falsedb.validateSQL = select 1db.initialSize = 5db.minimumSize = 5db.maximumSize = 5db.showsql = true# 控制map中的列名采用小写,默认为大写db.columnLableUpperCase = false
复制代码


那么我们可以通过代码来加载并启动对应的连接池


PropertiesContainer propertiesContainer = new PropertiesContainer();            propertiesContainer.addConfigPropertiesFile("application.properties");        String dbName  = propertiesContainer.getProperty("db.name");        String dbUser  = propertiesContainer.getProperty("db.user");        String dbPassword  = propertiesContainer.getProperty("db.password");        String dbDriver  = propertiesContainer.getProperty("db.driver");        String dbUrl  = propertiesContainer.getProperty("db.url");
String showsql = propertiesContainer.getProperty("db.showsql"); String validateSQL = propertiesContainer.getProperty("db.validateSQL"); String dbInfoEncryptClass = propertiesContainer.getProperty("db.dbInfoEncryptClass");
DBConf tempConf = new DBConf(); tempConf.setPoolname(dbName); tempConf.setDriver(dbDriver); tempConf.setJdbcurl(dbUrl); tempConf.setUsername(dbUser); tempConf.setPassword(dbPassword); tempConf.setValidationQuery(validateSQL); tempConf.setShowsql(showsql != null && showsql.equals("true")); //tempConf.setTxIsolationLevel("READ_COMMITTED"); tempConf.setJndiName("jndi-"+dbName); tempConf.setDbInfoEncryptClass(dbInfoEncryptClass); String initialConnections = propertiesContainer.getProperty("db.initialSize"); int _initialConnections = 10; if(initialConnections != null && !initialConnections.equals("")){ _initialConnections = Integer.parseInt(initialConnections); } String minimumSize = propertiesContainer.getProperty("db.minimumSize"); int _minimumSize = 10; if(minimumSize != null && !minimumSize.equals("")){ _minimumSize = Integer.parseInt(minimumSize); } String maximumSize = propertiesContainer.getProperty("db.maximumSize"); int _maximumSize = 20; if(maximumSize != null && !maximumSize.equals("")){ _maximumSize = Integer.parseInt(maximumSize); } tempConf.setInitialConnections(_initialConnections); tempConf.setMinimumSize(_minimumSize); tempConf.setMaximumSize(_maximumSize); tempConf.setUsepool(true); tempConf.setExternal(false); tempConf.setEncryptdbinfo(false); if(showsql != null && showsql.equalsIgnoreCase("true")) tempConf.setShowsql(true); else{ tempConf.setShowsql(false); }# 控制map中的列名采用小写,默认为大写 temConf.setColumnLableUpperCase(dbConfig.isColumnLableUpperCase()); //启动数据源 SQLManager.startPool(tempConf);
复制代码


使用数据源 visualops 访问数据库示例代码:


List<Map> facedatas = SQLExecutor.queryListWithDBName(Map.class,"visualops",             "select * from facedata where billid = ?",             0);
复制代码


更多的持久层使用文档访问:


https://doc.bbossgroups.com/#/persistent/tutorial

3 Elasticsearch-db 数据同步使用方法

完整的示例工程:


https://github.com/bbossgroups/db-elasticsearch-tool


工程基于 gradle 管理,可以参考文档配置 gradle 环境:


https://esdoc.bbossgroups.com/#/bboss-build


案例清单


https://esdoc.bbossgroups.com/#/bboss-datasyn-demo?id=_31-elasticsearch%e5%af%bc%e5%85%a5database%e6%a1%88%e4%be%8b

4 Mongodb-Elasticsearch 数据同步使用方法

Mongodb-Elasticsearch 数据同步案例工程


https://github.com/bbossgroups/mongodb-elasticsearch


工程基于 gradle 管理,可以参考文档配置 gradle 环境:


https://esdoc.bbossgroups.com/#/bboss-build


mongodb-elasticseach 数据同步使用方法和 DB-Elasticsearch、Elasticsearch-DB 数据同步的使用方法类似,支持全量、增量定时同步功能, 内置 jdk timer 同步器,支持 quartz、xxl-job 任务调度引擎 ,这里就不具体举例说明,大家可以下载 demo 研究即可,mongodb-elasticseach 数据同步基本和 DB-Elasticsearch 同步的参数配置差不多,参考文档


https://esdoc.bbossgroups.com/#/mongodb-elasticsearch

5 Database-Database 数据同步使用方法

https://github.com/bbossgroups/db-elasticsearch-tool/blob/master/src/main/java/org/frameworkset/elasticsearch/imp/Db2DBdemo.java


spring boot db-db 同步案例:


https://github.com/bbossgroups/db-db-job

6 Kafka1x-Elasticsearch 数据同步使用方法(不推荐)

https://github.com/bbossgroups/kafka1x-elasticsearch


适用于 old kafka client 包,不推荐使用

7 Kafka2x-Elasticsearch 数据同步使用方法(推荐)

https://github.com/bbossgroups/kafka2x-elasticsearch


适用于新版本 kafka client,推荐使用

8 Elasticsearch-Elasticsearch 数据同步使用方法

https://github.com/bbossgroups/elasticsearch-elasticsearch

9 HBase-Elasticsearch 数据同步使用方法

https://github.com/bbossgroups/hbase-elasticsearch

10 数据同步调优

数据同步是一个非常耗资源(内存、cpu、io)的事情,所以如何充分利用系统资源,确保高效的数据同步作业长时间稳定运行,同时又不让同步服务器、Elasticsearch/数据库负荷过高,是一件很有挑战意义的事情,这里结合 bboss 的实践给出一些建议:

10.1 内存调优

内存溢出很大一个原因是 jvm 配置少了,这个处理非常简单,修改 jvm.option 文件,适当调大内存即可,设置作业运行需要的 jvm 内存,按照比例调整 Xmx 和 MaxNewSize 参数:


# Xms represents the initial size of total heap space# Xmx represents the maximum size of total heap space
-Xms1g-Xmx1g-XX:NewSize=512m-XX:MaxNewSize=512m# explicitly set the stack size-Xss1m
复制代码


Xms 和 Xmx 保持一样,NewSize 和 MaxNewSize 保持一样,Xmx 和 MaxNewSize 大小保持的比例可以为 3:1 或者 2:1


影响内存使用情况的其他关键参数:


  • 并发线程数(threadCount):每个线程都会把正在处理的数据放到内存中

  • 线程缓冲队列数(queue):工作线程全忙的情况下,后续的数据处理请求会放入

  • batchSize(批量写入记录数):决定了每批记录的大小,假如并发线程数和线程缓冲队列数全满,那么占用内存的换算方法:



  • jdbcFetchSize/fetchSize:从数据源按批拉取记录数,拉取过来的数据会临时放入本地内存中


这些参数设置得越大,占用的内存越大,处理的速度就越快,典型的空间换时间的场景,所以需要根据同步服务器的主机内存来进行合理配置,避免由于资源不足出现 jvm 内存溢出的问题,影响同步的稳定性。

10.2 采用分布式作业调度引擎

需要同步的数据量很大,单机的处理能力有限,可以基于分布式作业调度引擎来实现数据分布式分片数据同步处理,参考文档:


https://esdoc.bbossgroups.com/#/db-es-tool?id=_26-%e5%9f%ba%e4%ba%8exxjob-%e5%90%8c%e6%ad%a5db-elasticsearch%e6%95%b0%e6%8d%ae

11 数据同步模式控制

11.1 全量/增量导入

根据实际需求,有些场景需要全量导入数据,有些场景下需要增量导入数据,以 session 数据同步案例作业来讲解具体的控制方法


  • 增量同步时加上下面的代码


        importBuilder.setLastValueColumn("lastAccessedTime");//手动指定数字增量查询字段//setFromfirst(false),如果作业停了,作业重启后从上次停止的位置开始采集数据,//setFromfirst(true) 如果作业停了,作业重启后,重新开始位置开始采集数据        importBuilder.setFromFirst(false);
importBuilder.setLastValueStorePath("mongodb_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样 //设置增量查询的起始值lastvalue try { Date date = format.parse("2000-01-01"); importBuilder.setLastValue(date); } catch (Exception e){ e.printStackTrace(); }
复制代码


  • 全量同步时,去掉或者注释掉上面的代码


        /**        importBuilder.setLastValueColumn("lastAccessedTime");//手动指定数字增量查询字段//setFromfirst(false),如果作业停了,作业重启后从上次停止的位置开始采集数据,//setFromfirst(true) 如果作业停了,作业重启后,重新开始位置开始采集数据        importBuilder.setFromFirst(false);        importBuilder.setLastValueStorePath("mongodb_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样        //设置增量查询的起始值lastvalue        try {            Date date = format.parse("2000-01-01");            importBuilder.setLastValue(date);        }        catch (Exception e){            e.printStackTrace();        }*/
复制代码

11.2 一次性执行和周期定时执行

根据实际需求,有些场景作业启动后只需执行一次,有些场景需要周期性定时执行,以 session 数据同步案例作业来讲解具体的控制方法


  • 定时执行

  • 支持 jdk timer 和 quartz 以及 xxl-job 三种定时执行机制,以 jdk timer 为例,加上以下代码即可


        //定时任务配置,        importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明               //.setScheduleDate(date) //指定任务开始执行时间:日期                .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行                .setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次
复制代码


  • 一次性执行一次性执行只需要将上面的代码 setFixedRate 和 setPeriod 去掉即可


        /**           //定时任务配置,        importBuilder               //.setScheduleDate(date) //指定任务开始执行时间:日期                .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行                        */        
复制代码


然后执行完毕后调用 destroy 方法,例如:


/**         * 执行数据库表数据导入es操作         */        DataStream dataStream = importBuilder.builder();        dataStream.execute();        dataStream.destroy();//执行完毕后释放资源
复制代码

11.3 串行执行和并行执行

根据实际需求,有些场景作业采用串行模式执行,有些场景需要并行执行,以 session 数据同步案例作业来讲解具体的控制方法


  • 并行执行

  • 并行执行,加上以下代码即可


        importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行        importBuilder.setQueue(10);//设置批量导入线程池等待队列长度        importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量        importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行        importBuilder.setAsyn(false);//是否同步等待每批次任务执行完成后再返回调度程序,true 不等待所有导入作业任务结束,方法快速返回;false(默认值) 等待所有导入作业任务结束,所有作业结束后方法才返回
复制代码


  • 串行执行串行执行只需要将上面的代码注释即可


        /**           importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行        importBuilder.setQueue(10);//设置批量导入线程池等待队列长度        importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量        importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行        importBuilder.setAsyn(false);//是否同步等待每批次任务执行完成后再返回调度程序,true 不等待所有导入作业任务结束,方法快速返回;false(默认值) 等待所有导入作业任务结束,所有作业结束后方法才返回        */    
复制代码

11.4 任务执行开始时间和结束时间设置

一次性导入和周期性导入,都可以设置任务导出的开始时间、延时执行时间和任务结束时间(只对 jdk timer 有效)


指定任务开始时间或者延迟时间


    importBuilder.setScheduleDate(TimeUtil.addDateMinitues(new Date(),1)); //指定任务开始执行时间:日期,1分钟后开始//          .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行
复制代码


同时指定任务开始时间和结束时间


//定时任务配置,      importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明                .setScheduleDate(TimeUtil.addDateMinitues(new Date(),1)) //指定任务开始执行时间:日期,1分钟后开始//          .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行            .setScheduleEndDate(TimeUtil.addDateMinitues(new Date(),3))//3分钟后自动结束任务
.setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次
复制代码

12 数据导出到文件并上传 SFTP/FTP

支持将 elasticsearch 和关系数据库中的数据导出到文件并上传到 sftp 和 ftp 服务器,支持自定义数据记录格式,使用案例和参考文档:


https://esdoc.bbossgroups.com/#/elasticsearch-sftp

13 数据导出发送到 Kafka

支持将 elasticsearch 和关系数据库中的数据导出并发送到 kafka 服务器,支持自定义数据记录格式,使用案例


  1. elasticsearch 数据导出发送到 kafka 模块,使用案例:https://github.com/bbossgroups/kafka2x-elasticsearch/blob/master/src/main/java/org/frameworkset/elasticsearch/imp/ES2KafkaDemo.java

  2. 关系数据库数据导出发送到 kafka 模块,使用案例:https://github.com/bbossgroups/kafka2x-elasticsearch/blob/master/src/main/java/org/frameworkset/elasticsearch/imp/DB2KafkaDemo.java

14 日志文件数据采集插件使用案例

支持全量和增量采集两种模式,实时采集本地/ftp 日志文件数据到 kafka/elasticsearch/database/


日志文件采集插件使用文档:


https://esdoc.bbossgroups.com/#/filelog-guide


使用案例:


  1. 采集日志数据并写入数据库

  2. 采集日志数据并写入Elasticsearch

  3. 采集日志数据并发送到Kafka

15 作业调度控制

参考文档:作业调度控制

16 开发交流

完整的数据导入 demo 工程


github:db-elasticsearch-tool


elasticsearch 交流 QQ 群:21220580,166471282


elasticsearch 微信公众号:



用户头像

大河

关注

还未添加个人签名 2022.08.15 加入

还未添加个人简介

评论

发布
暂无评论
bboss数据同步ETL工具介绍_elasticsearch_大河_InfoQ写作社区