写点什么

一文带你快速入门 Canal,看这篇就够了!

发布于: 2021 年 01 月 12 日
一文带你快速入门Canal,看这篇就够了!

前言

我们在做实时数仓时数据往往都是保存到数据库中例如 MySQL,当有一条数据新增或修改需要马上将数据同步到 kafka 中或其他的数据库中,这时候我们需要借助阿里开源出来的Canal,来实现我们功能。

一、什么是 Canal

我们看下官网的描述:

>canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费


根据官网的描述我们大约可以理解为Canal主要是基于MySQL做增量数据同步的例如:将数据实时同步到 kafka、HBase、ES 等,可以理解一个数据同步工具

二、Canal 能干什么

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理


注意: 当前 Canal 支持的 MySQL 版本有 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x


三、Canal 工作原理

MySQL slave 工作原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据


canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)


四、部署 Canal

4.1 安装 MySQL

我之前发过如何部署 MySQL 我在这就不在写一遍了,如果你的机器中没有安装 MySQL 那可以去看这篇—> https://blog.csdn.net/qq_43791724/article/details/108196454


开启 MySQL 的 binary log 日志


         当我们在安装成功 MySQL 成功后会有一个my.cnf文件需要添加一下内容

[mysqld]log-bin=/var/lib/mysql/mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
复制代码

注意: 当我们在开启了binary log日志模式后会在我们log-bin目录下创建 mysql-bin.* 的文件。当我们数据库中的数据发生改变时就会mysql-bin.*文件中生成记录。


4.2 安装 Canal

去官下载需要的版本 https://github.com/alibaba/canal/releases

我在这里使用的版本为:1.0.24

  1. 将下载好的 gz 包上传到指定的目录下

  2. 创建个文件夹

mkdir canal
复制代码
  1. 解压 gz 包

tar -zxvf canal.deployer-1.0.24.tar.gz  -C ../servers/canal/
复制代码
  1. 配置 canal.properties


common 属性前四个配置项:


canal.id= 1canal.ip=canal.port= 11111canal.zkServers=
复制代码

canal.id 是 canal 的编号,在集群环境下,不同 canal 的 id 不同,注意它和mysql的server_id不同。ip 这里不指定,默认为本机,比如上面是 192.168.100.201,端口号是 11111。zk 用于 canal cluster。

  1. 再看下canal.propertiesdestinations 相关的配置:

##########################################################       destinations        ############# #################################################canal.destinations = examplecanal.conf.dir = ../confcanal.auto.scan = truecanal.auto.scan.interval = 5canal.instance.global.mode = spring canal.instance.global.lazy = falsecanal.instance.global.spring.xml = classpath:spring/file-instance.xml
复制代码

这里的canal.destinations = example可以设置多个,比如 example1,example2,

则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties文件。

全局的 canal 实例管理用 spring,这里的file-instance.xml最终会实例化所有的 destinations instances:


  1. 全局的 canal 实例管理用 spring,这里的file-instance.xml最终会实例化所有的 destinations instances:

<!-- properties --><bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">	<property name="ignoreResourceNotFound" value="true" />    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->    <property name="locationNames">    	<list>        	<value>classpath:canal.properties</value>                     <value>classpath:${canal.instance.destination:}/instance.properties</value>         </list>    </property></bean>
<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" /><bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> <property name="propertyEditorRegistrars"> <list> <ref bean="socketAddressEditor" /> </list> </property></bean><bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring"> <property name="destination" value="${canal.instance.destination}" /> <property name="eventParser"> <ref local="eventParser" /> </property> <property name="eventSink"> <ref local="eventSink" /> </property> <property name="eventStore"> <ref local="eventStore" /> </property> <property name="metaManager"> <ref local="metaManager" /> </property> <property name="alarmHandler"> <ref local="alarmHandler" /> </property></bean>
复制代码

比如canal.instance.destination等于 example,就会加载example/instance.properties配置文件

  1. 修改 instance 配置文件

## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样canal.instance.mysql.slaveId = 1234
# 按需修改成自己的数据库信息#################################################...canal.instance.master.address=192.168.1.120:3306# username/password,数据库的用户名和密码...canal.instance.dbUsername = rootcanal.instance.dbPassword = 123456#################################################
复制代码


  1. 启动

sh bin/startup.sh
复制代码
  1. 关闭

