博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。 |
文章目录
本文是 《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》的增强版,在打通从源端数据库到 Hudi 表的完整链路的前提下,还额外做了如下两项工作:
引入 Confluent Schema Registry,有效控制和管理上下游的 Schema 变更
使用 Avro 格式替换 Json,搭配 Schema Registry,可以抽离 Avro 中的 Schema 数据,减少了 Avro 消息的体积,提升传输速率
1. 环境准备
本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的
inventory
数据库;本文需要搭建一个 Confluent Schema Registry,如果仅以测试为目的,建议使用 Confluent 提供的 官方Docker镜像,构建操作可参考其 [官方文档];
我们需要安装多个 Flink Connector 和 Format 组件,包括:Flink CDC MySQL Connector、Flink Hudi Connector、Flink
debezium-avro-confluent
Format Support,关于这些组件的安装,已经全部记录在了《Flink SQL Client 安装各类 Connector、Format 组件的方法汇总》一文中,请移步此文选择需要的组件酌情安装;安装好各种依赖组件后,执行如下脚本,清空 Hudi 表目标位置上的文件,停止正在运行中的 Yarn App,并启动一个新的 Flink Yarn Session:
echo "clean hudi table target location..." aws s3 rm --recursive s3://glc-flink-hudi-test/sink_hudi_orders echo "Kill all running apps..." for appId in $(yarn application -list -appStates RUNNING 2>1 | awk 'NR > 2 { print $1 }'); do yarn application -kill $appId &> /dev/null done echo "start a flink yarn session..." flink-yarn-session -d
清理 Kafka 中的 Topic (二次测试时会上次写入的消息会影响测试结果)
# run on a host installed kafka console client export KAFKA_BOOTSTRAP_SERVERS='b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092' kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --delete --topic 'orders.*' kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
启动 Flink SQL Client
/usr/lib/flink/bin/sql-client.sh embedded shell
2. 创建 Flink CDC 源表
本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory
数据库。在一个既有的 Flink 环境上提前安装 Kafka 和 Flink CDC Connector 以及 Debezium Json Format,具体安装方法参考:Flink SQL Client 安装各类 Connector、组件的方法汇总(持续更新中…)。环境准备好后,打开 Flink SQL Client,执行如下建表语句:
SET 'sql-client.execution.result-mode' = 'TABLEAU';
DROP TABLE IF EXISTS orders_mysql_cdc;
CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
`order_number` INT NOT NULL,
`order_date` DATE NOT NULL,
`purchaser` INT NOT NULL,
`quantity` INT NOT NULL,
`product_id` INT NOT NULL,
CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.0.13.30',
'port' = '3307',
'username' = 'root',
'password' = 'Admin1234!',
'database-name' = 'inventory',
'table-name' = 'orders'
);
-- SELECT * FROM orders_mysql_cdc;
3. 创建 Kafka 中间表 ( debezium-avro-confluent 格式)
Kafka 中间表使用 kafka connector + debezium-avro-confluent 格式:
DROP TABLE IF EXISTS orders_kafka_avro;
CREATE TABLE IF NOT EXISTS orders_kafka_avro (
order_number int,
order_date date,
purchaser int,
quantity int,
product_id int
) WITH (
'connector' = 'kafka',
'topic' = 'orders_kafka_avro',
'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
'properties.group.id' = 'orders_kafka_avro',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://10.0.13.30:8085'
);
insert into orders_kafka_avro select * from orders_mysql_cdc;
-- select * from orders_kafka_avro;
4. 创建 Hudi 目标表
Kafka 中流式注入 debezium-avro-confluent
格式的 CDC 消息后,就可以写入 Hudi 表了,Hudi 表使用 Hudi Connector 创建,必须使用流式读取(‘read.streaming.enabled’=‘true’),且必须开启 changelog 模式(‘changelog.enabled’ = ‘true’):
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2s';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'state.checkpoints.num-retained' = '10';
-- register hudi hms catalog
CREATE CATALOG hudi_hms_catalog WITH (
'type' = 'hudi',
'mode' = 'hms',
'default-database' = 'default',
'hive.conf.dir' = '/etc/hive/conf',
'table.external' = 'true'
);
CREATE DATABASE IF NOT EXISTS hudi_hms_catalog.inventory;
DROP TABLE IF EXISTS hudi_hms_catalog.inventory.sink_hudi_orders;
-- create hudi table, metadata is read/write via HoodieHiveCatalog,
-- flink/hive/spark can all read or write this hudi talbe.
CREATE TABLE IF NOT EXISTS hudi_hms_catalog.inventory.sink_hudi_orders (
order_number int PRIMARY KEY NOT ENFORCED,
order_date date,
purchaser int,
quantity int,
product_id int,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'path' = 's3://glc-flink-hudi-test/sink_hudi_orders',
'table.type' = 'MERGE_ON_READ',
'precombine.field' = 'event_time',
'changelog.enabled' = 'true',
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '2',
'read.streaming.skip_compaction' = 'true',
'read.streaming.start-commit' = 'earliest'
);
insert into hudi_hms_catalog.inventory.sink_hudi_orders select *,CURRENT_TIMESTAMP from orders_kafka_avro;
select * from hudi_hms_catalog.inventory.sink_hudi_orders;
5. 完整演示
关联阅读:
《CDC 整合方案:MySQL > Flink CDC + Schema Registry + Avro > Kafka > Hudi》
《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》
《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》