Flink SQL CDC 配置文档

说明

本文档适用于使用ApacheStream + Flink + Flink CDC Connectors来实现实时流式计算和同步

Flink Mysql CDC 配置

-- 配置flink checkpoing间隔时间,显式启动checkpoing,不加此配置不能实现同时读取全量数据和增量(binlog)数据。
SET 'execution.checkpointing.interval' = '3s';  
drop table if exists source;  -- 删除flink表(不是真实的表)
create table source(  -- 创建字段
  `ID` int primary key,
  `name` varchar(32),
  `CREATE_TIME` TIMESTAMP(3),
  `UPDATE_TIME` TIMESTAMP(3)
  )
with(  -- 配置连接参数
  'connector' = 'mysql-cdc',
  'hostname' = '${hostname}',
  'port' = '3306',
  'username' = '${username}',
  'password' = '${password}',
  'database-name' = 'test_db',
  'table-name' = 'test',
  'server-time-zone' = 'Asia/Shanghai'  -- 配置时区,在mysql时间对象转flink对象时需要
);
 
create table target(  -- 创建字段
  `ID` int primary key,
  `name` varchar(32),
  `CREATE_TIME` TIMESTAMP(3),
  `UPDATE_TIME` TIMESTAMP(3)
  )
with(
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.url' = 'jdbc:mysql://${hostname}:3306/ms_data', -- jdbc url
'connector.table' = 'test', -- 表名
'connector.username' = '${username}', -- 用户名
'connector.password' = '${password}', -- 密码
'connector.write.flush.max-rows' = '1' -- 缓存到指定条数,再一起执行,默认 5000 条,改为 1 条
);
-- 把源表数据,插入目标表
insert into merchant_store_cost_info_target select * from merchant_store_cost_info_source; 
需要的依赖包
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.5</version>
  </dependency>
  <dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
  </dependency>
  <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.2</version>
  </dependency>

Flink PostgreSQL CDC 配置

修改PG配置

# 更改wal日志方式为logical(必须)
wal_level = logical # minimal, replica, or logical
# 更改solts最大数量(非必须)(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(非必须)(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(非必须)(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

创建用户并授予权限

按实际修改下列参数,postgres是数据库名,public是分区名,test_cdc_user是用户名

CREATE USER test_cdc_user WITH PASSWORD '999';  # 创建用户密码
ALTER ROLE test_cdc_user replication;  # 用户必须有逻辑复制权限
GRANT CONNECT ON DATABASE postgres to test_cdc_user;  # 远程连接权限
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test_cdc_user;  # 获取数据权限
GRANT CREATE ON database postgres to test_cdc_user  # 创建权限

发布表

发布所有表(也可以改为按需发布单个表),发布了之后,从库才可以订阅

UPDATE pg_publication SET puballtables=true WHERE pubname is not null;
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

Flink Table API

SET 'execution.checkpointing.interval' = '5s';  -- 设置flink checkpoint间隔时间
SET 'execution.checkpointing.min-pause' = '2S';  -- 设置两个flink checkpoint之间最小间隔时间,防止checkpoint太频繁产生的错误

CREATE TABLE table_source (
  id varchar primary key NOT ENFORCED,
  name varchar,
  doc varchar,
  play int
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '${huku-pg-hostname}',
  'port' = '5432',
  'username' = '${huku-pg-username}',
  'password' = '${huku-pg-password}',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'table',
  'slot.name' = 'table_slot',
  'debezium.slot.drop.on.stop' = 'true',
  'decoding.plugin.name' = 'pgoutput'
);

create table table_target(
  id varchar primary key NOT ENFORCED,
  name varchar,
  doc varchar,
  play int
)
with(
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.url' = 'jdbc:mysql://${mysql-hostname}:3306/ms_data', -- jdbc url
'connector.table' = 'ods_hk_r_dim_organ_nst', -- 表名
'connector.username' = '${mysql-username}', -- 用户名
'connector.password' = '${mysqk-password}', -- 密码
'connector.write.flush.max-rows' = '1' -- 默认 5000 条,为了演示改为 1 条
);

insert into table_target select *,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS cdc_sync_time from table_source;  -- 插入数据库,并记录插入时间

常见问题

同步update和delete日志时报错

需要修改表的REPLICA IDENTITY 属性,该属性默认是主键,当没有主键的时候,需要指定唯一索引,没有唯一索引时,选full,full的性能最差。

ALTER TABLE public.table REPLICA IDENTITY USING INDEX pk_index;
ALTER TABLE public.table REPLICA IDENTITY full;
WAL日志无限增长

wal_keep_size(默认是100G)设置保留的wal日志大小,但真正保留的是wal_keep_size + 一个wal日志,所以真实保留的日志永远会大于wal_keep_size的大小。

并且当存在Replication Slots(复制槽)的时候,复制槽会保留未同步的wal日志,只会导致wal日志总大小突破wal_keep_size的限制。尤其是当创建了复制槽,但因为远程数据库没用的话,就会导致wal日志一直保留无限增大。挂起的复制槽,需要手动删除。

用max_slot_wal_keep_size就能限制复制槽保留的wal日志大小,设置要比wal_keep_size大。

关于wal_keep_size的官方文档:

PostgreSQL: Documentation: 15: 30.5. WAL Configuration

关于max_slot_wal_keep_size的官方文档:

PostgreSQL: Documentation: 15: 27.2. Log-Shipping Standby Servers

更多常见问题:

FAQ(ZH) · apache/flink-cdc Wiki · GitHub

相关推荐

  1. Flink SQL CDC 配置文档

    2024-03-13 06:26:07       46 阅读
  2. Cartographers Lua配置参考文档

    2024-03-13 06:26:07       24 阅读
  3. RT-1配置文件

    2024-03-13 06:26:07       48 阅读
  4. Nginx conf文件配置

    2024-03-13 06:26:07       47 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-13 06:26:07       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-13 06:26:07       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-13 06:26:07       82 阅读
  4. Python语言-面向对象

    2024-03-13 06:26:07       91 阅读

热门阅读

  1. Django中的ajax细节

    2024-03-13 06:26:07       40 阅读
  2. 【Vue】首屏加载优化

    2024-03-13 06:26:07       45 阅读
  3. webpack一些常用的Loader和Plugin

    2024-03-13 06:26:07       47 阅读
  4. Anaconda3安装pandas失败,处理办法

    2024-03-13 06:26:07       42 阅读
  5. linux 在Ubuntu上安装Nginx

    2024-03-13 06:26:07       41 阅读
  6. C++容器——unordered_set浅谈

    2024-03-13 06:26:07       44 阅读
  7. 探索Vue.js:前端开发的新视角

    2024-03-13 06:26:07       42 阅读
  8. 2024华为OD机考面试经验分享

    2024-03-13 06:26:07       56 阅读
  9. C#常见的.Net类型(二)

    2024-03-13 06:26:07       45 阅读