写点什么

mysql 实时同步到 es

作者:秃头小帅oi
  • 2025-02-20
    福建
  • 本文字数:2216 字

    阅读完需:约 7 分钟

测试了多个方案同步,最终选择 oceanu 产品,底层基于 Flink cdc1、实时性能够保证,binlog 量很大时也不产生延迟 2、配置 SQL 即可完成,操作上简单

下面示例 mysql 的 100 张分表实时同步到 es,优化备注等文本字段的 like 查询

创建 SQL 作业

CREATE TABLE from_mysql (  id int,  cid int NOT NULL,  gid bigint NOT NULL,  content varchar,  create_time TIMESTAMP(3)  ,  PRIMARY KEY (id) NOT ENFORCED) WITH (  'connector' = 'mysql-cdc',  'hostname' = 'mysql-ip',  'port' = '3306',  'username' = 'mysqluser',  'password' = 'mysqlpwd',  'database-name' = 'mysqldb',  'debezium.snapshot.locking.mode' = 'none',  'table-name' = 'tb_test[0-9]?[0-9]',  'server-id' = '100-110',  'server-time-zone' = 'Asia/Shanghai',  'debezium.skipped.operations' = 'd',  'debezium.snapshot.mode' = 'schema_only',  'debezium.min.row.count.to.stream.results' = '50000');
CREATE TABLE to_es ( id string, tableid int, tablename string, cid int NOT NULL, gid string NOT NULL, content string, create_time string, PRIMARY KEY (id,companyId) NOT ENFORCED) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://ip:9200', 'connector.index' = 'myindex', 'connector.document-type' = '_doc', 'connector.username' = 'elastic', 'connector.password' = 'password123', 'update-mode' = 'upsert', 'connector.key-delimiter' = '$', 'connector.key-null-literal' = 'n/a', 'connector.failure-handler' = 'retry-rejected', 'connector.flush-on-checkpoint' = 'true', 'connector.bulk-flush.max-actions' = '10000', 'connector.bulk-flush.max-size' = '2 mb', 'connector.bulk-flush.interval' = '2000', 'connector.connection-max-retry-timeout' = '300', 'format.type' = 'json');
INSERT INTO to_esSELECTconcat(CAST(id as string),'-',CAST(mod(cid,100) AS VARCHAR)) as id, id tableid,tablename,cid,gid,content,DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss') as create_timefrom from_mysql
复制代码

这里主要注意字段类型


scan.startup.mode:"initial"(默认,同步历史数据),"latest-offset" 同步增量数据


最后 insert 可以加 where,只同步需要的行数据

es 配置

配置好 mapping、setting 和自己的分词器

使用自字义分词是因为字段中所有涉及的标点符号、空格等都可以来检索

PUT myindex-20230314/{   "mappings": {    "properties": {      "id":{        "type": "text"      },      "tableid":{        "type": "long"      },      "cid":{        "type": "long"      },      "gid":{        "type": "text",    "analyzer": "my_analyzer"      },      "content":{        "type": "text",    "analyzer": "my_analyzer"      },            "create_time" : {          "type" : "keyword"        }    }  },  "settings": {    "index":{      "number_of_shards": "10",      "number_of_replicas": "1",      "refresh_interval" : "1s",      "translog": {        "sync_interval": "30s",        "durability": "async"      },      "codec": "best_compression",       "analysis": {      "analyzer": {        "my_analyzer": {          "tokenizer": "my_tokenizer",          "filter": [            "lowercase"          ]        }      },      "tokenizer": {        "my_tokenizer": {          "type": "ngram",          "min_gram": 1,          "max_gram": 2,          "token_chars": [            "letter",            "digit","whitespace","punctuation","symbol"          ]        }      }    }    }  }}
复制代码

使用别名,方便后续的维护

 POST /_aliases{    "actions": [        { "add":    { "index": "myindex-20230314", "alias": "myindex" }}    ]}
复制代码

之前测试的

  • canal 单进程延迟越来越大,单独配置历史数据同步

  • go-mysql-elasticsearch 经常报错重新同步

  • logstash 同步 100 张分表不知道怎么配置

  • oceanus 是收费的对于运维人员不足的情况,可以参考,有精力的可以考虑 flink。

前沿拓展

在科技飞速发展的当下,程序员若想在职场中脱颖而出,就必须不断发掘新技术,增强自身竞争力。新技术不仅能拓宽职业道路,还能提升解决问题的能力。

以低代码开发为例,它打破了传统开发模式的束缚,通过可视化界面和少量代码,就能快速搭建应用程序。这种高效的开发方式正被越来越多的企业所采用。而 JNPF 快速开发平台,拥有丰富的组件库和强大的功能,能帮助程序员轻松应对各种复杂的业务需求。

应用地址:https://www.jnpfsoft.com

这是一个基于 Flowable 引擎(支持 java、.NET),已支持 MySQL、SqlServer、Oracle、PostgreSQL、DM(达梦)、 KingbaseES(人大金仓)6 个数据库,支持私有化部署,前后端封装了上千个常用类,方便扩展,框架集成了表单、报表、图表、大屏等各种常用的 Demo 方便直接使用。

所以,程序员们不妨深入了解 JNPF 快速开发平台,掌握低代码开发技术,为自己的职业发展增添强大助力 。

用户头像

摸个鱼,顺便发点有用的东西 2023-06-19 加入

互联网某厂人(重生版)

评论

发布
暂无评论
mysql实时同步到es_秃头小帅oi_InfoQ写作社区