1、canal简介 canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
github:https://github.com/alibaba/canal/
gitee:https://gitee.com/mirrors/canal
2、工作原理 MySQL主备复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
3、环境准备 3.1 首先安装好mysql及rabbitmq mysql比较简单,这里不介绍,rabbitmq安装及使用可以参考:RabbitMQ安装及简单使用 。
3.2 mysql需开启binlog 查看是否开启binlog
1 SHOW VARIABLES LIKE '%log_bin%'
如果log_bin的值为OFF是未开启,为ON是已开启。
未开启的话可以修改/etc/my.cnf 开启binlog
1 2 3 4 [mysqld] log-bin =mysql-binbinlog-format =ROWserver_id =1
配置好后重启mysql。
创建用于同步的mysql账号:
1 2 3 create user canal@'%' IDENTIFIED by 'canal' ;GRANT SELECT , REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON * .* TO 'canal' @'%' ; FLUSH PRIVILEGES;
3.3 rabbitmq配置 在virtualHost:/ 下新增Exchanges: canal.topic
新增队列:test.queue, 绑定canal.topic, RoutingKey:test.routingKey
3.4 canal下载及配置 https://github.com/alibaba/canal/releases/tag/canal-1.1.5
目前最新版本为1.1.5,支持将数据发送至rabbitmq。
目录如下:
修改conf目录下的canal.properties
1 2 3 4 5 6 7 8 canal.serverMode = rabbitMQ rabbitmq.host = 127.0.0.1 rabbitmq.virtual.host = / rabbitmq.exchange = canal.topic rabbitmq.username = guest rabbitmq.password = guest
修改conf/example下的instance.properties
1 2 3 4 5 6 7 8 canal.instance.master.address =127.0.0.1:3306 canal.instance.dbUsername =canal canal.instance.dbPassword =canal canal.mq.topic =test.routingKey
修改完成后,启动bin下的startup.bat(windows下,linux下启动startup.sh),查看logs/canal下canal.log, 如下内容说明启动成功:
2021-07-10 18:30:18.088 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2021-07-10 18:30:18.147 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2021-07-10 18:30:18.320 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2021-07-10 18:30:18.899 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.104(192.168.0.104):11111]
2021-07-10 18:30:20.723 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
至此,环境准备完成。
4、数据同步测试 4.1 新建数据库 新建测试数据库:canal
可以看到rabbitmq,test.queue队列中收到数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 { "data" : null , "database" : "`canal`" , "es" : 1625913686000 , "id" : 6 , "isDdl" : true , "mysqlType" : null , "old" : null , "pkNames" : null , "sql" : "CREATE DATABASE `canal` CHARACTER SET utf8" , "sqlType" : null , "table" : "" , "ts" : 1625913686928 , "type" : "QUERY" }
4.2 新增表 新建测试表:tb_user
1 2 3 4 5 6 7 CREATE TABLE `tb_user` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `name` varchar (30 ) NOT NULL DEFAULT '' , `address` varchar (200 ) DEFAULT NULL , `email` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8;
mq中收到数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 { "data" : null , "database" : "canal" , "es" : 1625914138000 , "id" : 7 , "isDdl" : true , "mysqlType" : null , "old" : null , "pkNames" : null , "sql" : "CREATE TABLE `tb_user` (\r\n`id` int(11) NOT NULL AUTO_INCREMENT ,\r\n`name` varchar(30) NOT NULL DEFAULT '' ,\r\n`address` varchar(200) NULL ,\r\n`email` varchar(100) NULL ,\r\nPRIMARY KEY (`id`)\r\n)" , "sqlType" : null , "table" : "tb_user" , "ts" : 1625914138420 , "type" : "CREATE" }
4.3 新增数据记录 在表中插入一条数据:
1 INSERT INTO `canal`.`tb_user` (`id`, `name`, `address`, `email`) VALUES ('1' , 'jason' , '安徽合肥' , 'jason@qq.com' );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 { "data" : [ { "id" : "1" , "name" : "jason" , "address" : "安徽合肥" , "email" : "jason@qq.com" } ] , "database" : "canal" , "es" : 1625914305000 , "id" : 8 , "isDdl" : false , "mysqlType" : { "id" : "int(11)" , "name" : "varchar(30)" , "address" : "varchar(200)" , "email" : "varchar(100)" } , "old" : null , "pkNames" : [ "id" ] , "sql" : "" , "sqlType" : { "id" : 4 , "name" : 12 , "address" : 12 , "email" : 12 } , "table" : "tb_user" , "ts" : 1625914305197 , "type" : "INSERT" }
通过 “type”: “INSERT” 及data可以获取到新增的数据
4.4 更新数据 更新 jason 的 address 信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 { "data" : [ { "id" : "1" , "name" : "jason" , "address" : "安徽合肥高新区" , "email" : "jason@qq.com" } ] , "database" : "canal" , "es" : 1625914494000 , "id" : 9 , "isDdl" : false , "mysqlType" : { "id" : "int(11)" , "name" : "varchar(30)" , "address" : "varchar(200)" , "email" : "varchar(100)" } , "old" : [ { "address" : "安徽合肥" } ] , "pkNames" : [ "id" ] , "sql" : "" , "sqlType" : { "id" : 4 , "name" : 12 , "address" : 12 , "email" : 12 } , "table" : "tb_user" , "ts" : 1625914494755 , "type" : "UPDATE" }
通过 “type”: “UPDATE”,data及old中的信息 可以获取到更新的数据
4.5 删除数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 { "data" : [ { "id" : "1" , "name" : "jason" , "address" : "安徽合肥高新区" , "email" : "jason@qq.com" } ] , "database" : "canal" , "es" : 1625914783000 , "id" : 10 , "isDdl" : false , "mysqlType" : { "id" : "int(11)" , "name" : "varchar(30)" , "address" : "varchar(200)" , "email" : "varchar(100)" } , "old" : null , "pkNames" : [ "id" ] , "sql" : "" , "sqlType" : { "id" : 4 , "name" : 12 , "address" : 12 , "email" : 12 } , "table" : "tb_user" , "ts" : 1625914783109 , "type" : "DELETE" }
通过 “type”: “DELETE”,data 就可以判断删除的数据
4.6 新增字段 新增remark字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 { "data" : null , "database" : "canal" , "es" : 1625914881000 , "id" : 11 , "isDdl" : true , "mysqlType" : null , "old" : null , "pkNames" : null , "sql" : "ALTER TABLE `tb_user`\r\nADD COLUMN `remark` varchar(100) NULL AFTER `email`" , "sqlType" : null , "table" : "tb_user" , "ts" : 1625914881179 , "type" : "ALTER" }
通过 “type”: “ALTER” 及 sql即可知道 新增了remark列。
4.7 删除字段 删除email字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 { "data" : null , "database" : "canal" , "es" : 1625915072000 , "id" : 12 , "isDdl" : true , "mysqlType" : null , "old" : null , "pkNames" : null , "sql" : "ALTER TABLE `tb_user`\r\nDROP COLUMN `email`" , "sqlType" : null , "table" : "tb_user" , "ts" : 1625915072617 , "type" : "ALTER" }
通过 “type”: “ALTER”,sql即可知道删除了email列。
5、总结 mysql中变更的数据都可以通过canal发送到rabbitmq,只要监听rabbitmq队列,就可以获取到变更的数据,进行业务处理,可以写入redis,刷新缓存,也可以同步到elasticsearch等等。