写点什么

原来 Canal 也可以做 HA!

发布于: 2021 年 01 月 23 日
原来Canal也可以做HA!

前言

在做实时数仓时,数据量往往比较大的,如果使用 Canal 来监听 MySQL 的状态当 Canal 是单节服务时,服务器挂掉是就会造成数据丢失,这时 Canal 恰好可以配置 HA 这样就能解决单点问题,但是依赖于 zookeeper,那我们就来配置一下 Canal 的 HA。

一、Canal HA 模式配置

1.1 服务器端 HA 模式配置

canal 是支持 HA 的,其实现机制也是依赖 zookeeper 来实现的,用到的特性有 watcher 和 EPHEMERAL 节点(和 session 生命周期绑定),与 HDFS 的 HA 类似。


的 ha 分为两部分,canal server 和 canal client 分别有对应的 ha 实现

  • canal server: 为了减少对 mysql dump 的请求,不同 server 上的 instance(不同 server 上的相同 instance)要求同一时间只能有一个处于 running,其他的处于 standby 状态(standby 是 instance 的状态)。

  • canal client: 为了保证有序性,一份 instance 同一时间只能由一个 canal client 进行 get/ack/rollback 操作,否则客户端接收无法保证有序。


1.2 环境准备

  • Canal:node01,node02

  • zookeeper: node01,node02,node03

  • MySQL: node01


1.3 Canal HA 服务器配置

按照部署和配置,在单台机器上各自完成配置,演示时 instance name 为 example

修改 canal.properties,加上 zookeeper 配置


canal.zkServers=node01:2181,node02:2181,node03:2181# 需要价将这个文件关掉,打开default这个文件 这个文件配置了zookeeper地址#canal.instance.global.spring.xml = classpath:spring/file-instance.xmlcanal.instance.global.spring.xml = classpath:spring/default-instance.xml
复制代码


# 目录位置canal/conf/example/instance.propertiescanal.instance.mysql.slaveId = 1235
复制代码

注意: 需要将 Canal 包拷贝到 node02 需要修改 canal.instance.mysql.slaveId 这个需要这个数字不能跟 node01 上的机器上一样,负责会出现问题

1.4 Canal 环境启动

  1. 启动 zookeeper

nohup /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start
复制代码
  1. 启动 node01 上的 Canal

./startup.bat
复制代码
  1. 启动 node02 上的 Canal

./startup.bat
复制代码


-------ssh node1sh bin/startup.sh--------ssh node2sh bin/startup.sh
复制代码


