写点什么

flink-cdc 之 mysql 到 es

  • 2025-08-28
    北京
  • 本文字数:5452 字

    阅读完需:约 18 分钟

本文分享自天翼云开发者社区《flink-cdc之mysql到es》,作者:刘****猛

环境搭建

version: '2'services:  elasticsearch:    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1    ports:      - "9200:9200"      - "9300:9300"    environment:      discovery.type: single-node  kibana:    image: docker.elastic.co/kibana/kibana:7.6.1    ports:      - "5601:5601"    environment:      ELASTICSEARCH_URL: ://elasticsearch:9200
复制代码

es 加密开启,配置文件映射到宿主机

docker cp 39:/usr/share/elasticsearch/config /root/docker-build/es/config
docker cp 7b:/usr/share/kibana/config /root/docker-build/es/kibana/config
复制代码

需要在配置文件中开启 x-pack 验证, 修改 config 目录下面的 elasticsearch.yml 文件,在里面添加如下内容,

xpack.security.enabled: truexpack.license.self_generated.type: basicxpack.security.transport.ssl.enabled: true
复制代码

重启 es 

再次进入容器

修改 kibana 的配置文件 kibana.yml

server.name: kibanaserver.host: "0"elasticsearch.hosts: [ "://elasticsearch:9200" ]xpack.monitoring.ui.container.elasticsearch.enabled: trueelasticsearch.username: "elastic"  # es账号elasticsearch.password: "*******"   # es密码
复制代码

 通过 kibana 的 develop 界面执行相关指令

创建索引

PUT order_index{    "settings":{        "index":{            "number_of_shards":1,            "number_of_replicas":0        }    }}
复制代码

创建 mapping

PUT order_index/_mapping{    "properties":{        "order_id":{            "type":"long"        },        "goods_name":{            "type":"text"        },        "goods_count":{            "type":"long"        },        "goods_price":{            "type":"text"        },        "order_money":{            "type":"text"        }    }}
复制代码

查看索引详情

GET order_index  返回值

{  "order_index" : {    "aliases" : { },    "mappings" : {      "properties" : {        "goods_count" : {          "type" : "long"        },        "goods_name" : {          "type" : "text"        },        "goods_price" : {          "type" : "text"        },        "order_id" : {          "type" : "long"        },        "order_money" : {          "type" : "text"        }      }    },    "settings" : {      "index" : {        "creation_date" : "1685094234700",        "number_of_shards" : "1",        "number_of_replicas" : "0",        "uuid" : "YLwsxO1pS6qWolb2N7cG5w",        "version" : {          "created" : "7060199"        },        "provided_name" : "order_index"      }    }  }}
复制代码

 

mysql 创建数据表及数据

