写点什么

kafka 监听 mysql 实时数据变更

发布于: 2020 年 07 月 07 日

在做报表数据统计时,我们用的是 mysql + kafka + Spark Streaming 方案, kafka 监听 mysql 订单表中订单状态,然后发送到 spark streaming 中进行分析统计。 这里记录一下 kafka 监听 mysql 中数据变更方案

1、Kafka connect

  • 1.简介 kafka connect 是一个可扩展的、可靠的在 kafka 和其他系统之间流传输的数据工具。简而言之就是他可以通过 Connector(连接器)简单、快速的将大集合数据导入和导出 kafka。

  • 可以接收整个数据库或收集来自所有的应用程序的消息到 kafka 的 topic 中 Kafka connect 是 Confluent 公司(当时开发出 Apache Kafka 的核心团队成员出来创立的新公司)开发的 confluent platform 的核心功能.

  • 大家都知道现在数据的 ETL 过程经常会选择 kafka 作为消息中间件应用在离线和实时的使用场景中,而 kafka 的数据上游和下游一直没有一个无缝衔接的 pipeline 来实现统一,比如会选择 flume 或者 logstash 采集数据到 kafka,然后 kafka 又通过其他方式 pull 或者 push 数据到目标存储.

  •   而 kafka connect 旨在围绕 kafka 构建一个可伸缩的,可靠的数据流通道,通过 kafka connect 可以快速实现大量数据进出 kafka 从而和其他源数据源或者目标数据源进行交互构造一个低延迟的数据 pipeline。


具体官网文档 www.confluent.io/. https://docs.confluent.io/2.0.0/connect/connect-jdbc/docs/index.html#examples

2.安装 kafka 安装 下载 kafka-connect-jdbc

下载成功后,从 confluentinc-kafka-connect-jdbc-4.1.2.zip libs 中获取到 kafka-connect-jdbc-4.1.2.jar,并把其放到 kafka 安装目录下 libs 文件夹中 下载 mysql-connector-java-5.1.47.jar,并把其放到 kafka 安装目录下 libs 文件夹中


  • 3.使用 

  • 1.启动 kafka sh kafkaStart.sh 

  • 2.创建 kafka topic ./bin/kafka-run-class.sh kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-comments 

  • 3.新建 source/sink 配置文件,并放置在 kafka config 目录下 vim quickstart-mysql.properties name=mysql-b-source-comments connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://127.0.0.1:3306/android_service?user=xxx&password=xxxx # timestamp+incrementing 时间戳自增混合模式 mode=timestamp+incrementing # 时间戳 commenttime timestamp.column.name=commenttime # 自增字段 id incrementing.column.name=id # 白名单表 comments table.whitelist=comments # topic 前缀 mysql-kafka- topic.prefix=mysql-kafka- vim quickstart-mysql-sink.properties name=mysql-b-sink-comments 

  • connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 #kafka 的 topic 名称 topics=mysql-kafka-comments # 配置 JDBC 链接 connection.url=jdbc:mysql://127.0.0.1:3306/android_service?user=xxx&password=xxxx # 不自动创建表,如果为 true,会自动创建表,表名为 topic 名称 auto.create=false # upsert model 更新和插入 insert.mode=upsert # 下面两个参数配置了以 pid 为主键更新 pk.mode = record_value pk.fields = id #表名为 kafkatable table.name.format=kafkacomments


 4.启动 kafka connect ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/quickstart-mysql.properties ./config/quickstart-mysql-sink.properties 启动过程中有报 8083 端口已经被占用,

在 config 目录下,修改 connect-standalone 文件,在最后一样添加,用于修改监听 REST API 的默认端口 #用于监听 REST API 的端口 rest.port=8003 插入数据

在 comments 表中插入数据后,可以看到在 kafkacomments 表中也同步插入了数据 image.png image.png 在 comments 表中更新数据后,可以看到在 kafkacomments 表中也同步更新了数据 image.png image.png 

6.查看 kafka topic 中数据 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql-kafka-comments --from-beginning image.png 

7.自定义开发 connect https://github.com/confluentinc/kafka-connect-jdbc 如果有需求要自定义开发 connect 的话,可以直接在这个源码中开发,然后打成 jar 包。开发一个连接器只需要实现两个接口,Connector 和 Task 


2、canal

canal 是阿里开源的中间件,纯 Java 开发,基于数据库增量日志解析,提供增量数据订阅 &消费,目前主要支持了 MySQL(也支持 mariaDB),主要用于同步 mysql 数据库变更,是一个非常成熟的数据库同步方案。 

canal 是通过模拟成为 mysql 的 slave 的方式,监听 mysql 的 binlog 日志来获取数据,binlog 设置为 row 模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据,基于这个特性,canal 就能高性能的获取到 mysql 数据数据的变更。

canal 的部署主要分为 server 端和 client 端。 server 端部署好以后,可以直接监听 mysql binlog,因为 server 端是把自己模拟成了 mysql slave,所以,只能接受数据,没有进行任何逻辑的处理,具体的逻辑处理,需要 client 端进行处理。 client 端一般是需要大家进行简单的开发。https://github.com/alibaba/canal/wiki/ClientAPI 有一个简单的示例,很容易理解。

canal 1.1.1 版本之后, 默认支持将 canal server 接收到的 binlog 数据直接投递到 MQ(kafka,RocketMQ) https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart


以上就是我总结的"kafka 监听 mysql 实时数据变更"的技术点,希望以上的内容可以帮助到正在默默艰辛

遇到瓶疾且不知道怎么办的 Java 程序员们,需要学习,获取 zl 的也可以微我哦

希望可以帮助在这个行业发展的朋友和童鞋们,在论坛博客等地方少花些时间找资料,把有限的时间,真正花在学习上。

如若知识点总结的有不足的地方,欢迎各位朋友指正,觉得有收获的朋友也可以点赞关注一下。



用户头像

还未添加个人签名 2020.07.02 加入

还未添加个人简介

评论

发布
暂无评论
kafka监听mysql实时数据变更