Maxwell安装使用

1、Maxwell简介

Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。读取 MySQL binlogs 并将修改行字段的更新写入 Kafka, Kinesis, RabbitMQ, Google Cloud Pub/Sub 或 Redis (Pub/Sub or LPUSH) 以作为 JSON 的应用程序。
官网:https://maxwells-daemon.io/
github:https://github.com/zendesk/maxwell

安装版本:maxwell-1.29.2
快速开始:https://maxwells-daemon.io/quickstart/

2、安装mysql开启binlog

创建数据库maxwell 用于存储 Maxwell 的元数据,
新增账号maxwell并授权:

1
2
3
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '12345678';
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@'%';
flush privileges;

创建测试数据库testdb,创建测试表testuser

1
2
3
4
5
6
7
8
CREATE TABLE `testuser` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`user_name` varchar(100) NOT NULL COMMENT '用户名',
`pswd` varchar(100) NOT NULL COMMENT '密码',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3、启动maxwell

方法1:

1
bin/maxwell --user='maxwell' --password='12345678' --host='127.0.0.1' --producer=stdout

方法2:

1
cp config.properties.example config.properties

修改config.properties:

1
2
3
producer=stdout
host=127.0.0.1
password=12345678

启动:

1
bin/maxwell --config config.properties

出现如下报错:

java.lang.RuntimeException: error: unhandled character set ‘utf8mb3’
at com.zendesk.maxwell.schema.columndef.StringColumnDef.charsetForCharset(StringColumnDef.java:61)
at com.zendesk.maxwell.schema.columndef.StringColumnDef.asJSON(StringColumnDef.java:75)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:112)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:162)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:176)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:486)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:626)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:178)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:255)
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:183)
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:286)
00:29:21,008 INFO TaskManager - Stopped all tasks

这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。
一种解决方案是,将MySQL降级,重新安装5.5.3以下的版本。
另一种方法则是修改maxwell源码。
解压打开,找到有问题的类:com.zendesk.maxwell.schema.columndef.StringColumnDef,加上能识别utf8mb3的语句,重新打包。
打包好的maxwell-1.19.0.jar百度云盘链接: https://pan.baidu.com/s/1t0s_e6d2G1-Z4dM3pSwVXQ?pwd=cs8m 提取码: cs8m 。
替换maxwell/lib/maxwell-1.19.0.jar ,重启即可。

[root@river106 maxwell-1.29.2]# bin/maxwell –config config.properties
Using kafka version: 1.0.0
20:48:10,306 INFO Maxwell - Starting Maxwell. maxMemory: 837287936 bufferMemoryUsage: 0.25
20:48:10,472 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[binlog.000001:1772098], lastHeartbeat=1681476466529]
20:48:10,802 INFO MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[binlog.000001:987693], lastHeartbeat=1681465703072])
20:48:10,987 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[binlog.000001:931603], lastHeartbeat=0])
20:48:11,026 INFO MysqlSavedSchema - beginning to play deltas…
20:48:11,027 INFO MysqlSavedSchema - played 1 deltas in 1ms
20:48:11,067 INFO BinlogConnectorReplicator - Setting initial binlog pos to: binlog.000001:1772098
20:48:11,099 INFO BinaryLogClient - Connected to localhost:3306 at binlog.000001/1772098 (sid:6379, cid:3825)
20:48:11,099 INFO BinlogConnectorReplicator - Binlog connected.

如上说明启动成功!

binlog变更抓取测试
分别新增数据、更新数据、删除数据,观察控制台输出变化

1
2
3
4
INSERT INTO testdb.testuser (user_name,pswd,create_time,modify_time) VALUES
('testuser','33333',now(),now());
UPDATE testdb.testuser SET pswd='55555' WHERE user_name = 'testuser';
DELETE FROM testdb.testuser WHERE user_name = 'testuser';