CREATE TABLE `my_order` (  `order_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单id',  `order_money` decimal(8,2) NOT NULL COMMENT '订单金额',  `user_id` int(8) NOT NULL COMMENT '用户id',  `sub_province` varchar(20) NOT NULL COMMENT '下单时 省',  `sub_city` varchar(20)  NOT NULL COMMENT '下单时 市',  `sub_district` varchar(20) NOT NULL COMMENT '下单时 区',  `payment_status` int(1) NOT NULL DEFAULT '0' COMMENT '付款状态 0正常 1作废',  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单表';
CREATE TABLE `my_order_goods` ( `order_goods_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单商品id', `order_id` int(8) NOT NULL COMMENT '订单id', `goods_id` int(8) NOT NULL COMMENT '商品id', `sub_goods_name` varchar(50) NOT NULL COMMENT '下单时商品名称', `sub_goods_price` decimal(8,2) NOT NULL COMMENT '下单时商品价格', `goods_count` int(11) NOT NULL COMMENT '下单了多少件', PRIMARY KEY (`order_goods_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单下单商品表';
CREATE TABLE `my_goods` ( `goods_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '商品id', `goods_price` decimal(8,2) NOT NULL COMMENT '商品价格', `goods_name` varchar(50) NOT NULL COMMENT '商品名称', `goods_details` varchar(255) DEFAULT NULL COMMENT '商品详情', PRIMARY KEY (`goods_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟商品表';
复制代码

写入样本数据

-- 初始化订单数据INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (1, 19.80, 1, '北京', '北京市', '西城区', 0, '2021-06-10 11:02:29');INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (2, 9.90, 1, '北京', '北京市', '丰台区', 0, '2021-06-10 11:02:59');INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (3, 300.00, 1, '北京', '北京市', '朝阳区', 0, '2021-06-10 11:03:16');INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (4, 66.60, 1, '北京', '北京市', '顺义区', 0, '2021-06-10 11:03:32');
-- 初始化商品数据INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (1, 9.90, '两次性保温杯-改名称了~', '我是一只保温杯~');INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (2, 100.00, '欧莱雅男士洗面奶', '只买贵的,不买对的~');INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (3, 66.60, 'ipone13双面曲折屏', '是苹果,不是吃的那种...');
-- 初始化订单商品数据(暂时不考虑一对多)INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (1, 1, 1, '一次性保温杯', 9.90, 2);INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (2, 2, 1, '一次性保温杯', 9.90, 1);INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (3, 3, 2, '欧莱雅洗面奶', 100.00, 3);INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (4, 4, 3, '吃的苹果', 66.60, 1);
复制代码

 

flinksql

CREATE TABLE my_order (  order_id INT primary key not enforced,  order_money DECIMAL(8, 2)) WITH ( 'connector' = 'mysql-cdc',  'hostname' = '101.43.164.4',  'port' = '3306',  'database-name' = 'cdc-source',  'table-name' = 'my_order',  'username' = 'root',  'password' = '******',  'jdbc.properties.useSSL' = 'false'  );CREATE TABLE my_goods (  goods_id INT primary key not enforced,  goods_name STRING,  goods_price DECIMAL(8, 2)) WITH ( 'connector' = 'mysql-cdc',  'hostname' = '101.43.164.4',  'port' = '3306',  'database-name' = 'cdc-source',  'table-name' = 'my_goods',  'username' = 'root',  'password' = '******',  'jdbc.properties.useSSL' = 'false');CREATE TABLE my_order_goods (  order_id INT primary key not enforced,  goods_id INT,  goods_count INT) WITH (   'connector' = 'mysql-cdc',  'hostname' = '101.43.164.4',  'port' = '3306',  'database-name' = 'cdc-source',  'table-name' = 'my_order_goods',  'username' = 'root',  'password' = '******',  'jdbc.properties.useSSL' = 'false');CREATE TABLE order_index(  order_id INT,  goods_name STRING,  goods_count INT,  goods_price DECIMAL(8, 2),  order_money DECIMAL(8, 2),  PRIMARY KEY (order_id) NOT ENFORCED) WITH (    'connector' = 'elasticsearch-7',    'hosts' = '101.43.164.4:9200',    'index' = 'order_index',    'username' = 'elastic',    'password' = '******');
insert into order_indexselect mo.order_id, mg.goods_name, mog.goods_count, mg.goods_price, mo.order_moneyfrom my_order mo left join my_order_goods mog on mo.order_id = mog.order_idleft join my_goods mg on mog.goods_id = mg.goods_id;
复制代码

kibana 中查询 es 数据

POST order_index/_search{  "size": 20,  "query": {"match_all": {     }}}
复制代码

 

{  "took" : 0,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 4,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "3",        "_score" : 1.0,        "_source" : {          "order_id" : 3,          "goods_name" : "欧莱雅男士洗面奶",          "goods_count" : 3,          "goods_price" : 100.0,          "order_money" : 300.0        }      },      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "4",        "_score" : 1.0,        "_source" : {          "order_id" : 4,          "goods_name" : "ipone13双面曲折屏~",          "goods_count" : 1,          "goods_price" : 66.6,          "order_money" : 66.6        }      },      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "1",        "_score" : 1.0,        "_source" : {          "order_id" : 1,          "goods_name" : "两次性保温杯-3342",          "goods_count" : 2,          "goods_price" : 9.9,          "order_money" : 19.8        }      },      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "2",        "_score" : 1.0,        "_source" : {          "order_id" : 2,          "goods_name" : "两次性保温杯-3342",          "goods_count" : 1,          "goods_price" : 9.9,          "order_money" : 9.9        }      }    ]  }}
复制代码

此时修改 mysql 商品表

UPDATE `cdc-source`.`my_goods` SET `goods_name` = '两次性保温杯-我又改名了' WHERE `goods_id` = 1
复制代码

 此时查看 es 中的数据

{  "took" : 363,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 4,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "4",        "_score" : 1.0,        "_source" : {          "order_id" : 4,          "goods_name" : "ipone13双面曲折屏",          "goods_count" : 1,          "goods_price" : 66.6,          "order_money" : 66.6        }      },      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "3",        "_score" : 1.0,        "_source" : {          "order_id" : 3,          "goods_name" : "欧莱雅男士洗面奶",          "goods_count" : 3,          "goods_price" : 100.0,          "order_money" : 300.0        }      },      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "2",        "_score" : 1.0,        "_source" : {          "order_id" : 2,          "goods_name" : "两次性保温杯-我又改名了",          "goods_count" : 1,          "goods_price" : 9.9,          "order_money" : 9.9        }      },      {        "_index" : "order_index",        "_type" : "_doc",        "_id" : "1",        "_score" : 1.0,        "_source" : {          "order_id" : 1,          "goods_name" : "两次性保温杯-我又改名了",          "goods_count" : 2,          "goods_price" : 9.9,          "order_money" : 19.8        }      }    ]  }}
复制代码


用户头像

还未添加个人签名 2022-02-22 加入

天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。

评论

发布
暂无评论
flink-cdc之mysql到es_数据库_天翼云开发者社区_InfoQ写作社区