CDC 整合方案:MySQL > Flink CDC + Schema Registry + Avro > Kafka > Hudi

《大数据平台架构与原型实现:数据中台建设实战》 博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。


本文是 《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》的增强版,在打通从源端数据库到 Hudi 表的完整链路的前提下,还额外做了如下两项工作:

  • 引入 Confluent Schema Registry,有效控制和管理上下游的 Schema 变更

  • 使用 Avro 格式替换 Json,搭配 Schema Registry,可以抽离 Avro 中的 Schema 数据,减少了 Avro 消息的体积,提升传输速率

1. 环境准备


  • 本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory 数据库;

  • 本文需要搭建一个 Confluent Schema Registry,如果仅以测试为目的,建议使用 Confluent 提供的 官方Docker镜像,构建操作可参考其 [官方文档];

  • 我们需要安装多个 Flink Connector 和 Format 组件,包括:Flink CDC MySQL Connector、Flink Hudi Connector、Flink debezium-avro-confluent Format Support,关于这些组件的安装,已经全部记录在了《Flink SQL Client 安装各类 Connector、Format 组件的方法汇总》一文中,请移步此文选择需要的组件酌情安装;

  • 安装好各种依赖组件后,执行如下脚本,清空 Hudi 表目标位置上的文件,停止正在运行中的 Yarn App,并启动一个新的 Flink Yarn Session:

    echo "clean hudi table target location..."
    aws s3 rm --recursive s3://glc-flink-hudi-test/sink_hudi_orders
    echo "Kill all running apps..."
    for appId in $(yarn application -list -appStates RUNNING 2>1 | awk 'NR > 2 { print $1 }'); do
        yarn application -kill $appId &> /dev/null
    done
    echo "start a flink yarn session..."
    flink-yarn-session -d
    
  • 清理 Kafka 中的 Topic (二次测试时会上次写入的消息会影响测试结果)

    # run on a host installed kafka console client
    export KAFKA_BOOTSTRAP_SERVERS='b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092'
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --delete --topic 'orders.*'
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
    
  • 启动 Flink SQL Client

    /usr/lib/flink/bin/sql-client.sh embedded shell
    

2. 创建 Flink CDC 源表


本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory 数据库。在一个既有的 Flink 环境上提前安装 Kafka 和 Flink CDC Connector 以及 Debezium Json Format,具体安装方法参考:Flink SQL Client 安装各类 Connector、组件的方法汇总(持续更新中…)。环境准备好后,打开 Flink SQL Client,执行如下建表语句:

SET 'sql-client.execution.result-mode' = 'TABLEAU';

DROP TABLE IF EXISTS orders_mysql_cdc;

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
    `order_number` INT NOT NULL,
    `order_date` DATE NOT NULL,
    `purchaser` INT NOT NULL,
    `quantity` INT NOT NULL,
    `product_id` INT NOT NULL,
    CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.13.30',
    'port' = '3307',
    'username' = 'root',
    'password' = 'Admin1234!',
    'database-name' = 'inventory',
    'table-name' = 'orders'
);

-- SELECT * FROM orders_mysql_cdc;

3. 创建 Kafka 中间表 ( debezium-avro-confluent 格式)


Kafka 中间表使用 kafka connector + debezium-avro-confluent 格式:

DROP TABLE IF EXISTS orders_kafka_avro;

CREATE TABLE IF NOT EXISTS orders_kafka_avro (
    order_number int,
    order_date   date,
    purchaser    int,
    quantity     int,
    product_id   int
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders_kafka_avro',
    'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
    'properties.group.id' = 'orders_kafka_avro',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.schema-registry.url' = 'http://10.0.13.30:8085'
);

insert into orders_kafka_avro select * from orders_mysql_cdc;

-- select * from orders_kafka_avro;

4. 创建 Hudi 目标表


Kafka 中流式注入 debezium-avro-confluent 格式的 CDC 消息后,就可以写入 Hudi 表了,Hudi 表使用 Hudi Connector 创建,必须使用流式读取(‘read.streaming.enabled’=‘true’),且必须开启 changelog 模式(‘changelog.enabled’ = ‘true’):

SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2s';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'state.checkpoints.num-retained' = '10';
    
-- register hudi hms catalog
CREATE CATALOG hudi_hms_catalog WITH (
    'type' = 'hudi',
    'mode' = 'hms',
    'default-database' = 'default',
    'hive.conf.dir' = '/etc/hive/conf',
    'table.external' = 'true'
);

CREATE DATABASE IF NOT EXISTS hudi_hms_catalog.inventory;

DROP TABLE IF EXISTS hudi_hms_catalog.inventory.sink_hudi_orders;

-- create hudi table, metadata is read/write via HoodieHiveCatalog, 
-- flink/hive/spark can all read or write this hudi talbe. 
CREATE TABLE IF NOT EXISTS hudi_hms_catalog.inventory.sink_hudi_orders (
    order_number int PRIMARY KEY NOT ENFORCED,
    order_date   date,
    purchaser    int,
    quantity     int,
    product_id   int,
    event_time   TIMESTAMP(3)
) WITH (
    'connector' = 'hudi',
    'path' = 's3://glc-flink-hudi-test/sink_hudi_orders',
    'table.type' = 'MERGE_ON_READ',
    'precombine.field' = 'event_time',
    'changelog.enabled' = 'true',
    'read.streaming.enabled'='true',
    'read.streaming.check-interval' = '2',
    'read.streaming.skip_compaction' = 'true',
    'read.streaming.start-commit' = 'earliest'
);

insert into hudi_hms_catalog.inventory.sink_hudi_orders select *,CURRENT_TIMESTAMP from orders_kafka_avro;

select * from hudi_hms_catalog.inventory.sink_hudi_orders;

5. 完整演示


2024-02-16_11-18-08


关联阅读

《CDC 整合方案:MySQL > Flink CDC + Schema Registry + Avro > Kafka > Hudi》

《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》

《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-21 13:34:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-21 13:34:01       101 阅读
  3. 在Django里面运行非项目文件

    2024-02-21 13:34:01       82 阅读
  4. Python语言-面向对象

    2024-02-21 13:34:01       91 阅读

热门阅读

  1. 代码随想录算法训练营总结

    2024-02-21 13:34:01       45 阅读
  2. CSS中伪元素和伪类的区别和作用?

    2024-02-21 13:34:01       50 阅读
  3. GO框架基础 (三)、xorm库

    2024-02-21 13:34:01       56 阅读
  4. LeetCode刷题笔记之二叉树(二)

    2024-02-21 13:34:01       43 阅读
  5. 【Lazy ORM 高级映射】1.2.2-JDK17-SNAPSHOT

    2024-02-21 13:34:01       50 阅读
  6. OpenCart程序结构与业务逻辑

    2024-02-21 13:34:01       56 阅读
  7. 【Python】OpenCV-图像滤波

    2024-02-21 13:34:01       47 阅读
  8. *EtherCAT:网络小能手,工业界的速度之星!**

    2024-02-21 13:34:01       48 阅读
  9. list.stream().forEach()和list.forEach()的区别

    2024-02-21 13:34:01       51 阅读
  10. C++ 基础算法 高精度乘法

    2024-02-21 13:34:01       42 阅读
  11. gem5标准库概述

    2024-02-21 13:34:01       46 阅读
  12. SQLite 知识整理

    2024-02-21 13:34:01       48 阅读
  13. uniapp使用sqlite

    2024-02-21 13:34:01       50 阅读