写点什么

一文讲清楚 FusionInsight MRS CDL 如何使用

发布于: 4 小时前

​​摘要:CDL 是一种简单、高效的数据实时集成服务,能够从各种 OLTP 数据库中抓取 Data Change 事件,然后推送至 Kafka 中,最后由 Sink Connector 消费 Topic 中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

 

本文分享自华为云社区《华为FusionInsight MRS CDL使用指南》,作者:晋红轻。

 

CDL 是一种简单、高效的数据实时集成服务,能够从各种 OLTP 数据库中抓取 Data Change 事件,然后推送至 Kafka 中,最后由 Sink Connector 消费 Topic 中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。


CDL 服务包含了两个重要的角色:CDLConnector 和 CDLService。CDLConnector 是具体执行数据抓取任务的实例,CDLService 是负责管理和创建任务的实例。


本此实践介绍以 mysql 作为数据源进行数据抓取。

前提条件


  • MRS 集群已安装 CDL 服务。

  • MySQL 数据库需要开启 mysql 的 bin log 功能(默认情况下是开启的)。

查看 MySQL 是否开启 bin log:

使用工具或者命令行连接 MySQL 数据库(本示例使用 navicat 工具连接),执行 show variables like 'log_%'命令查看。

例如在 navicat 工具选择"File> New Query"新建查询,输入如下 SQL 命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。

show variableslike 'log_%'



工具准备


现在 cdl 只能使用 rest api 的方式进行命令提交,所以需要提前安装工具进行调试。本文使用 VSCode 工具。



完成之后安装 rest client 插件:



完成之后创建一个 cdl.http 的文件进行编辑:



创建 CDL 任务


CDL 任务创建的流程图如下所示:



说明:需要先创建一个 MySQL link, 在创建一个 Kafka link, 然后再创建一个 CDL 同步任务并启动。


MySQL link 部分 rest 请求代码


