Fink CDC数据同步(四)Mysql数据同步到Kafka

依赖项

将下列依赖包放在flink/lib

flink-sql-connector-kafka-1.16.2

创建映射表

创建MySQL映射表

CREATE TABLE if not exists mysql_user (
     id     int,
     name   STRING,
     birth  STRING,
     gender    STRING,
     PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'= 'mysql-cdc',
    'hostname'= '192.168.0.1',
    'port'= '3306',
    'username'= 'user',
    'password'='password',
    'server-time-zone'= 'Asia/Shanghai',
    'debezium.snapshot.mode'='initial',
    'database-name'= 'bigdata',
    'table-name'= 'user'
); 

select * from mysql_user;

创建upsert-kafka 表

CREATE TABLE kafka_user_upsert(
     id     int,
     name   string,
     birth  string,
     gender    string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'flink-cdc-user',
 'properties.bootstrap.servers' = '192.168.0.4:6668',
 'properties.group.id' = 'flink-cdc-kafka-group',
 'key.format' = 'json',
 'value.format' = 'json'
);

这里指定的Kafka topic会自动创建,也可以预先自行创建

生成作业

insert into kafka_user_upsert select * from mysql_user;

select * from kafka_user_upsert;


 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

相关推荐

  1. 使用maxwell实时同步mysql数据kafka

    2024-02-08 15:06:03       16 阅读
  2. Kafka数据同步原理详解

    2024-02-08 15:06:03       37 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-08 15:06:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-08 15:06:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-08 15:06:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-08 15:06:03       18 阅读

热门阅读

  1. 【mybatis自动治愈插件】

    2024-02-08 15:06:03       31 阅读
  2. LeetCode 二叉树/n叉树的解题思路

    2024-02-08 15:06:03       37 阅读
  3. Mockito测试框架中的方法详解

    2024-02-08 15:06:03       32 阅读
  4. C语言中大小写字母转换详解

    2024-02-08 15:06:03       34 阅读
  5. 【六】CocosCreator-CCObject.js源码分析

    2024-02-08 15:06:03       30 阅读
  6. 最全软件系统架构演变!

    2024-02-08 15:06:03       35 阅读
  7. 精通Python中的正则表达式

    2024-02-08 15:06:03       33 阅读
  8. 贪心算法入门题(算法村第十七关青铜挑战)

    2024-02-08 15:06:03       32 阅读