1. 插件简介
Mysql binlog 插件通过配置对应的 mysql master ip 和端口、数据库账号和口令、监听的数据库表以及 binlog 文件路径等信息,实时采集 mysql 增删改数据,支持以下三种数据采集模式:
模式 1 直接读取 binlog 文件,采集文件中的增删改数据
模式 2 监听 mysql master slave ip 和端口,作业重启后从 binlog 最新位置采集增删改数据
模式 3 监听 mysql master slave ip 和端口,启用故障容灾配置,作业重启后从上次采集结束的位置开始采集数据
模式 1 适用一次性离线数据采集场景,模式 2 和模式 3 适用于实时采集场景。
2. 使用配置
2.1 模式 1 案例
模式 1 直接读取 binlog 文件,采集文件中的增删改数据(适用一次性离线数据采集场景)
ImportBuilder importBuilder = new ImportBuilder();
importBuilder.setBatchSize(1000);//设置批量入Elasticsearch库的记录数
//binlog插件配置开始
MySQLBinlogConfig mySQLBinlogConfig = new MySQLBinlogConfig();
mySQLBinlogConfig.setHost("192.168.137.1");
mySQLBinlogConfig.setPort(3306);
mySQLBinlogConfig.setDbUser("root");
mySQLBinlogConfig.setDbPassword("123456");
//如果直接监听文件则设置binlog文件路径,否则不需要配置文件路径
mySQLBinlogConfig.setFileNames("F:\\6_environment\\mysql\\binlog.000107,F:\\6_environment\\mysql\\binlog.000127");
mySQLBinlogConfig.setTables("cityperson");//监控增量表名称
mySQLBinlogConfig.setDatabase("bboss");//监控增量表名称
//binlog插件配置结束
importBuilder.setInputConfig(mySQLBinlogConfig);
复制代码
2.2 模式 2 案例
模式 2 监听 mysql master slave ip 和端口,作业重启从 binlog 最新位置采集删改数据
//binlog插件配置开始
MySQLBinlogConfig mySQLBinlogConfig = new MySQLBinlogConfig();
mySQLBinlogConfig.setHost("192.168.137.1");
mySQLBinlogConfig.setPort(3306);
mySQLBinlogConfig.setDbUser("root");
mySQLBinlogConfig.setDbPassword("123456");
mySQLBinlogConfig.setTables("cityperson");//监控增量表名称
mySQLBinlogConfig.setDatabase("bboss");//监控增量表名称
mySQLBinlogConfig.setServerId(65536L);//模拟slave节点ID
//binlog插件配置结束
importBuilder.setInputConfig(mySQLBinlogConfig);
复制代码
2.3 模式 3 案例
监听 mysql master slave ip 和端口,启用故障容灾配置,每次重启作业从上次采集结束的位置开始采集数据
MySQLBinlogConfig mySQLBinlogConfig = new MySQLBinlogConfig();
mySQLBinlogConfig.setHost("192.168.137.1");
mySQLBinlogConfig.setPort(3306);
mySQLBinlogConfig.setDbUser("root");
mySQLBinlogConfig.setDbPassword("123456");
mySQLBinlogConfig.setServerId(100000L);
mySQLBinlogConfig.setTables("cityperson,batchtest");//
mySQLBinlogConfig.setDatabase("bboss");
mySQLBinlogConfig.setEnableIncrement(true);//启用模式3
importBuilder.setInputConfig(mySQLBinlogConfig);
importBuilder.setPrintTaskLog(true);
//启用模式3 故障容灾机制配置
// importBuilder.setStatusDbname("testStatus");//指定增量状态数据源名称
importBuilder.setLastValueStorePath("binlog2db_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样
importBuilder.setLastValueStoreTableName("binlog");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab
复制代码
更多实际配置案例,可以访问原文链接地址获取:
https://esdoc.bbossgroups.com/#/mysql-binlog
3. 视频教程
插件视频教程
https://www.bilibili.com/video/BV1ko4y1M7My/
评论