说明
本文档适用于使用ApacheStream + Flink + Flink CDC Connectors来实现实时流式计算和同步
Flink Mysql CDC 配置
-- 配置flink checkpoing间隔时间,显式启动checkpoing,不加此配置不能实现同时读取全量数据和增量(binlog)数据。
SET 'execution.checkpointing.interval' = '3s';
drop table if exists source; -- 删除flink表(不是真实的表)
create table source( -- 创建字段
`ID` int primary key,
`name` varchar(32),
`CREATE_TIME` TIMESTAMP(3),
`UPDATE_TIME` TIMESTAMP(3)
)
with( -- 配置连接参数
'connector' = 'mysql-cdc',
'hostname' = '${hostname}',
'port' = '3306',
'username' = '${username}',
'password' = '${password}',
'database-name' = 'test_db',
'table-name' = 'test',
'server-time-zone' = 'Asia/Shanghai' -- 配置时区,在mysql时间对象转flink对象时需要
);
create table target( -- 创建字段
`ID` int primary key,
`name` varchar(32),
`CREATE_TIME` TIMESTAMP(3),
`UPDATE_TIME` TIMESTAMP(3)
)
with(
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.url' = 'jdbc:mysql://${hostname}:3306/ms_data', -- jdbc url
'connector.table' = 'test', -- 表名
'connector.username' = '${username}', -- 用户名
'connector.password' = '${password}', -- 密码
'connector.write.flush.max-rows' = '1' -- 缓存到指定条数,再一起执行,默认 5000 条,改为 1 条
);
-- 把源表数据,插入目标表
insert into merchant_store_cost_info_target select * from merchant_store_cost_info_source;
需要的依赖包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.4.2</version>
</dependency>
Flink PostgreSQL CDC 配置
修改PG配置
# 更改wal日志方式为logical(必须)
wal_level = logical # minimal, replica, or logical
# 更改solts最大数量(非必须)(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(非必须)(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(非必须)(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable
创建用户并授予权限
按实际修改下列参数,postgres是数据库名,public是分区名,test_cdc_user是用户名
CREATE USER test_cdc_user WITH PASSWORD '999'; # 创建用户密码
ALTER ROLE test_cdc_user replication; # 用户必须有逻辑复制权限
GRANT CONNECT ON DATABASE postgres to test_cdc_user; # 远程连接权限
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test_cdc_user; # 获取数据权限
GRANT CREATE ON database postgres to test_cdc_user # 创建权限
发布表
发布所有表(也可以改为按需发布单个表),发布了之后,从库才可以订阅
UPDATE pg_publication SET puballtables=true WHERE pubname is not null;
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
Flink Table API
SET 'execution.checkpointing.interval' = '5s'; -- 设置flink checkpoint间隔时间
SET 'execution.checkpointing.min-pause' = '2S'; -- 设置两个flink checkpoint之间最小间隔时间,防止checkpoint太频繁产生的错误
CREATE TABLE table_source (
id varchar primary key NOT ENFORCED,
name varchar,
doc varchar,
play int
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '${huku-pg-hostname}',
'port' = '5432',
'username' = '${huku-pg-username}',
'password' = '${huku-pg-password}',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'table',
'slot.name' = 'table_slot',
'debezium.slot.drop.on.stop' = 'true',
'decoding.plugin.name' = 'pgoutput'
);
create table table_target(
id varchar primary key NOT ENFORCED,
name varchar,
doc varchar,
play int
)
with(
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.url' = 'jdbc:mysql://${mysql-hostname}:3306/ms_data', -- jdbc url
'connector.table' = 'ods_hk_r_dim_organ_nst', -- 表名
'connector.username' = '${mysql-username}', -- 用户名
'connector.password' = '${mysqk-password}', -- 密码
'connector.write.flush.max-rows' = '1' -- 默认 5000 条,为了演示改为 1 条
);
insert into table_target select *,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS cdc_sync_time from table_source; -- 插入数据库,并记录插入时间
常见问题:
同步update和delete日志时报错
需要修改表的REPLICA IDENTITY 属性,该属性默认是主键,当没有主键的时候,需要指定唯一索引,没有唯一索引时,选full,full的性能最差。
ALTER TABLE public.table REPLICA IDENTITY USING INDEX pk_index;
ALTER TABLE public.table REPLICA IDENTITY full;
WAL日志无限增长
wal_keep_size(默认是100G)设置保留的wal日志大小,但真正保留的是wal_keep_size + 一个wal日志,所以真实保留的日志永远会大于wal_keep_size的大小。
并且当存在Replication Slots(复制槽)的时候,复制槽会保留未同步的wal日志,只会导致wal日志总大小突破wal_keep_size的限制。尤其是当创建了复制槽,但因为远程数据库没用的话,就会导致wal日志一直保留无限增大。挂起的复制槽,需要手动删除。
用max_slot_wal_keep_size就能限制复制槽保留的wal日志大小,设置要比wal_keep_size大。
关于wal_keep_size的官方文档:
PostgreSQL: Documentation: 15: 30.5. WAL Configuration
关于max_slot_wal_keep_size的官方文档:
PostgreSQL: Documentation: 15: 27.2. Log-Shipping Standby Servers