canal 笔记

用户头像
wkq2786130
关注
发布于: 2020 年 07 月 22 日



原文 http://weikeqin.com/2018/05/16/canal-notes/



使用canal前需要准备以下几个内容

1. 安装配置MySQL

1.1 安装 mysql,

1.2 配置 mysql binlog使用ROW模式

1.3 在MySQL添加对应的canal用户

1.4 检查canal用户生效

2. 下载canal并配置

2.1 下载canal

2.2 配置 canal

2.3 启动canal (需要JDK>=1.6.25)



(1) 配置MySQL



(1.1) 安装MySQL



参考 https://dev.mysql.com/doc/refman/5.7/en/installing.html



(1.2) 修改MySQL配置文件



canal的原理是基于mysql binlog技术,所以需要开启mysql的binlog写入功能,并且配置binlog模式为row.



[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction ,不能和 canal 的 slaveId 重复



(1.3) MySQL添加canal用户并授权



canal的原理是模拟自己为mysql slave,所以需要mysql slave的相关权限



CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;



<!--more-->



(1.4) 校验用户对应权限



1. show master status ;

如果正常显示binlog,则没问题,如果提示 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation ,则没有对应 REPLICATION CLIENT 权限

>

2. show slave status ;

如果提示 Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation ,则没有对应 REPLICATION SLAVE 权限



(2) 下载并启动canal



执行 ./bin/startup.sh 即可启动



(2.1) 下载canal



到 https://github.com/alibaba/canal/releases 选择合适的版本

下载 wget https://github.com/alibaba/canal/releases/download/canal-1.1.14/canal.deployer-1.1.14.tar.gz



(2.2) 修改配置



修改 conf/example/instance.properties

以下只列出比较重要的配置



## mysql serverId 不能重复
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = database_wkq
canal.instance.connectionCharset = UTF-8
#table regex 需要监控的表 通过,分隔 也可以使用正则 .*\\..*
canal.instance.filter.regex = table_wkq,table_2,table_3,
# table black regex
canal.instance.filter.black.regex =



(2.3) 启动canal



通过 sh bin/startup.sh 或者 ./bin/startup.sh 启动



启动后通过 jps -l 命令 可以看到 com.alibaba.otter.canal.deployer.CanalLauncher



canal启动时canal.log



canal.deployer-1.0.24/logs/canal/canal.log



2018-07-23 20:27:46.449 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-07-23 20:27:46.625 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.62.130:11111]
2018-07-23 20:27:47.576 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2018-07-23 20:27:47.721 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx1 successful.
2018-07-23 20:27:47.802 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx2 successful.
2018-07-23 20:27:47.862 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx3 successful.
2018-07-23 20:27:47.921 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx4 successful.
2018-07-23 20:27:47.987 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx5 successful.
2018-07-23 20:27:48.044 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx6 successful.
2018-07-23 20:27:48.094 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx7 successful.



(2.2) canal正常启动时instance对应的日志



canal.deployer-1.0.24/logs/example/example.log



2018-07-23 20:27:47.429 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-07-23 20:27:47.436 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]
2018-07-23 20:27:47.444 [canal-instance-scan-0] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2018-07-23 20:27:47.451 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx
2018-07-23 20:27:47.453 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2018-07-23 20:27:47.666 [destination = xxx , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status



<br>



停止canal



sh stop.sh ./bin/stop.sh



2018-07-23 21:45:08.241 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## stop the canal server
2018-07-23 21:45:08.296 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[10.0.62.130:11111]
2018-07-23 21:45:08.296 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## canal server is down.



(4) 程序中使用



以下代码仅作为示例



<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>



import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CanalTest
*
* @author: weikeqin.cn@gmail.com
* @date: 2020-05-30 08:26
**/
@Slf4j
public class CanalTest {
/**
* @param args
*/
public static void main(String args[]) {
String canalHost = "127.0.0.1";
int canalPort = 11111;
String destination = "example";
InetSocketAddress address = new InetSocketAddress(canalHost, canalPort);
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "", "");
// connector = CanalConnectors.newClusterConnector(addresses, destination, "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
// 链接对应的canal server
connector.connect();
// 客户端订阅,不提交客户端filter,以服务端的filter为准
connector.subscribe();
// 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();
int totalEmptyCount = 12000000;
// 退出条件 一般是 while true
while (emptyCount < totalEmptyCount) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
log.info("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.info("", e);
}
} else {
emptyCount = 0;
log.info("message[batchId={},size={}] ", batchId, size);
// 消费
consumeMsg(message.getEntries());
}
// 提交确认
connector.ack(batchId);
// 处理失败, 回滚数据
// connector.rollback(batchId);
}
log.info("empty too many times, exit");
} finally {
// 释放链接
connector.disconnect();
}
}
/**
* 消费消息
*
* @param entries
*/
private static void consumeMsg(List<CanalEntry.Entry> entries) {
// 这里只打印
printEntry(entries);
// TODO 其它操作
}
/**
* @param entrys
*/
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
log.info(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType)
);
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
log.info("------- before");
printColumn(rowData.getBeforeColumnsList());
log.info("------- after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
/**
* @param columns
*/
private static void printColumn(List<CanalEntry.Column> columns) {
Map<String, String> map = new HashMap<>();
for (CanalEntry.Column column : columns) {
map.put(column.getName(), column.getValue());
//log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
log.info("{}", map);
}
}



(5) canal复制原理



复制如何工作,整体上来说,复制有3个步骤:

(1) master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events);