{“database”:”testdb”,”table”:”testuser”,”type”:”insert”,”ts”:1683336811,”xid”:126263,”commit”:true,”data”:{“id”:12,”user_name”:”testuser”,”pswd”:”33333”,”create_time”:”2023-05-06 09:33:31”,”modify_time”:”2023-05-06 09:33:31”}}
{“database”:”testdb”,”table”:”testuser”,”type”:”update”,”ts”:1683336841,”xid”:126348,”commit”:true,”data”:{“id”:12,”user_name”:”testuser”,”pswd”:”55555”,”create_time”:”2023-05-06 09:33:31”,”modify_time”:”2023-05-06 09:34:01”},”old”:{“pswd”:”33333”,”modify_time”:”2023-05-06 09:33:31”}}
{“database”:”testdb”,”table”:”testuser”,”type”:”delete”,”ts”:1683336847,”xid”:126373,”commit”:true,”data”:{“id”:12,”user_name”:”testuser”,”pswd”:”55555”,”create_time”:”2023-05-06 09:33:31”,”modify_time”:”2023-05-06 09:34:01”}}

4、输出到redis

1
2
bin/maxwell --user='maxwell' --password='12345678' --host='127.0.0.1' \
--producer=redis --redis_host='127.0.0.1' --redis_port=6380 --redis_auth='123456Ab'

或修改config.properties

1
2
3
4
5
6
redis_host=127.0.0.1
redis_port=6380
redis_auth=123456Ab
redis_database=0
redis_key=maxwell
redis_type=pubsub
1
bin/maxwell --config config.properties

相关配置文档:https://maxwells-daemon.io/producers/#redis

测试发布订阅模式

设置config.properties的redis_type=pubsub
mysql中新增一条数据,redis订阅频道maxwell

1
2
3
4
5
6
7
8
9
127.0.0.1:6380> SUBSCRIBE maxwell
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "maxwell"
3) (integer) 1

1) "message"
2) "maxwell"
3) "{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566124,\"xid\":86540,\"commit\":true,\"data\":{\"id\":8,\"user_name\":\"sssss\",\"pswd\":\"123456\",\"create_time\":\"2023-04-14 23:32:35\",\"modify_time\":\"2023-04-15 21:29:38\"}}"

测试list模式

设置config.properties的redis_type=lpush

1
2
3
4
5
6
7
8
127.0.0.1:6380> keys *
1) "foo"
2) "maxwell"
127.0.0.1:6380> LRANGE maxwell 0 10
1) "{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566484,\"xid\":87555,\"commit\":true,\"data\":{\"id\":9,\"user_name\":\"ggggg\",\"pswd\":\"55555\",\"create_time\":\"2023-04-15 21:48:04\",\"modify_time\":\"2023-04-15 21:48:04\"}}"
127.0.0.1:6380> LPOP maxwell
"{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566484,\"xid\":87555,\"commit\":true,\"data\":{\"id\":9,\"user_name\":\"ggggg\",\"pswd\":\"55555\",\"create_time\":\"2023-04-15 21:48:04\",\"modify_time\":\"2023-04-15 21:48:04\"}}"
127.0.0.1:6380>

通过发布订阅模式,实现数据同步功能,通过list方式可以获取最新的数据的变化和数据变化数量等需求。

5、输出到RabbitMQ

RabbitMQ安装及使用请参考: RabbitMQ安装及简单使用
RabbitMQ中新建exchange:maxwell,类型为:fanout。
修改Maxwell配置config.properties

1
2
3
4
5
6
7
8
9
producer=rabbitmq
rabbitmq_host=127.0.0.1
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_virtual_host=/
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=fanout
rabbitmq_exchange_durable=true

Maxwell相关配置文档:https://maxwells-daemon.io/producers/#rabbitmq
启动服务:

1
bin/maxwell --config config.properties

6、监控

通过 http 方式获取监控指标,修改config.properties配置如下:

1
2
3
metrics_type=http
metrics_jvm=true
http_port=8080

启动服务:

1
bin/maxwell --config config.properties

打开浏览器,访问:http://ip:8080/metrics,即可获取到监控指标.

5、Maxwell与Canal 工具对比

1、Maxwell没有Canal那种server+client模式,只有一个server把数据发送到消息队列或redis。
2、Maxwell有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
3、Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续读取数据。
4、Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
5、Maxwell比Canal更加轻量级。

扩展:
https://toutiao.io/posts/0xfdws/preview


Maxwell安装使用
https://river106.cn/posts/1c602f29.html
作者
river106
发布于
2023年5月5日
许可协议