使用canal同步mysql数据库信息到RabbitMQ

1、canal简介

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

github:https://github.com/alibaba/canal/

gitee:https://gitee.com/mirrors/canal

2、工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

3、环境准备

3.1 首先安装好mysql及rabbitmq

mysql比较简单,这里不介绍,rabbitmq安装及使用可以参考:RabbitMQ安装及简单使用

3.2 mysql需开启binlog

查看是否开启binlog

1
SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值为OFF是未开启,为ON是已开启。

未开启的话可以修改/etc/my.cnf 开启binlog

1
2
3
4
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

配置好后重启mysql。

创建用于同步的mysql账号:

1
2
3
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3.3 rabbitmq配置

在virtualHost:/ 下新增Exchanges: canal.topic

新增队列:test.queue, 绑定canal.topic, RoutingKey:test.routingKey

3.4 canal下载及配置

https://github.com/alibaba/canal/releases/tag/canal-1.1.5

目前最新版本为1.1.5,支持将数据发送至rabbitmq。

目录如下:

修改conf目录下的canal.properties

1
2
3
4
5
6
7
8
canal.serverMode = rabbitMQ

rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
# rabbitmq中新建的 Exchange
rabbitmq.exchange = canal.topic
rabbitmq.username = guest
rabbitmq.password = guest

修改conf/example下的instance.properties

1
2
3
4
5
6
7
8
canal.instance.master.address=127.0.0.1:3306

# mysql中配置的用于同步的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# rabbitmq中配置的 绑定的 routingkey
canal.mq.topic=test.routingKey

修改完成后,启动bin下的startup.bat(windows下,linux下启动startup.sh),查看logs/canal下canal.log, 如下内容说明启动成功:

    2021-07-10 18:30:18.088 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2021-07-10 18:30:18.147 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2021-07-10 18:30:18.320 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2021-07-10 18:30:18.899 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.104(192.168.0.104):11111]
    2021-07-10 18:30:20.723 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

至此,环境准备完成。

4、数据同步测试

4.1 新建数据库

新建测试数据库:canal

可以看到rabbitmq,test.queue队列中收到数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"data": null,
"database": "`canal`",
"es": 1625913686000,
"id": 6,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE DATABASE `canal` CHARACTER SET utf8",
"sqlType": null,
"table": "",
"ts": 1625913686928,
"type": "QUERY"
}

4.2 新增表

新建测试表:tb_user

1
2
3
4
5
6
7
CREATE TABLE `tb_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(30) NOT NULL DEFAULT '',
`address` varchar(200) DEFAULT NULL,
`email` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mq中收到数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"data": null,
"database": "canal",
"es": 1625914138000,
"id": 7,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE TABLE `tb_user` (\r\n`id` int(11) NOT NULL AUTO_INCREMENT ,\r\n`name` varchar(30) NOT NULL DEFAULT '' ,\r\n`address` varchar(200) NULL ,\r\n`email` varchar(100) NULL ,\r\nPRIMARY KEY (`id`)\r\n)",
"sqlType": null,
"table": "tb_user",
"ts": 1625914138420,
"type": "CREATE"
}

4.3 新增数据记录

在表中插入一条数据:

1
INSERT INTO `canal`.`tb_user` (`id`, `name`, `address`, `email`) VALUES ('1', 'jason', '安徽合肥', 'jason@qq.com');
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"data": [
{
"id": "1",
"name": "jason",
"address": "安徽合肥",
"email": "jason@qq.com"
}
],
"database": "canal",
"es": 1625914305000,
"id": 8,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(30)",
"address": "varchar(200)",
"email": "varchar(100)"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": 12,
"email": 12
},
"table": "tb_user",
"ts": 1625914305197,
"type": "INSERT"
}

通过 “type”: “INSERT” 及data可以获取到新增的数据

4.4 更新数据

更新 jason 的 address 信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
"data": [
{
"id": "1",
"name": "jason",
"address": "安徽合肥高新区",
"email": "jason@qq.com"
}
],
"database": "canal",
"es": 1625914494000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(30)",
"address": "varchar(200)",
"email": "varchar(100)"
},
"old": [
{
"address": "安徽合肥"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": 12,
"email": 12
},
"table": "tb_user",
"ts": 1625914494755,
"type": "UPDATE"
}

通过 “type”: “UPDATE”,data及old中的信息 可以获取到更新的数据

4.5 删除数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"data": [
{
"id": "1",
"name": "jason",
"address": "安徽合肥高新区",
"email": "jason@qq.com"
}
],
"database": "canal",
"es": 1625914783000,
"id": 10,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(30)",
"address": "varchar(200)",
"email": "varchar(100)"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": 12,
"email": 12
},
"table": "tb_user",
"ts": 1625914783109,
"type": "DELETE"
}

通过 “type”: “DELETE”,data 就可以判断删除的数据

4.6 新增字段

新增remark字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"data": null,
"database": "canal",
"es": 1625914881000,
"id": 11,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "ALTER TABLE `tb_user`\r\nADD COLUMN `remark` varchar(100) NULL AFTER `email`",
"sqlType": null,
"table": "tb_user",
"ts": 1625914881179,
"type": "ALTER"
}

通过 “type”: “ALTER” 及 sql即可知道 新增了remark列。

4.7 删除字段

删除email字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"data": null,
"database": "canal",
"es": 1625915072000,
"id": 12,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "ALTER TABLE `tb_user`\r\nDROP COLUMN `email`",
"sqlType": null,
"table": "tb_user",
"ts": 1625915072617,
"type": "ALTER"
}

通过 “type”: “ALTER”,sql即可知道删除了email列。

5、总结

mysql中变更的数据都可以通过canal发送到rabbitmq,只要监听rabbitmq队列,就可以获取到变更的数据,进行业务处理,可以写入redis,刷新缓存,也可以同步到elasticsearch等等。


使用canal同步mysql数据库信息到RabbitMQ
https://river106.cn/posts/3d250031.html
作者
river106
发布于
2021年7月10日
许可协议