从 Mysql 同步数据到 ES 有多种方案,这次我们使用 ELK 技术栈中的 Logstash 来将数据从 Mysql 同步到 Easysearch 。
方案前提
- Mysql 表记录必须有主键,比如 id 字段。通过该字段,可将 Easysearch 索引数据与 Mysql 表数据形成一对一映射关系,支持修改。 
- Mysql 表记录必须有时间字段,以支持增量同步。 
如果上述条件具备,便可使用 logstash 定期同步新写入或修改后的数据到 Easysearch 中。
方案演示
版本信息
Mysql: 5.7
Logstash: 7.10.2
Easysearch: 1.5.0
MySQL 设置
创建演示用的表。
 CREATE DATABASE es_db;USE es_db;DROP TABLE IF EXISTS es_table;CREATE TABLE es_table (id BIGINT(20) UNSIGNED NOT NULL,PRIMARY KEY (id),UNIQUE KEY unique_id (id),client_name VARCHAR(32) NOT NULL,modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP);
   复制代码
 
说明
- id 字段: 主键、唯一键,将作为 Easysearch 索引中的 doc id 字段。 
- modification_time 字段: 表记录的插入和修改都会记录在此。 
- client_name: 代表用户数据。 
- insertion_time: 可省略,用来记录数据插入到 Mysql 数据的时间。 
插入数据
 INSERT INTO es_table (id, client_name) VALUES (1, 'test 1');INSERT INTO es_table (id, client_name) VALUES (2, 'test 2');INSERT INTO es_table (id, client_name) VALUES (3, 'test 3');
   复制代码
 Logstash
 input {  jdbc {    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"    jdbc_driver_class => "com.mysql.jdbc.Driver"    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"    jdbc_user => "root"    jdbc_password => "password"    jdbc_paging_enabled => true    tracking_column => "unix_ts_in_secs"    use_column_value => true    tracking_column_type => "numeric"    last_run_metadata_path => "./.mysql-es_table-sql_last_value.yml"    schedule => "*/5 * * * * *"    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"  }  jdbc {    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"    jdbc_driver_class => "com.mysql.jdbc.Driver"    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"    jdbc_user => "root"    jdbc_password => "password"    schedule => "*/5 * * * * *"    statement => "SELECT count(*) AS count,'es_table' AS table_name  from es_table"  }
}filter {if ![table_name] {        mutate {           copy => { "id" => "[@metadata][_id]"}           remove_field => ["@version", "unix_ts_in_secs","@timestamp"]           add_field => { "[@metadata][target_index]" => "mysql_es_table" } }      } else {        mutate {           add_field => { "[@metadata][target_index]" => "table_counts" }           remove_field => ["@version"]               }        uuid {           target  => "[@metadata][_id]"           overwrite => true             }      }}output {#  stdout { codec =>  rubydebug { metadata => true } }  elasticsearch {      hosts => ["https://localhost:9200"]      user => "admin"      password => "f0c6fc61fe5f7b084c00"      ssl_certificate_verification => "false"      index => "%{[@metadata][target_index]}"      manage_template => "false"      document_id => "%{[@metadata][_id]}"  }}
   复制代码
 
启动 logstash
 ./bin/logstash -f sync_es_table.conf
   复制代码
 
查看同步结果, 3 条数据都已同步到索引。
Mysql 数据库新增记录
 INSERT INTO es_table (id, client_name) VALUES (4, 'test 4');
   复制代码
 
Easysearch 确认新增
Mysql 数据库修改记录
 UPDATE es_table SET client_name = 'test 0001' WHERE id=1;
   复制代码
 
Easysearch 确认修改
删除数据
Logstash 无法直接删除操作到 ES ,有两个方案:
- 在表中增加 is_deleted 字段,实现软删除,可达到同步的目的。查询过滤掉 is_deleted : true 的记录,后续通过脚本等方式定期清理 is_deleted : true 的数据。 
- 执行删除操作的程序,删除完 Mysql 中的记录后,继续删除 Easysearch 中的记录。 
同步监控
数据已经在 ES 中了,我们可利用 INFINI Console 的数据看板来监控数据是否同步,展示表记录数、索引记录数及其变化。
评论