@hostname = 172.16.9.113@port = 21495@host = {{hostname}}:{{port}}@bootstrap = "172.16.9.113:21007"@bootstrap_normal = "172.16.9.113:21005"@mysql_host = "172.16.2.118"@mysql_port = "3306"@mysql_database = "hudi"@mysql_user = "root"@mysql_password = "Huawei@123"
### get linksget https://{{host}}/api/v1/cdl/link
### mysql link validate
post https://{{host}}/api/v1/cdl/link?validate=truecontent-type: application/json
{"name": "MySQL_link", //link名,全局唯一,不能重复"description":"MySQL connection", //link描述"link-type":"mysql", //link的类型"enabled":"true","link-config-values": {"inputs": [ { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip { "name": "port", "value": {{mysql_port}} },//数据库监听的端口 { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名 { "name": "user", "value": {{mysql_user}} }, //用户 { "name": "password","value": {{mysql_password}} } ,//密码 { "name":"schema", "value": {{mysql_database}}}//同数据库名 ] }}
### mysql link create
post https://{{host}}/api/v1/cdl/linkcontent-type: application/json
{"name": "MySQL_link", //link名,全局唯一,不能重复"description":"MySQL connection", //link描述"link-type":"mysql", //link的类型"enabled":"true","link-config-values": {"inputs": [ { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip { "name": "port", "value": {{mysql_port}} },//数据库监听的端口 { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名 { "name": "user", "value": {{mysql_user}} }, //用户 { "name": "password","value": {{mysql_password}} } ,//密码 { "name":"schema", "value": {{mysql_database}}}//同数据库名 ] }}
### mysql link update
put https://{{host}}/api/v1/cdl/link/MySQL_linkcontent-type: application/json
{"name": "MySQL_link", //link名,全局唯一,不能重复"description":"MySQL connection", //link描述"link-type":"mysql", //link的类型"enabled":"true","link-config-values": {"inputs": [ { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip { "name": "port", "value": {{mysql_port}} },//数据库监听的端口 { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名 { "name": "user", "value": {{mysql_user}} }, //用户 { "name": "password","value": {{mysql_password}} } ,//密码 { "name":"schema", "value": {{mysql_database}}}//同数据库名 ] }}
复制代码


Kafka link 部分 rest 请求代码


### get linksget https://{{host}}/api/v1/cdl/link
### kafka link validate
post https://{{host}}/api/v1/cdl/link?validate=truecontent-type: application/json
{"name": "kafka_link","description":"test kafka link","link-type":"kafka","enabled":"true","link-config-values": {"inputs": [ { "name": "bootstrap.servers", "value": "172.16.9.113:21007" }, { "name": "sasl.kerberos.service.name", "value": "kafka" }, { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] }}
### kafka link create
post https://{{host}}/api/v1/cdl/linkcontent-type: application/json
{"name": "kafka_link","description":"test kafka link","link-type":"kafka","enabled":"true","link-config-values": {"inputs": [ { "name": "bootstrap.servers", "value": "172.16.9.113:21007" }, { "name": "sasl.kerberos.service.name", "value": "kafka" }, { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] }}
### kafka link update
put https://{{host}}/api/v1/cdl/link/kafka_linkcontent-type: application/json
{"name": "kafka_link","description":"test kafka link","link-type":"kafka","enabled":"true","link-config-values": {"inputs": [ { "name": "bootstrap.servers", "value": "172.16.9.113:21007" }, { "name": "sasl.kerberos.service.name", "value": "kafka" }, { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] }}
复制代码


CDL 任务命令部分 rest 请求代码


@hostname = 172.16.9.113@port = 21495@host = {{hostname}}:{{port}}@bootstrap = "172.16.9.113:21007"@bootstrap_normal = "172.16.9.113:21005"@mysql_host = "172.16.2.118"@mysql_port = "3306"@mysql_database = "hudi"@mysql_user = "root"@mysql_password = "Huawei@123"
### create jobpost https://{{host}}/api/v1/cdl/jobcontent-type: application/json
{ "job_type": "CDL_JOB", //job类型,目前只支持CDL_JOB这一种 "name": "mysql_to_kafka", //job名称 "description":"mysql_to_kafka", //job描述 "from-link-name": "MySQL_link", //数据源Link "to-link-name": "kafka_link", //目标源Link "from-config-values": { "inputs": [ {"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"}, {"name" : "schema", "value" : "hudi"}, {"name" : "db.name.alias", "value" : "hudi"}, {"name" : "whitelist", "value" : "hudisource"}, {"name" : "tables", "value" : "hudisource"}, {"name" : "tasks.max", "value" : "10"}, {"name" : "mode", "value" : "insert,update,delete"}, {"name" : "parse.dml.data", "value" : "true"}, {"name" : "schema.auto.creation", "value" : "false"}, {"name" : "errors.tolerance", "value" : "all"}, {"name" : "multiple.topic.partitions.enable", "value" : "false"}, {"name" : "topic.table.mapping", "value" : "[ {\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"} ]" }, {"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT {"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] }, "to-config-values": {"inputs": []}, "job-config-values": { "inputs": [ {"name" : "global.topic", "value" : "demo"} ] }}
### get all jobget https://{{host}}/api/v1/cdl/job### submit jobput https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start### get job statusget https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka### stop jobput https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop### delete jobDELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka
复制代码


场景验证


生产库 MySQL 原始数据如下:



提交 CDL 任务之后



增加操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);

对应 kafka 消息体:



更改操作: UPDATE hudi.hudisource SET uname=‘AnneMarie333’ WHERE uid=11;

对应 kafka 消息体:



删除操作:delete from hudi.hudisource where uid=11;

对应 kafka 消息体:



点击关注,第一时间了解华为云新鲜技术~

发布于: 4 小时前阅读数: 4
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
一文讲清楚FusionInsight MRS CDL如何使用