maxwell同步mysql到kafka(一个服务器启动多个)

创建mysql同步用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

开启mysql binlog

a.修改 /etc/my.cnf 配置

log-bin=mysql-bin  # 开启binlog
binlog-format=ROW  # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin   
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。

mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+

安装maxwell

下载

从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

安装

解压即可 tar -zxvf maxwell.tar.gz

配置

vim config_1.properties

server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx

vim config_2.properties

server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx

启动

bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon

验证启动进程

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l

测试数据库全量同步

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-start",
    "ts": 1694748250,
    "data": {}
}

{
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
    "id": 1,
    "create_time": "2022-08-19 00:00:00.000000",
    "update_time": "2022-08-19 00:00:00.000000",
    "industry_level": 1,
    "industry_name": "工程建设",
    "superior_industry_id": null
    }
} {
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
    "id": 2,
    "create_time": "2022-08-19 00:00:00.000000",
    "update_time": "2022-08-19 00:00:00.000000",
    "industry_level": 1,
    "industry_name": "轻工",
    "superior_industry_id": null
    }
} {
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
    "id": 3,
    "create_time": "2022-08-19 00:00:00.000000",
    "update_time": "2022-08-19 00:00:00.000000",
    "industry_level": 2,
    "industry_name": "土木",
    "superior_industry_id": 1
    }
}
......

{
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-complete",
    "ts": 1694748250,
    "data": {}
}

测试数据库增量同步

参数说明

输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

创建mysql同步用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

开启mysql binlog

a.修改 /etc/my.cnf 配置

log-bin=mysql-bin  # 开启binlog
binlog-format=ROW  # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin   
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。

mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+

安装maxwell

下载

从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

安装

解压即可 tar -zxvf maxwell.tar.gz

配置

vim config_1.properties

server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx

vim config_2.properties

server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx

启动

bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon

验证启动进程

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l

测试数据库全量同步

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-start",
    "ts": 1694748250,
    "data": {}
}

{
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
    "id": 1,
    "create_time": "2022-08-19 00:00:00.000000",
    "update_time": "2022-08-19 00:00:00.000000",
    "industry_level": 1,
    "industry_name": "工程建设",
    "superior_industry_id": null
    }
} {
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
    "id": 2,
    "create_time": "2022-08-19 00:00:00.000000",
    "update_time": "2022-08-19 00:00:00.000000",
    "industry_level": 1,
    "industry_name": "轻工",
    "superior_industry_id": null
    }
} {
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
    "id": 3,
    "create_time": "2022-08-19 00:00:00.000000",
    "update_time": "2022-08-19 00:00:00.000000",
    "industry_level": 2,
    "industry_name": "土木",
    "superior_industry_id": 1
    }
}
......

{
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-complete",
    "ts": 1694748250,
    "data": {}
}

测试数据库增量同步

参数说明

输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-06 07:36:04       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-06 07:36:04       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-06 07:36:04       87 阅读
  4. Python语言-面向对象

    2024-06-06 07:36:04       96 阅读

热门阅读

  1. TCP/IP 接收发送缓存大小的自动调优 Auto Tuning

    2024-06-06 07:36:04       28 阅读
  2. 《解锁创意无限:Stable Diffusion 详细教程指南》

    2024-06-06 07:36:04       28 阅读
  3. 洛谷 P8740 [蓝桥杯 2021 省 A] 填空问题 题解

    2024-06-06 07:36:04       27 阅读
  4. folyd算法求最短路径

    2024-06-06 07:36:04       43 阅读
  5. AudioSet 本体与声音实体对象

    2024-06-06 07:36:04       29 阅读
  6. Android AAudio——C API创建AudioTrack(六)

    2024-06-06 07:36:04       29 阅读
  7. 力扣2831.找出最长等值子数组

    2024-06-06 07:36:04       30 阅读
  8. axios

    axios

    2024-06-06 07:36:04      29 阅读
  9. iOS中的UIScene和UISceneDelegate

    2024-06-06 07:36:04       28 阅读
  10. Android AAudio——音频流释放死锁(七)

    2024-06-06 07:36:04       31 阅读
  11. Python中的上下文管理:深入探索contextlib模块

    2024-06-06 07:36:04       27 阅读