Maxwell安装使用
1、Maxwell简介
Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。读取 MySQL binlogs 并将修改行字段的更新写入 Kafka, Kinesis, RabbitMQ, Google Cloud Pub/Sub 或 Redis (Pub/Sub or LPUSH) 以作为 JSON 的应用程序。
官网:https://maxwells-daemon.io/
github:https://github.com/zendesk/maxwell
安装版本:maxwell-1.29.2
快速开始:https://maxwells-daemon.io/quickstart/
2、安装mysql开启binlog
创建数据库maxwell 用于存储 Maxwell 的元数据,
新增账号maxwell并授权:
1 |
|
创建测试数据库testdb,创建测试表testuser
1 |
|
3、启动maxwell
方法1:
1 |
|
方法2:
1 |
|
修改config.properties:
1 |
|
启动:
1 |
|
出现如下报错:
java.lang.RuntimeException: error: unhandled character set ‘utf8mb3’
at com.zendesk.maxwell.schema.columndef.StringColumnDef.charsetForCharset(StringColumnDef.java:61)
at com.zendesk.maxwell.schema.columndef.StringColumnDef.asJSON(StringColumnDef.java:75)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:112)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:162)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:176)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:486)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:626)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:178)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:255)
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:183)
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:286)
00:29:21,008 INFO TaskManager - Stopped all tasks
这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。
一种解决方案是,将MySQL降级,重新安装5.5.3以下的版本。
另一种方法则是修改maxwell源码。
解压打开,找到有问题的类:com.zendesk.maxwell.schema.columndef.StringColumnDef,加上能识别utf8mb3的语句,重新打包。
打包好的maxwell-1.19.0.jar百度云盘链接: https://pan.baidu.com/s/1t0s_e6d2G1-Z4dM3pSwVXQ?pwd=cs8m 提取码: cs8m 。
替换maxwell/lib/maxwell-1.19.0.jar ,重启即可。
[root@river106 maxwell-1.29.2]# bin/maxwell –config config.properties
Using kafka version: 1.0.0
20:48:10,306 INFO Maxwell - Starting Maxwell. maxMemory: 837287936 bufferMemoryUsage: 0.25
20:48:10,472 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[binlog.000001:1772098], lastHeartbeat=1681476466529]
20:48:10,802 INFO MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[binlog.000001:987693], lastHeartbeat=1681465703072])
20:48:10,987 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[binlog.000001:931603], lastHeartbeat=0])
20:48:11,026 INFO MysqlSavedSchema - beginning to play deltas…
20:48:11,027 INFO MysqlSavedSchema - played 1 deltas in 1ms
20:48:11,067 INFO BinlogConnectorReplicator - Setting initial binlog pos to: binlog.000001:1772098
20:48:11,099 INFO BinaryLogClient - Connected to localhost:3306 at binlog.000001/1772098 (sid:6379, cid:3825)
20:48:11,099 INFO BinlogConnectorReplicator - Binlog connected.
如上说明启动成功!
binlog变更抓取测试
分别新增数据、更新数据、删除数据,观察控制台输出变化
1 |
|
{“database”:”testdb”,”table”:”testuser”,”type”:”insert”,”ts”:1683336811,”xid”:126263,”commit”:true,”data”:{“id”:12,”user_name”:”testuser”,”pswd”:”33333”,”create_time”:”2023-05-06 09:33:31”,”modify_time”:”2023-05-06 09:33:31”}}
{“database”:”testdb”,”table”:”testuser”,”type”:”update”,”ts”:1683336841,”xid”:126348,”commit”:true,”data”:{“id”:12,”user_name”:”testuser”,”pswd”:”55555”,”create_time”:”2023-05-06 09:33:31”,”modify_time”:”2023-05-06 09:34:01”},”old”:{“pswd”:”33333”,”modify_time”:”2023-05-06 09:33:31”}}
{“database”:”testdb”,”table”:”testuser”,”type”:”delete”,”ts”:1683336847,”xid”:126373,”commit”:true,”data”:{“id”:12,”user_name”:”testuser”,”pswd”:”55555”,”create_time”:”2023-05-06 09:33:31”,”modify_time”:”2023-05-06 09:34:01”}}
4、输出到redis
1 |
|
或修改config.properties
1 |
|
1 |
|
相关配置文档:https://maxwells-daemon.io/producers/#redis
测试发布订阅模式
设置config.properties的redis_type=pubsub
mysql中新增一条数据,redis订阅频道maxwell
1 |
|
测试list模式
设置config.properties的redis_type=lpush
1 |
|
通过发布订阅模式,实现数据同步功能,通过list方式可以获取最新的数据的变化和数据变化数量等需求。
5、输出到RabbitMQ
RabbitMQ安装及使用请参考: RabbitMQ安装及简单使用
RabbitMQ中新建exchange:maxwell,类型为:fanout。
修改Maxwell配置config.properties
1 |
|
Maxwell相关配置文档:https://maxwells-daemon.io/producers/#rabbitmq
启动服务:
1 |
|
6、监控
通过 http 方式获取监控指标,修改config.properties配置如下:
1 |
|
启动服务:
1 |
|
打开浏览器,访问:http://ip:8080/metrics,即可获取到监控指标.
5、Maxwell与Canal 工具对比
1、Maxwell没有Canal那种server+client模式,只有一个server把数据发送到消息队列或redis。
2、Maxwell有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
3、Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续读取数据。
4、Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
5、Maxwell比Canal更加轻量级。