写点什么

1K 字详解 canal-1.1.5 实时同步 MySQL 数据到 Elasticsearch

  • 2022 年 8 月 03 日
  • 本文字数:1491 字

    阅读完需:约 5 分钟

1K字详解canal-1.1.5实时同步MySQL数据到Elasticsearch

一、环境准备


1、jdk 8+


2、mysql 5.7+


3、Elasticsearch 7+


4、kibana 7+


5、canal.adapter 1.1.5


二、部署


一、创建数据库 CanalDb 和表 UserInfo


SET NAMES utf8mb4;SET FOREIGN_KEY_CHECKS = 0;-- ------------------------------ Table structure for UserInfo-- ----------------------------DROP TABLE IF EXISTS `UserInfo`;CREATE TABLE `UserInfo`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,  `phone` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,  `age` int(11) DEFAULT NULL,  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;
复制代码


二、kibana 创建索引


PUT canal_product {  "mappings": {    "properties": {      "user_name": {        "type": "text"      },      "phone": {        "type": "text"      },      "age": {        "type": "integer"      }    }  }}
复制代码



三、下载安装 canal.adapter


github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5


额外需要下载 v1.1.5-alpha-2 快照版本的 canal.adapter-1.1.5.tar.gz(release1.1.5 版本的 jar 包有 bug)


分别解压缩后,将 v1.1.5-alpha-2 解压缩文件夹下 plugin 文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉 release 版本的 plugin 文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,并重命名,再将该 jar 赋予权限 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar



1、解压并修改配置文件 conf/application.yml


只需要修改特定的几处即可,关于各节点说明可参考官方说明:https://help.aliyun.com/document_detail/135297.html

srcDataSources:    defaultDS:      url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false      username: canal      password: canal
复制代码


- name: es7        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode         properties:          mode: rest #transport  or rest           security.auth: es:22222 #  only used for rest mode          cluster.name: elasticsearch  # es集群节点名称
复制代码


2、启动服务

启动服务./bin/startup.sh
复制代码

3、查看日志是否启动成功


cat logs/adapter/adapter.log
复制代码

如图所示


4、实时同步


向数据库中插入一条数据


INSERT INTO `CanalDb`.`UserInfo`( `user_name`, `phone`, `age`) VALUES ('张三', '10086', 99);
复制代码


查看日志


kibana 查看索引数据


GET canal_product/_search
复制代码



5、全量同步,修改 conf/es7/mytest_user.yml 配置文件,或者新建一个 yml 文件也可


dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值destination: example  # canal的instance或者MQ的topicgroupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据esMapping:  _index: canal_product # es 的索引名称  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配  sql: "SELECT         p.id as _id,         p.user_name,         p.phone,         p.age        FROM         UserInfo p "        # sql映射  etlCondition: "where p.id>={}"   #etl的条件参数  commitBatch: 3000   # 提交批大小
复制代码



curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml
复制代码




用户头像

不定期更新Java开发工具及Java面试干货技巧 2021.12.12 加入

Java后端工程师,十年大厂经验。具有扎实的Java、JEE基础知识。熟悉Spring、SpringMVC、Struts MyBatisHibernate等JEE常用框架。

评论

发布
暂无评论
1K字详解canal-1.1.5实时同步MySQL数据到Elasticsearch_MySQL_了不起的程序猿_InfoQ写作社区