sh bin/stop.sh
复制代码
  1. 通过 jps 查询服务转态

[root@node01 ~]# jps2133 CanalLauncher4184 Jps
复制代码

到这里说明我们的服务就配好了,这时候我们可以使用 java 代码创建一个客户端来进行测试


五、通过 Java 编写 Canal 客户端

5.1 导入依赖

 <dependencies>        <dependency>            <groupId>com.alibaba.otter</groupId>            <artifactId>canal.client</artifactId>            <version>1.0.24</version>        </dependency>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.58</version>        </dependency>    </dependencies>
复制代码

5.2 编写测试类

package com.canal.Test;
/** * @author 大数据老哥 * @version V1.0 * @Package com.canal.Test * @File :CanalTest.java * @date 2021/1/11 21:54 */
import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;import java.util.List;
/** * 测试canal配置是否成功 */public class CanalTest {
public static void main(String[] args) { //1.创建连接 CanalConnector connect = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.100.201", 11111), "example", "", ""); //指定一次性读取的条数 int bachChSize = 1000; // 设置转态 boolean running = true; while (running) { //2.建立连接 connect.connect(); //回滚上次请求的信息放置防止数据丢失 connect.rollback(); // 订阅匹配日志 connect.subscribe(); while (running) { Message message = connect.getWithoutAck(bachChSize); // 获取batchId long batchId = message.getId(); // 获取binlog数据的条数 int size = message.getEntries().size(); if (batchId == -1 || size == 0) {
} else { printSummary(message); } // 确认指定的batchId已经消费成功 connect.ack(batchId); } } }
private static void printSummary(Message message) { // 遍历整个batch中的每个binlog实体 for (CanalEntry.Entry entry : message.getEntries()) { // 事务开始 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; }
// 获取binlog文件名 String logfileName = entry.getHeader().getLogfileName(); // 获取logfile的偏移量 long logfileOffset = entry.getHeader().getLogfileOffset(); // 获取sql语句执行时间戳 long executeTime = entry.getHeader().getExecuteTime(); // 获取数据库名 String schemaName = entry.getHeader().getSchemaName(); // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取事件类型 insert/update/delete String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();
System.out.println("logfileName" + ":" + logfileName); System.out.println("logfileOffset" + ":" + logfileOffset); System.out.println("executeTime" + ":" + executeTime); System.out.println("schemaName" + ":" + schemaName); System.out.println("tableName" + ":" + tableName); System.out.println("eventTypeName" + ":" + eventTypeName);
CanalEntry.RowChange rowChange = null; try { // 获取存储数据,并将二进制字节数据解析为RowChange实体 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); }
// 迭代每一条变更数据 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { // 判断是否为删除事件 if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) { System.out.println("---delete---"); printColumnList(rowData.getBeforeColumnsList()); System.out.println("---"); } // 判断是否为更新事件 else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) { System.out.println("---update---"); printColumnList(rowData.getBeforeColumnsList()); System.out.println("---"); printColumnList(rowData.getAfterColumnsList()); } // 判断是否为插入事件 else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) { System.out.println("---insert---"); printColumnList(rowData.getAfterColumnsList()); System.out.println("---"); } } } } // 打印所有列名和列值 private static void printColumnList(List<CanalEntry.Column> columnList) { for (CanalEntry.Column column : columnList) { System.out.println(column.getName() + "\t" + column.getValue()); } }}
复制代码

5.3 启动测试

在数据库中随便修改一条数据看看能不能使用 Canal 客户端能不能消费到

小结

今天给大家分享了 Canle 它的主要的功能做增量数据同步,后面会使用 Canle 进行做实时数仓。 我在这里为大家提供大数据的资源需要的朋友可以去下面 GitHub 去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

>资源获取 获取 Flink 面试题,Spark 面试题,程序员必备软件,hive 面试题,Hadoop 面试题,Docker 面试题,简历模板等资源请去

>GitHub 自行下载 https://github.com/lhh2002/Framework-Of-BigData

>Gitee 自行下载 https://gitee.com/liheyhey/dashboard/projects


用户头像

微信搜公众号【大数据老哥】 2021.01.03 加入

微信搜索公众号【大数据老哥】 自己GitHub【https://github.com/lhh2002】 欢迎来star

评论

发布
暂无评论
一文带你快速入门Canal,看这篇就够了!