(2) slave将master的binary log events复制到它的中继日志(relay log)中;

(3) slave读取中继日志中的事件,将其重放到备库数据之上。

下图描述了复制的过程:



(6) 遇到的问题



(6.1) Error When doing Client Authentication:ErrorPacket



Caused by: java.io.IOException: connect /127.0.0.1:3306 failure:java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:208)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:71)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:56)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:157)
at java.lang.Thread.run(Thread.java:748)



原因 用户名密码不正确

(6.2) Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation



ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx
[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
with command: show master status

用canal账户登录后发现可以查看对应数据库对应表的数据,但是 show master status 提示 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

>

1、instance.properties配置文件里配置的用户没有REPLICATION权限

2、canal instance.properties 配置错误

3、配置文件里用户名密码不正确

4、MySQL对应用户不存在

5、MySQL配置不对

给canal用户对应的replication权限

grant replication client on *.* to 'canal'@'%';

flush privileges



(6.3) Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation



[destination = xxx , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx[java.io.IOException: Received error packet: errno = 1227, sqlstate = 42000 errmsg = Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:95)



Access denied 没权限 需要给对应账户授权

REPLICATION SLAVE 常用于建立复制时所需要用到的用户权限,也就是slave server必须被master server授权具有该权限的用户,才能通过该用户复制。

并且"SHOW SLAVE HOSTS"这条命令和REPLICATION SLAVE权限有关,否则执行时会报错:



REPLICATION CLIENT 不可用于建立复制,有该权限时,只是多了可以使用如"SHOW SLAVE STATUS"、"SHOW MASTER STATUS"等命令。

在5.6.6版本以后,也可以使用"SHOW BINARY LOGS"。



GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%'

flush privileges

(6.4) canal用了UseConcMarkSweepGC不能用JDK14



Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option PermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option MaxPermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option UseConcMarkSweepGC; support was removed in 14.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option CMSParallelRemarkEnabled; support was removed in 14.0
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.



修改 bin/start.sh 文件,修改对应的JAVA路径

## set java path
if [ -z "$JAVA" ] ; then
#JAVA=$(which java)
JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java"
fi



(6.5) com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused

Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)



canal没启动 或者 canal挂了

配置被删了,检查对应 destinationinstance.properties

instance.properties 没配置



<br>



(7) canal-admin后台管理



canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

简单来说,canal-admin是一个后台维护系统,简化了配置canal的工作,提高了效率,终于不用到服务器上一个一个配了



访问地址 http://127.0.0.1:8089/



(7.1) canal-admin的核心模型主要有



instance,对应canal-server里的instance,一个最小的订阅mysql的队列

server,对应canal-server,一个server里可以包含多个instance

集群,对应一组canal-server,组合在一起面向高可用HA的运维



References

[1] canal/wiki

[2] canal-AdminGuide

[3] ClientExample

[4] Canal-Admin-QuickStart

[5] Canal-Admin-Guide

[6] canal配置使用

[7] Mysql 普通账户授权replication client后登录失败问题

[8] REPLICATION SLAVE 与 REPLICATION CLIENT 权限

[9] 对replication slave,replication client的一点说明

[10] MySQL 5.6 Reference Manual – 6.2.1 Privileges Provided by MySQL

[11] SimpleCanalClientTest

[12] ClusterCanalClientTest



发布于: 2020 年 07 月 22 日 阅读数: 61
用户头像

wkq2786130

关注

hello 2018.09.28 加入

http://weikeqin.com/

评论

发布
暂无评论
canal 笔记