Maxwell
1. Maxwell概述
1.1 Maxwell定义
Maxwell 是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。 实时读取 MySQL 二进制日志 Binlog
,并生成 JSON 格式的消息
,作为生产者发送给 Kafka
,Kinesis、 RabbitMQ、Redis
、Google Cloud Pub/Sub、文件或其它平台的应用程序。
官网:Maxwell’s Daemon (maxwells-daemon.io)
1.2 Maxwell 工作原理
1.2.1 MySQL主从复制过程
过程:
- Master 主库将改变记录,写到二进制日志(binary log)中
- Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝 到它的中继日志(relay log);
- Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
1.2.2 Maxwell 的工作原理
Maxwell 的工作原理很简单,就是把自己伪装成 MySQL 的一个 slave
,然后以 slave 的身份假装从 MySQL(master)复制数据。
1.2.3 MySQL 的 binlog
(1) 什么是 binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除 了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进 制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
- 其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递 给 slaves 来达到 master-slave 数据一致的目的。
- 其二:自然就是数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件
(2) binlog 的开启
MySQL8.0以前没有开启。生产建议全部开启
配置方法:
[root@mysql01 ~]# vim /etc/my.cnf
[mysqld]
server_id=3 #前面设置了1,这个可以不设置
log_bin=/data/binlog/mysql-bin #binlog目录
binlog_format=row
sync_binlog=1
expire_logs_days=30
max_binlog_size=100M
[mysqld]
server_id=3 #(5.6可以不设置,5.7必须,是主从中应用)
log_bin=/data/binlog/mysql-bin #(日志位置和日志名前缀) 非常重要,差重选择存储目录
sync_binlog=1 #(binlog日志刷盘的策略,每次事务提交立刻刷写到磁盘)
binlog_format=row #binlog的日志记录格式为row模式(row模式会产生大量日志)(row相似debug)
expire_logs_days=30 #设置binlog日志30天过期删除
max_binlog_size=100M #设置binlog日志大小为100M (单个二进制日志 大小上限 默认1G ,到达自动切割日志)
其中可以前往官网查看5.7的文档配置选项
重启前创建binlog目录
[root@mysql01 ~]# mkdir -p /data/binlog/
[root@mysql01 ~]# chown -R mysql.mysql /data/binlog/ #一定要给权限
[root@mysql01 ~]# systemctl restart mysqld
[root@mysql01 ~]# systemctl status mysqld
[root@mysql01 ~]# systemctl status mysqld
● mysqld.service - LSB: start and stop MySQL
Loaded: loaded (/etc/rc.d/init.d/mysqld; bad; vendor preset: disabled)
Active: active (running) since 六 2022-03-05 22:48:20 CST; 44s ago
......
略…..(MySQL知识自行复习以往笔记):
2. Maxwell 使用
注意
: 要提前安装好 kafka 和 MySQL
2.1 安装地址
(1)Maxwell 官网地址:http://maxwells-daemon.io/
(2)文档查看地址:http://maxwells-daemon.io/quickstart
2.2 MySQL 环境准备
(1) 修改 mysql 的配置文件,
开启 MySQL Binlog
设置
[root@master ~]# vi /etc/my.cnf
添加如下内容:
修改后如下:
[root@master ~]# grep -Ev "^$|^#" /etc/my.cnf
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
server_id=1
log-bin=mysql-bin
binlog_format=row
重启MySQL服务
[root@master ~]# systemctl restart mysqld
登录 mysql 并查看是否修改完成
mysql> show variables like '%binlog%';
+--------------------------------------------+----------------------+
| Variable_name | Value |
+--------------------------------------------+----------------------+
| binlog_cache_size | 32768 |
| binlog_checksum | CRC32 |
| binlog_direct_non_transactional_updates | OFF |
| binlog_error_action | ABORT_SERVER |
| binlog_format | ROW |
| binlog_group_commit_sync_delay | 0 |
| binlog_group_commit_sync_no_delay_count | 0 |
| binlog_gtid_simple_recovery | ON |
| binlog_max_flush_queue_time | 0 |
| binlog_order_commits | ON |
| binlog_row_image | FULL |
| binlog_rows_query_log_events | OFF |
| binlog_stmt_cache_size | 32768 |
| binlog_transaction_dependency_history_size | 25000 |
| binlog_transaction_dependency_tracking | COMMIT_ORDER |
| innodb_api_enable_binlog | OFF |
| innodb_locks_unsafe_for_binlog | OFF |
| log_statements_unsafe_for_binlog | ON |
| max_binlog_cache_size | 18446744073709547520 |
| max_binlog_size | 1073741824 |
| max_binlog_stmt_cache_size | 18446744073709547520 |
| sync_binlog | 1 |
+--------------------------------------------+----------------------+
22 rows in set (0.00 sec)
如图:
(2) 进入/var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件
[root@master ~]# cd /var/lib/mysql
[root@master mysql]# ll
total 188460
-rw-r----- 1 mysql mysql 56 Jan 11 10:14 auto.cnf
-rw-r----- 1 mysql mysql 492 Jan 15 08:04 ib_buffer_pool
-rw-r----- 1 mysql mysql 50331648 Jan 15 08:04 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 Jan 11 10:14 ib_logfile1
-rw-r----- 1 mysql mysql 79691776 Jan 15 08:04 ibdata1
-rw-r----- 1 mysql mysql 12582912 Jan 15 08:04 ibtmp1
drwxr-x--- 2 mysql mysql 4096 Jan 13 06:38 metastore
drwxr-x--- 2 mysql mysql 4096 Jan 11 10:14 mysql
-rw-r----- 1 mysql mysql 154 Jan 15 08:04 mysql-bin.000001
-rw-r----- 1 mysql mysql 19 Jan 15 08:04 mysql-bin.index
srwxrwxrwx 1 mysql mysql 0 Jan 15 08:04 mysql.sock
-rw------- 1 mysql mysql 5 Jan 15 08:04 mysql.sock.lock
drwxr-x--- 2 mysql mysql 4096 Jan 11 10:14 performance_schema
drwxr-x--- 2 mysql mysql 12288 Jan 11 10:14 sys
如图:
注:MySQL 生成的 binlog 文件初始大小一定是 154 字节
,然后前缀是 log-bin 参数配 置的,后缀是默认从.000001,然后依次递增。除了 binlog 文件文件以外,MySQL 还会额外 生产一个.index 索引文件用来记录当前使用的 binlog 文件。
在数据库查看有几个binlog日志:
mysql> show binary logs;
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin.000001 | 154 |
+------------------+-----------+
1 row in set (0.00 sec)
查看当前在用的日志文件:
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 | 154 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
查看二进制事件的功能:
mysql> show binlog events in 'mysql-bin.000001';
+------------------+-----+----------------+-----------+-------------+---------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+-----+----------------+-----------+-------------+---------------------------------------+
| mysql-bin.000001 | 4 | Format_desc | 1 | 123 | Server ver: 5.7.25-log, Binlog ver: 4 |
| mysql-bin.000001 | 123 | Previous_gtids | 1 | 154 | |
+------------------+-----+----------------+-----------+-------------+---------------------------------------+
2 rows in set (0.00 sec)
详情笔记看:
2.3 解压安装并重命名Maxwell
[root@master ~]# tar -zxvf /opt/software/maxwell-1.29.0.tar.gz -C /opt/module/
[root@master ~]# mv /opt/module/maxwell-1.29.0/ /opt/module/maxwell
2.4 添加环境变量
[root@master ~]# vi /etc/profile
添加以下内容:
#maxwell
export MAXWELL_HOME=/opt/module/maxwell
export PATH=$PATH:$MAXWELL_HOME/bin
2.5 初始化 Maxwell 元数据库
(1)在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据
[root@master ~]# mysql -uroot -p123456
mysql> CREATE DATABASE maxwell;
Query OK, 1 row affected (0.00 sec)
(2)分配一个账号可以操作该数据库
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
Query OK, 0 rows affected, 1 warning (0.01 sec)
(4) 分配这个账号可以监控其他数据库的权限
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
(5)刷新 mysql 表权限
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
2.6 Maxwell 进程启动
Maxwell 进程启动方式有如下两种:
(1)使用命令行参数启动 Maxwell 进程
maxwell --user='maxwell' --password='123456' --host='master' --producer=stdout
maxwell --user='root' --password='123456' --host='master' --producer=stdout
参数解释:
参数 | 含义 |
---|---|
–user | 连接 mysql 的用户 |
–password | 连接 mysql 的用户的密码 |
–host | 安装的主机名 |
–producer | 生产者模式(stdout:控制台 kafka:kafka 集群) |
(2)修改配置文件,定制化启动 Maxwell 进程
拷贝一份原配置文档:
[root@master ~]# cd $MAXWELL_HOME/
[root@master maxwell-1.29.0]# cp config.properties.example config.properties
修改如下:
#默认就好
log_level=info
producer=kafka
# kafka的ip以及端口配置,修改成自己的,这里改成了集群模式,也可使用默认的单节点模式
kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092
#mysql登录信息
host=localhost
user=maxwell
password=123456
port=3306
#kafka的主题配置
kafka_topic=maxwell
配置选项 | 配置说明 |
---|---|
producer=kafka | 实时监控到的日志发送到kafka生产者 |
kafka.bootstrap.servers=localhost:9092 | 配置的本地kafka节点IP地址以及端口号,kafka默认端口号9092 |
host=xx.xx.xx.xx | 本地Mysql安装节点IP地址 |
user=******* | 用户名 |
password=****** | 用户密码(按照自己设置输入) |
kafka_topic=kafka | 实时监控到的日志数据发送kafkaTopic下(默认发送maxwell) |
include_dbs=test | 过滤除test数据库下所有日志数据 |
include_tables=表名,表名 | 可以指定不需要过滤日志数据的表名 |
修改结果:
2.7 启动测试
启动kafka集群 (maxwell运行需要先启动kafka)
先启动zookeeper服务:
方式1:启动zookeeper(kafka自带)
[root@master kafka]# zookeeper-server-start.sh -daemon config/zookeeper.properties
方式2:启动zookeeper集群(推荐)
#所有节点启动zookeeper服务
[root@master ~]# zkServer.sh start
#启动kafka
[root@master kafka]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
所有节点启动kafka服务:
[root@master ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
查看进程是否启动成功:
使用kakfa创建一个topic
创建的topic名称要与config.properties配置的一致
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic maxwell
查看创建的topic:
[root@master ~]# kafka-topics.sh --list --zookeeper localhost:2181 | grep maxwell
maxwell
启动maxwell测试
启动命令:
[root@master ~]# maxwell -conf $MAXWELL_HOME/config.properties
启动成功:
3. Maxwell入门案例
3.1 监控 Mysql 数据并在控制台打印
(1) 运行Maxwell来监控MySQL数据更新
maxwell --user='maxwell' --password='123456' --host='master' --producer=stdout
(2) 向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输出
在MySQL创建一个 test_maxwell的数据库:
CREATE DATABASE IF NOT EXISTS test_maxwell;
创建表:
use test_maxwell;
CREATE TABLE IF NOT EXISTS test (
id INT PRIMARY KEY AUTO_INCREMENT,
field1 VARCHAR(255),
field2 INT,
field3 FLOAT
);
插入一条数据:
INSERT INTO test (field1, field2, field3) VALUES ('value1', 123, 45.67);
查看 maxwell 的控制台输出:
{
"database": "test_maxwell", --库名
"table": "test", --表名
"type": "insert", --数据更新类型
"ts": 1705321149, --操作时间
"xid": 7202, --操作 id
"commit": true, --提交成功
"data": { --数据
"id": 1,
"field1": "value1",
"field2": 123,
"field3": 45.67
}
}
结果如图:
(3) 向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台出现了 3 条 json 日志,说明 maxwell 是以数据行为单位进行日志的采集的。
插入数据:
INSERT INTO test (field1, field2, field3) VALUES
('value2', 456, 89.12),
('value4', 789, 12.34),
('value5', 900, 13.34);
插入数据后出现3条json日志,说明 maxwell 是以数据行为单位进行日志的采集的:
(4)修改 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
update test set field3='6666' where id = 1
Maxwell控制台输出:
{
"database": "test_maxwell",
"table": "test",
"type": "update", --更新数据
"ts": 1705325669,
"xid": 18689,
"commit": true,
"data": {
"id": 1,
"field1": "value1",
"field2": 123,
"field3": 6666.0 --修改后的数据
},
"old": {
"field3": 45.67 --修改前的数据
}
}
(5)删除 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
delete from test where id = 1;
Maxwell控制台输出:
{
"database": "test_maxwell",
"table": "test",
"type": "delete", --删除数据
"ts": 1705326034,
"xid": 19617,
"commit": true,
"data": {
"id": 1,
"field1": "value1",
"field2": 123,
"field3": 6666.0
}
}
3.2 监控 Mysql 数据输出到 kafka
(1) 实现步骤
:one:启动zookeeper 和 kafka服务
#所有节点启动zookeeper服务
[root@master ~]# zkServer.sh start
#启动kafka
[root@master kafka]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
结果:
:two:启动 Maxwell 监控 binlog
maxwell --user='maxwell' --password='123456' --host='master' --producer=kafka --kafka.bootstrap.servers=master:9092 --kafka_topic=maxwell
参数详解:
--user='maxwell'
: 指定连接 MySQL 数据库时使用的用户名,这里设置为 “maxwell”。--password='123456'
: 指定连接 MySQL 数据库时使用的密码,这里设置为 “123456”。请注意,直接在命令行中明文指定密码可能存在安全风险,最好是在安全环境下使用。--host='master'
: 指定连接 MySQL 数据库的主机名或 IP 地址,这里设置为 “master”。--producer=kafka
: 指定 Maxwell 数据库复制工具使用的消息队列类型,这里设置为 Kafka。--kafka.bootstrap.servers=master:9092
: 指定连接 Kafka 服务的地址,这里设置为 “master:9092”,表示 Kafka 服务运行在主机 “master” 上,端口为 “9092”。--kafka_topic=maxwell
: 指定 Maxwell 数据写入 Kafka 时使用的主题名称,这里设置为 “maxwell”。
这个命令的作用是使用 Maxwell 工具连接到 MySQL 数据库(使用指定的用户名、密码、主机),并将数据库变更的消息写入 Kafka 中的指定主题。
:three:打开 kafka 的控制台的消费者消费 maxwell 主题
[root@master ~]# kafka-console-consumer.sh --bootstrap-server master:9092 --topic maxwell
:four:向 test_maxwell 库的 test 表再次插入一条数据
INSERT INTO test (field1, field2, field3) VALUES ('value1', 123, 45.67);
:five:通过 kafka 消费者来查看到了数据,说明数据成功传入 kafka
{"database":"test_maxwell","table":"test","type":"insert","ts":1705326915,"xid":20783,"commit":true,"data":{"id":5,"field1":"value1","field2":123,"field3":45.67}}
(2) kafka 主题数据的分区控制
在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这 些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。 那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:
:one:修改 maxwell 的配置文件,定制化启动 maxwell 进程
备份之前修改的配置文件:
[root@master ~]# cp $MAXWELL_HOME/config.properties $MAXWELL_HOME/config.properties.bak
再对配置文件进行修改:
[root@master ~]# vim $MAXWELL_HOME/config.properties
添加此项配置并修改topic为maxwell3
:
kafka_topic=maxwell3
producer_partition_by=database
参数解释:
producer_partition_by=database
是 Maxwell 配置中的一个参数,用于指定在将数据库变更写入 Kafka 主题时如何进行分区。在这里,producer_partition_by
设置为 “database”,表示按照数据库进行分区。
完整配置如下:
[root@master ~]# grep -Ev "^$|^#" $MAXWELL_HOME/config.properties
log_level=info
producer=kafka
kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092
host=localhost
user=maxwell
password=123456
port=3306
kafka_topic=maxwell3
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
producer_partition_by=database
:two:手动创建一个 3 个分区的 topic,名字就叫做 maxwell3
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 2 --partitions 3 --topic maxwell3
查看创建的topic:
[root@master ~]# kafka-topics.sh --list --zookeeper master:2181 | grep maxwell3
maxwell3
:three:利用配置文件启动Maxwell进程
[root@master ~]# maxwell --config $MAXWELL_HOME/config.properties
启动成功截图:
:four:向 test_maxwell 库的 test 表再次插入一条数据
INSERT INTO test (field1, field2, field3) VALUES ('value6', 666, 66.66);
:two:通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 1 号分区
:six:向 test 库的 aaa 表插入一条数据
创建aaa表:
use test_maxwell;
create table aaa (id int, name varchar(255) not null ,old int not null )
向aaa表插入数据:
INSERT INTO aaa (id, name, old) VALUES (1,"hj",18);
INSERT INTO aaa (id, name, old) VALUES (2,"ggg",28);
可以看出数据全部进入了maxwell3 主题的 1号分区
3.3 监控 Mysql 指定表数据输出控制台
(1) 运行 maxwell 来监控 mysql 指定表数据更新
maxwell --user='maxwell' --password='123456' --host='master' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout
参数详解:
--user='maxwell'
: 指定连接 MySQL 数据库时使用的用户名,这里设置为 “maxwell”。--password='123456'
: 指定连接 MySQL 数据库时使用的密码,这里设置为 “123456”。请注意,直接在命令行中明文指定密码可能存在安全风险,最好是在安全环境下使用。--host='master'
: 指定连接 MySQL 数据库的主机名或 IP 地址,这里设置为 “master”。--filter 'exclude: *.*, include:test_maxwell.test'
: 用于配置 Maxwell 数据库复制工具的过滤规则。在这里,设置了两个规则:exclude: *.*
: 排除所有的数据库表。include:test_maxwell.test
: 包含名为 “test_maxwell” 数据库中的 “test” 表。
这样的过滤规则指定了哪些数据库和表的变更会被捕获和传输,以及哪些会被排除。
--producer=stdout
: 指定 Maxwell 数据库复制工具的消息生产者(用于发送消息的模块),这里设置为 “stdout”,表示将变更事件输出到标准输出。
(2) 向 test_maxwell.test 表插入一条数据,查看 maxwell 的监控
INSERT INTO test_maxwell.test (field1, field2, field3) VALUES ('value10', 989, 18.88);
查看maxwll监控:
{"database":"test_maxwell","table":"test","type":"insert","ts":1705376235,"xid":38715,"commit":true,"data":{"id":9,"field1":"value10","field2":989,"field3":18.88}}
(3) 向 test_maxwell.aaa 表插入一条数据,查看 maxwell 的监控
INSERT INTO test_maxwell.aaa (id, name, old) VALUES (3,"gxjzy",289);
查看maxwell监控:
本次maxwell控制台没有收到任何信息 说明 include
参数生效,只能监控指定的 mysql 表的信息
还可以设置 include:test_maxwell.*
,通过此种方式来监控 mysql 某个库的所有 表
,也就是说过滤整个库。可以自行测试,同理
3.4 监控 Mysql 指定表全量数据输出控制台,数据初始化
Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增及变化的数据,但是 Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据,来对 MySQL 的某张表 进行数据初始化,也就是我们常说的全量同步。具体操作步骤如下:
需求:将 test_maxwell 库下的 aaa表的3条数据,全量导入到 maxwell 控制台 进行打印。
(1) 修改 Maxwell 的元数据,触发数据初始化机制,
在 mysql 的 maxwell 库中 bootstrap 表中插入一条数据
,写明
需要全量数据的库名和表名
。
insert into maxwell.bootstrap(database_name,table_name) values('test_maxwell','aaa');
(2) 启动 maxwell 进程,此时初始化程序会直接打印
aaa 表
的所有数据
maxwell --user='maxwell' --password='123456' --host='master' --producer=stdout
(3) 当数据全部初始化完成以后,Maxwell 的元数据会变化
- is_complete 字段从 0 变为 1
- start_at 字段从 null 变为具体时间(数据同步开始时间)
- complete_at 字段从 null 变为具体时间(数据同步结束时间)
完成