启动后,可以查看 logs/example/example.log,只会看到一台机器上出现了启动成功的日志。比如这里启动成功的是 node02

 2020-10-17 03:13:24.211 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-17 03:13:24.224 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]2020-10-17 03:13:24.340 [main] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!2020-10-17 03:13:24.486 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2020-10-17 03:13:24.509 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....2020-10-17 03:13:24.604 [destination = example , address = node02/192.168.100.202:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
复制代码

查看一下zookeeper中的节点信息,也可以知道当前工作的节点为 node01:11111


[zk: localhost:2181(CONNECTED) 2]  get /otter/canal/destinations/example/running{"active":true,"address":"192.168.100.201:11111","cid":1}cZxid = 0x6800000013ctime = Mon Oct 12 05:13:29 CST 2020mZxid = 0x6800000013mtime = Mon Oct 12 05:13:29 CST 2020pZxid = 0x6800000013cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x2751980dfb80000dataLength = 57numChildren = 0
复制代码


二、客户端连接

package com.canal.Test;
/** * @author 大数据老哥 * @version V1.0 * @Package com.canal.Test * @File :CanalTest.java * * @date 2021/1/11 21:54 */
import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;import java.util.List;
/** * 测试canal配置是否成功 */public class CanaHAlTest {
public static void main(String[] args) { //1.创建连接
CanalConnector connect = CanalConnectors.newClusterConnector("node01:2181,node02:2181,node03:2181", "example", "", ""); //指定一次性读取的条数 int bachChSize = 1000; // 设置转态 boolean running = true; while (running) { //2.建立连接 connect.connect(); //回滚上次请求的信息放置防止数据丢失 connect.rollback(); // 订阅匹配日志 connect.subscribe(); while (running) { Message message = connect.getWithoutAck(bachChSize); // 获取batchId long batchId = message.getId(); // 获取binlog数据的条数 int size = message.getEntries().size(); if (batchId == -1 || size == 0) {
} else { printSummary(message); } // 确认指定的batchId已经消费成功 connect.ack(batchId); } } }
private static void printSummary(Message message) { // 遍历整个batch中的每个binlog实体 for (CanalEntry.Entry entry : message.getEntries()) { // 事务开始 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; }
// 获取binlog文件名 String logfileName = entry.getHeader().getLogfileName(); // 获取logfile的偏移量 long logfileOffset = entry.getHeader().getLogfileOffset(); // 获取sql语句执行时间戳 long executeTime = entry.getHeader().getExecuteTime(); // 获取数据库名 String schemaName = entry.getHeader().getSchemaName(); // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取事件类型 insert/update/delete String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();
System.out.println("logfileName" + ":" + logfileName); System.out.println("logfileOffset" + ":" + logfileOffset); System.out.println("executeTime" + ":" + executeTime); System.out.println("schemaName" + ":" + schemaName); System.out.println("tableName" + ":" + tableName); System.out.println("eventTypeName" + ":" + eventTypeName);
CanalEntry.RowChange rowChange = null; try { // 获取存储数据,并将二进制字节数据解析为RowChange实体 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); }

// 迭代每一条变更数据 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { // 判断是否为删除事件 if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) { System.out.println("---delete---"); printColumnList(rowData.getBeforeColumnsList()); System.out.println("---"); } // 判断是否为更新事件 else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) { System.out.println("---update---"); printColumnList(rowData.getBeforeColumnsList()); System.out.println("---"); printColumnList(rowData.getAfterColumnsList()); } // 判断是否为插入事件 else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) { System.out.println("---insert---"); printColumnList(rowData.getAfterColumnsList()); System.out.println("---"); } } } }
// 打印所有列名和列值 private static void printColumnList(List<CanalEntry.Column> columnList) { for (CanalEntry.Column column : columnList) { System.out.println(column.getName() + "\t" + column.getValue()); } }}
复制代码

运行测试

14:24:50.371 [main-SendThread(node02:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2751980dfb80001 after 1ms14:25:03.704 [main-SendThread(node02:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2751980dfb80001 after 1ms
复制代码

去数据库中修改一条数据进行测试

logfileName:mysql-bin.000002logfileOffset:761executeTime:1602452082000schemaName:zwtableName:dw_t_producteventTypeName:update---update---goods_id	007goods_status	待审核createtime	2019-12-22modifytime	2019-12-22cdat	20191222---goods_id	007goods_status	待审核createtime	33modifytime	2019-12-22cdat	20191222
复制代码

这时我们就成功的获取到了我们修改的数据,这时有小伙伴说不是 HA 吗。把 node01 节点停掉看看任务会不会正常运行。

[root@node01 bin]# ./stop.sh node01: stopping canal 6345 ... Oook! cost:1
复制代码

去数据库中修改一条数据看看能不能获取到

logfileName:mysql-bin.000002logfileOffset:1071executeTime:1602452401000schemaName:zwtableName:dw_t_producteventTypeName:update---update---goods_id	004goods_status	已删除createtime	2019-12-15modifytime	2019-12-20cdat	20191222---goods_id	004goods_status	已删除createtime	2019-2-15modifytime	2019-12-20cdat	20191222
复制代码

发现也是可以获取到数据的,我们现在去 zookeeper 中看看 canal 对外提供服务的是那台节点


[zk: localhost:2181(CONNECTED) 0]  get /otter/canal/destinations/example/running  {"active":true,"address":"192.168.100.202:11111","cid":1}cZxid = 0x680000002actime = Mon Oct 12 05:39:05 CST 2020mZxid = 0x680000002amtime = Mon Oct 12 05:39:05 CST 2020pZxid = 0x680000002acversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x17532d2b90c0000dataLength = 57numChildren = 0
复制代码

可以清晰的发现成功切换到我们的 node02 节点上了


三、Canal Server HA 的流程图

  1. canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)

  2. 创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态

  3. 一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance.

  4. canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了 canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect.


小结

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 到这里我们就解决了 Canal 的单点问题,现在大多数的组件都会创建 HA 的,首先对于公司而言数据是最终要的是数据,如果是一个单服务当服务出现问题时就会造成数据丢失,那这个损失就不知一点点了。我在这里为大家提供大数据的资料需要的朋友可以去下面 GitHub 去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

>资源获取 获取 Flink 面试题,Spark 面试题,程序员必备软件,hive 面试题,Hadoop 面试题,Docker 面试题,简历模板等资源请去

>GitHub 自行下载 https://github.com/lhh2002/Framework-Of-BigData

>Gitee 自行下载 https://gitee.com/liheyhey/dashboard/projects

>实时数仓代码 GitHub: https://github.com/lhh2002/RealTimeData_WareHouse

用户头像

微信搜公众号【大数据老哥】 2021.01.03 加入

微信搜索公众号【大数据老哥】 自己GitHub【https://github.com/lhh2002】 欢迎来star

评论

发布
暂无评论
原来Canal也可以做HA!