前言
我们在做实时数仓时数据往往都是保存到数据库中例如 MySQL,当有一条数据新增或修改需要马上将数据同步到 kafka 中或其他的数据库中,这时候我们需要借助阿里开源出来的Canal
,来实现我们功能。
一、什么是 Canal
我们看下官网的描述:
>canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL
数据库增量日志解析,提供增量数据订阅和消费
根据官网的描述我们大约可以理解为Canal
主要是基于MySQL
做增量数据同步的例如:将数据实时同步到 kafka、HBase、ES 等,可以理解一个数据同步工具
。
二、Canal 能干什么
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
注意: 当前 Canal 支持的 MySQL 版本有 5.1.x
, 5.5.x
, 5.6.x
, 5.7.x
, 8.0.x
三、Canal 工作原理
MySQL slave 工作原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
四、部署 Canal
4.1 安装 MySQL
我之前发过如何部署 MySQL 我在这就不在写一遍了,如果你的机器中没有安装 MySQL 那可以去看这篇—> https://blog.csdn.net/qq_43791724/article/details/108196454
开启 MySQL 的 binary log 日志
当我们在安装成功 MySQL 成功后会有一个my.cnf
文件需要添加一下内容
[mysqld]
log-bin=/var/lib/mysql/mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
复制代码
注意: 当我们在开启了binary log
日志模式后会在我们log-bin
目录下创建 mysql-bin.*
的文件。当我们数据库中的数据发生改变时就会mysql-bin.*
文件中生成记录。
4.2 安装 Canal
去官下载需要的版本 https://github.com/alibaba/canal/releases
我在这里使用的版本为:1.0.24
将下载好的 gz 包上传到指定的目录下
创建个文件夹
解压 gz 包
tar -zxvf canal.deployer-1.0.24.tar.gz -C ../servers/canal/
复制代码
配置 canal.properties
common 属性前四个配置项:
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=
复制代码
canal.id 是 canal 的编号,在集群环境下,不同 canal 的 id 不同,注意它和mysql的server_id不同
。ip 这里不指定,默认为本机,比如上面是 192.168.100.201,端口号是 11111。zk 用于 canal cluster。
再看下canal.properties
下 destinations 相关的配置:
#################################################
######### destinations #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
复制代码
这里的canal.destinations = example
可以设置多个,比如 example1,example2,
则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties
文件。
全局的 canal 实例管理用 spring,这里的file-instance.xml
最终会实例化所有的 destinations instances:
全局的 canal 实例管理用 spring,这里的file-instance.xml
最终会实例化所有的 destinations instances:
<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
<property name="ignoreResourceNotFound" value="true" />
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
<property name="locationNames">
<list>
<value>classpath:canal.properties</value> <value>classpath:${canal.instance.destination:}/instance.properties</value>
</list>
</property>
</bean>
<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="propertyEditorRegistrars">
<list>
<ref bean="socketAddressEditor" />
</list>
</property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
<property name="destination" value="${canal.instance.destination}" />
<property name="eventParser">
<ref local="eventParser" />
</property>
<property name="eventSink">
<ref local="eventSink" />
</property>
<property name="eventStore">
<ref local="eventStore" />
</property>
<property name="metaManager">
<ref local="metaManager" />
</property>
<property name="alarmHandler">
<ref local="alarmHandler" />
</property>
</bean>
复制代码
比如canal.instance.destination
等于 example,就会加载example/instance.properties
配置文件
修改 instance 配置文件
## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
canal.instance.mysql.slaveId = 1234
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.120:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
#################################################
复制代码
启动
关闭
通过 jps 查询服务转态
[root@node01 ~]# jps
2133 CanalLauncher
4184 Jps
复制代码
到这里说明我们的服务就配好了,这时候我们可以使用 java 代码创建一个客户端来进行测试
五、通过 Java 编写 Canal 客户端
5.1 导入依赖
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
复制代码
5.2 编写测试类
package com.canal.Test;
/**
* @author 大数据老哥
* @version V1.0
* @Package com.canal.Test
* @File :CanalTest.java
* @date 2021/1/11 21:54
*/
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
* 测试canal配置是否成功
*/
public class CanalTest {
public static void main(String[] args) {
//1.创建连接
CanalConnector connect = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.100.201", 11111),
"example", "", ""); //指定一次性读取的条数
int bachChSize = 1000;
// 设置转态
boolean running = true;
while (running) {
//2.建立连接
connect.connect();
//回滚上次请求的信息放置防止数据丢失
connect.rollback();
// 订阅匹配日志
connect.subscribe();
while (running) {
Message message = connect.getWithoutAck(bachChSize);
// 获取batchId
long batchId = message.getId();
// 获取binlog数据的条数
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
} else {
printSummary(message);
}
// 确认指定的batchId已经消费成功
connect.ack(batchId);
}
}
}
private static void printSummary(Message message) {
// 遍历整个batch中的每个binlog实体
for (CanalEntry.Entry entry : message.getEntries()) {
// 事务开始
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// 获取binlog文件名
String logfileName = entry.getHeader().getLogfileName();
// 获取logfile的偏移量
long logfileOffset = entry.getHeader().getLogfileOffset();
// 获取sql语句执行时间戳
long executeTime = entry.getHeader().getExecuteTime();
// 获取数据库名
String schemaName = entry.getHeader().getSchemaName();
// 获取表名
String tableName = entry.getHeader().getTableName();
// 获取事件类型 insert/update/delete
String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();
System.out.println("logfileName" + ":" + logfileName);
System.out.println("logfileOffset" + ":" + logfileOffset);
System.out.println("executeTime" + ":" + executeTime);
System.out.println("schemaName" + ":" + schemaName);
System.out.println("tableName" + ":" + tableName);
System.out.println("eventTypeName" + ":" + eventTypeName);
CanalEntry.RowChange rowChange = null;
try {
// 获取存储数据,并将二进制字节数据解析为RowChange实体
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
// 迭代每一条变更数据
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 判断是否为删除事件
if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
System.out.println("---delete---");
printColumnList(rowData.getBeforeColumnsList());
System.out.println("---");
}
// 判断是否为更新事件
else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
System.out.println("---update---");
printColumnList(rowData.getBeforeColumnsList());
System.out.println("---");
printColumnList(rowData.getAfterColumnsList());
}
// 判断是否为插入事件
else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
System.out.println("---insert---");
printColumnList(rowData.getAfterColumnsList());
System.out.println("---");
}
}
}
}
// 打印所有列名和列值
private static void printColumnList(List<CanalEntry.Column> columnList) {
for (CanalEntry.Column column : columnList) {
System.out.println(column.getName() + "\t" + column.getValue());
}
}
}
复制代码
5.3 启动测试
在数据库中随便修改一条数据看看能不能使用 Canal 客户端能不能消费到
小结
今天给大家分享了 Canle 它的主要的功能做增量数据同步,后面会使用 Canle 进行做实时数仓。 我在这里为大家提供大数据的资源
需要的朋友可以去下面 GitHub 去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~
>资源获取 获取 Flink 面试题,Spark 面试题,程序员必备软件,hive 面试题,Hadoop 面试题,Docker 面试题,简历模板等资源请去
>GitHub 自行下载 https://github.com/lhh2002/Framework-Of-BigData
>Gitee 自行下载 https://gitee.com/liheyhey/dashboard/projects
评论