canal notes

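Before using canal, prepare the following:

  1. Install and configure MySQL
    1.1 Install MySQL
    1.2 Configure the MySQL binlog to use ROW mode
    1.3 Add a canal user in MySQL
    1.4 Verify that the canal user works
  2. Download and configure canal
    2.1 Download canal
    2.2 Configure canal
    2.3 Start canal (requires JDK >= 1.6.25)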

(1) Configure MySQL

(1.1) Install MySQL

See https://dev.mysql.com/doc/refman/5.7/en/installing.html

(1.2) Edit the MySQL configuration file

canal works by reading the MySQL binlog, so binlog writing must be enabled in MySQL and the binlog format must be set to ROW.

[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId

(1.3) Add a canal user in MySQL and grant privileges

canal works by presenting itself to MySQL as a slave, so its account needs the privileges of a MySQL slave.

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(1.4) Verify the user's privileges

Log in as the canal user and run:

  1. show master status ;
    If the binlog file and position are displayed, the privilege is in place. If you get Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation , the user lacks the REPLICATION CLIENT privilege.
  2. show slave hosts ;
    If you get Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation , the user lacks the REPLICATION SLAVE privilege.
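
The same check can be done by reading the output of SHOW GRANTS for the canal account. The sketch below is only an illustration: the class GrantCheck and the method missingPrivileges are hypothetical names, and it assumes the grant line has the usual "GRANT <privileges> ON *.* TO ..." shape with global scope.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for illustration; not part of canal or MySQL.
class GrantCheck {

    // Privileges granted to the canal account in step (1.3).
    static final List<String> REQUIRED =
            Arrays.asList("SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT");

    /** Returns the required privileges that are absent from one SHOW GRANTS line. */
    static List<String> missingPrivileges(String grantLine) {
        String upper = grantLine.toUpperCase(Locale.ROOT).trim();
        List<String> missing = new ArrayList<>();
        if (upper.startsWith("GRANT ALL PRIVILEGES ON *.*")) {
            return missing; // everything is granted globally
        }
        int on = upper.indexOf(" ON ");
        // Assumption: the privilege list sits between "GRANT " and " ON ".
        String privPart = (upper.startsWith("GRANT ") && on > 0) ? upper.substring(6, on) : "";
        List<String> granted = new ArrayList<>();
        for (String p : privPart.split(",")) {
            granted.add(p.trim());
        }
        for (String required : REQUIRED) {
            if (!granted.contains(required)) {
                missing.add(required);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String line = "GRANT SELECT, REPLICATION CLIENT ON *.* TO 'canal'@'%'";
        System.out.println(missingPrivileges(line)); // prints [REPLICATION SLAVE]
    }
}
```

An empty result means the line carries all three privileges; anything else lists what still has to be granted.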

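(2) Download and start canal

Once canal is downloaded and configured, ./bin/startup.sh starts it.

(2.1) Download canal

Pick a suitable version from https://github.com/alibaba/canal/releases
For example: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

(2.2) Edit the configuration

Edit conf/example/instance.properties
Only the more important settings are listed below.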

## slaveId that canal uses towards MySQL; must not duplicate any server_id
canal.instance.mysql.slaveId = 1234
# position info: change to your own database address
canal.instance.master.address = 127.0.0.1:3306
# username/password: change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = database_wkq
canal.instance.connectionCharset = UTF-8
# table regex: tables to monitor, comma-separated; each entry is a regex matched against schema.table, e.g. .*\\..* for everything
canal.instance.filter.regex = database_wkq\\.table_wkq,database_wkq\\.table_2,database_wkq\\.table_3
# table black regex
canal.instance.filter.black.regex =
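
To see how a comma-separated filter behaves, the following sketch matches each entry as a regular expression against the full schema.table name. It is a simplified illustration built on java.util.regex, not canal's own filter implementation, and the class name FilterSketch is hypothetical. Note that \\. in the properties file becomes the regex \. (a literal dot), which is also how it is written in a Java string literal.

```java
import java.util.regex.Pattern;

// Simplified illustration of comma-separated regex filtering; not canal's actual code.
class FilterSketch {

    /** True when fullName ("schema.table") fully matches at least one entry of the filter. */
    static boolean accepts(String filter, String fullName) {
        for (String entry : filter.split(",")) {
            String regex = entry.trim();
            if (!regex.isEmpty() && Pattern.matches(regex, fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String filter = "database_wkq\\.table_wkq,database_wkq\\.table_2";
        System.out.println(accepts(filter, "database_wkq.table_wkq")); // true
        System.out.println(accepts(filter, "database_wkq.table_9"));   // false
        System.out.println(accepts(".*\\..*", "any_db.any_table"));    // true
        // A bare table name does not match the schema-qualified name.
        System.out.println(accepts("table_wkq", "database_wkq.table_wkq")); // false
    }
}
```

Under this full-match assumption, an entry without a schema prefix matches nothing, which is why each entry should be written as schema\\.table.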

(2.3) Start canal

Start it with sh bin/startup.sh or ./bin/startup.sh

After startup, jps -l should list com.alibaba.otter.canal.deployer.CanalLauncher

canal.log at startup

canal.deployer-1.0.24/logs/canal/canal.log

2018-07-23 20:27:46.449 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-07-23 20:27:46.625 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.62.130:11111]
2018-07-23 20:27:47.576 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2018-07-23 20:27:47.721 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx1 successful.
2018-07-23 20:27:47.802 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx2 successful.
2018-07-23 20:27:47.862 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx3 successful.
2018-07-23 20:27:47.921 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx4 successful.
2018-07-23 20:27:47.987 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx5 successful.
2018-07-23 20:27:48.044 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx6 successful.
2018-07-23 20:27:48.094 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx7 successful.

Instance log when canal starts normally

canal.deployer-1.0.24/logs/example/example.log

2018-07-23 20:27:47.429 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-07-23 20:27:47.436 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]
2018-07-23 20:27:47.444 [canal-instance-scan-0] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2018-07-23 20:27:47.451 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx
2018-07-23 20:27:47.453 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2018-07-23 20:27:47.666 [destination = xxx , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status


Stop canal

sh bin/stop.sh or ./bin/stop.sh

2018-07-23 21:45:08.241 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## stop the canal server
2018-07-23 21:45:08.296 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[10.0.62.130:11111]
2018-07-23 21:45:08.296 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## canal server is down.

(4) Using canal from a program

The code below is only an example.

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * CanalTest
 *
 * @author: [email protected]
 * @date: 2020-05-30 08:26
 **/
@Slf4j
public class CanalTest {

    /**
     * @param args
     */
    public static void main(String[] args) {

        String canalHost = "127.0.0.1";
        int canalPort = 11111;
        String destination = "example";
        InetSocketAddress address = new InetSocketAddress(canalHost, canalPort);

        // Create the connector
        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "", "");
        // connector = CanalConnectors.newClusterConnector(addresses, destination, "", "");

        int batchSize = 1000;
        int emptyCount = 0;
        try {
            // Connect to the canal server
            connector.connect();
            // Subscribe without a client-side filter, so the server-side filter applies
            connector.subscribe();
            // Roll back to the last un-acked position, so the next fetch resumes from there
            connector.rollback();

            int totalEmptyCount = 12000000;

            // Exit condition; in practice this is usually while (true)
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {

                    emptyCount++;
                    log.info("empty count : " + emptyCount);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.info("", e);
                    }

                } else {
                    emptyCount = 0;
                    log.info("message[batchId={},size={}] ", batchId, size);

                    // Consume the batch
                    consumeMsg(message.getEntries());
                }

                // Acknowledge the batch
                connector.ack(batchId);
                // On processing failure, roll the batch back instead
                // connector.rollback(batchId);
            }

            log.info("empty too many times, exit");
        } finally {
            // Release the connection
            connector.disconnect();
        }
    }

    /**
     * Consume messages
     *
     * @param entries
     */
    private static void consumeMsg(List<CanalEntry.Entry> entries) {

        // Only print them here
        printEntry(entries);
        // TODO other processing

    }

    /**
     * @param entries
     */
    private static void printEntry(List<CanalEntry.Entry> entries) {

        for (CanalEntry.Entry entry : entries) {

            // Skip transaction boundary markers; only row changes are of interest
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType)
            );

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

                if (eventType == CanalEntry.EventType.DELETE) {
                    // A delete only has a "before" image
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An insert only has an "after" image
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    log.info("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    log.info("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * @param columns
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        Map<String, String> map = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            map.put(column.getName(), column.getValue());
            //log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
        log.info("{}", map);
    }

}
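
The example above acknowledges every batch and leaves rollback(batchId) commented out. A minimal sketch of the "ack on success, roll back on failure" flow follows. To keep it runnable without a canal server, the connector is replaced by a hypothetical BatchSource interface that stands in for getWithoutAck, ack and rollback; all names in it are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AckRollbackSketch {

    /** Hypothetical stand-in for the connector calls used in the example above. */
    interface BatchSource {
        long nextBatchId();              // plays the role of getWithoutAck(...).getId()
        List<String> entries(long id);   // plays the role of message.getEntries()
        void ack(long id);
        void rollback(long id);
    }

    /** Processes one batch: ack if the consumer succeeds, roll back if it throws. */
    static boolean processOnce(BatchSource source, Consumer<List<String>> consumer) {
        long batchId = source.nextBatchId();
        try {
            consumer.accept(source.entries(batchId));
            source.ack(batchId);       // confirmed: the batch will not be delivered again
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId);  // not confirmed: the batch can be fetched again
            return false;
        }
    }

    /** In-memory source that records which confirmation call was made. */
    static class RecordingSource implements BatchSource {
        final List<String> calls = new ArrayList<>();

        public long nextBatchId() { return 1L; }

        public List<String> entries(long id) {
            List<String> list = new ArrayList<>();
            list.add("row-change");
            return list;
        }

        public void ack(long id) { calls.add("ack:" + id); }

        public void rollback(long id) { calls.add("rollback:" + id); }
    }

    public static void main(String[] args) {
        RecordingSource ok = new RecordingSource();
        processOnce(ok, entries -> { });
        System.out.println(ok.calls);  // [ack:1]

        RecordingSource bad = new RecordingSource();
        processOnce(bad, entries -> { throw new IllegalStateException("boom"); });
        System.out.println(bad.calls); // [rollback:1]
    }
}
```

In the real client the same shape would wrap consumeMsg in a try block, calling connector.ack(batchId) after it returns and connector.rollback(batchId) in the catch.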

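(5) How canal replication works

MySQL replication works in three steps overall:
(1) The master records changes in its binary log (these records are called binary log events);
(2) The slave copies the master's binary log events to its relay log;
(3) The slave reads the events from the relay log and replays them against its own data.

Figure: the replication process (canal replication)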
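
The three steps can be mimicked with a toy in-memory model. This is only a sketch to make the data flow concrete: real binlog events, relay logs and positions are far richer, and every name in it (ReplicationSketch, Event, writeOnMaster and so on) is hypothetical. canal takes the place of the slave in step (2) and hands the events to clients instead of replaying them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of master -> binlog -> relay log -> replay; not how MySQL is implemented.
class ReplicationSketch {

    /** A binlog event reduced to "set key to value". */
    static class Event {
        final String key;
        final String value;

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    final Map<String, String> masterData = new HashMap<>();
    final List<Event> binlog = new ArrayList<>();    // written by the master (step 1)
    final List<Event> relayLog = new ArrayList<>();  // copied by the slave (step 2)
    final Map<String, String> replicaData = new HashMap<>();
    private int copied = 0;   // how many binlog events are already in the relay log
    private int applied = 0;  // how many relay log events are already replayed

    /** Step 1: the master applies a change and records it in its binary log. */
    void writeOnMaster(String key, String value) {
        masterData.put(key, value);
        binlog.add(new Event(key, value));
    }

    /** Step 2: the slave copies new binlog events into its relay log. */
    void copyToRelayLog() {
        while (copied < binlog.size()) {
            relayLog.add(binlog.get(copied++));
        }
    }

    /** Step 3: the slave replays relay log events against its own data. */
    void replay() {
        while (applied < relayLog.size()) {
            Event e = relayLog.get(applied++);
            replicaData.put(e.key, e.value);
        }
    }

    public static void main(String[] args) {
        ReplicationSketch r = new ReplicationSketch();
        r.writeOnMaster("user:1", "alice");
        r.writeOnMaster("user:1", "bob");
        r.copyToRelayLog();
        r.replay();
        System.out.println(r.replicaData.equals(r.masterData)); // true
    }
}
```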

(6) Problems encountered

(6.1) Error When doing Client Authentication:ErrorPacket

Caused by: java.io.IOException: connect /127.0.0.1:3306 failure:java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:208)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:71)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:56)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:157)
at java.lang.Thread.run(Thread.java:748)

Cause: the username or password is wrong.

(6.2) Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx
[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
with command: show master status

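After logging in with the canal account, data in the corresponding databases and tables can be queried, but show master status fails with Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

Possible causes:
1. The user configured in instance.properties lacks the REPLICATION CLIENT privilege
2. canal's instance.properties is misconfigured
3. The username or password in the configuration file is wrong
4. The MySQL user does not exist
5. MySQL is misconfigured

Fix: grant the canal user the replication privilege
grant replication client on *.* to 'canal'@'%';
flush privileges;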

(6.3) Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation

[destination = xxx , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx[java.io.IOException: Received error packet: errno = 1227, sqlstate = 42000 errmsg = Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:95)

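Access denied means the account lacks a privilege and must be granted it.
REPLICATION SLAVE is the privilege needed by the account used to set up replication: the master must grant it to the user that the slave server connects as before that user can replicate.
The SHOW SLAVE HOSTS command also depends on the REPLICATION SLAVE privilege and fails with an error without it.

REPLICATION CLIENT cannot be used to set up replication. It only allows commands such as SHOW SLAVE STATUS and SHOW MASTER STATUS.
From version 5.6.6 on, it also allows SHOW BINARY LOGS.

GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%';
flush privileges;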

(6.4) canal uses UseConcMarkSweepGC and cannot run on JDK 14

Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option PermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option MaxPermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option UseConcMarkSweepGC; support was removed in 14.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option CMSParallelRemarkEnabled; support was removed in 14.0
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Edit bin/startup.sh and point JAVA at a suitable JDK (JDK 8 here)

## set java path
if [ -z "$JAVA" ] ; then
#JAVA=$(which java)
JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java"
fi
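
Before starting canal it can help to check which Java version a given JDK reports. The sketch below parses a java.version style string and decides whether the CMS options are still accepted, based only on the warnings shown above (support removed in 14). The class JavaVersionCheck and its methods are hypothetical helpers, not part of canal.

```java
// Hypothetical helper for illustration; not part of canal.
class JavaVersionCheck {

    /** Extracts the feature version: "1.8.0_211" gives 8, "14.0.1" gives 14, "11" gives 11. */
    static int featureVersion(String version) {
        String[] parts = version.split("[._\\-+]");
        int first = Integer.parseInt(parts[0]);
        // Versions up to Java 8 are written as 1.x
        if (first == 1 && parts.length > 1) {
            return Integer.parseInt(parts[1]);
        }
        return first;
    }

    /** CMS options such as UseConcMarkSweepGC are rejected from JDK 14 on (see the warnings above). */
    static boolean supportsCmsFlags(String version) {
        return featureVersion(version) < 14;
    }

    public static void main(String[] args) {
        String current = System.getProperty("java.version");
        System.out.println("java.version = " + current
                + ", CMS flags supported: " + supportsCmsFlags(current));
    }
}
```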

(6.5) com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused

Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)

Possible causes:
canal is not running, or it has crashed
The configuration was deleted; check the instance.properties of the destination
instance.properties has not been configured
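
A quick way to tell "canal is not listening" apart from a client-side problem is to try a plain TCP connection to the canal port. The sketch below uses only java.net; the class PortCheck is a hypothetical helper, and 127.0.0.1:11111 is the address used by the client example in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of canal.
class PortCheck {

    /** True if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host and so on
            return false;
        }
    }

    public static void main(String[] args) {
        boolean up = isReachable("127.0.0.1", 11111, 1000);
        System.out.println(up ? "canal port is open" : "nothing is listening on the canal port");
    }
}
```

If the port is closed, check the canal process with jps -l and look at canal.log before touching the client.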


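(7) canal-admin management console

canal-admin is designed to give canal operations-oriented features such as overall configuration management and node maintenance, with a relatively friendly WebUI so that more users can operate it quickly and safely.
Put simply, canal-admin is a back-office maintenance system that simplifies configuring canal and saves time: no more configuring servers one by one.

Address: http://127.0.0.1:8089/

(7.1) The core models of canal-admin

instance: corresponds to an instance in canal-server, the smallest queue that subscribes to MySQL
server: corresponds to a canal-server; one server can contain multiple instances
cluster: corresponds to a group of canal-servers combined for high-availability (HA) operation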
