1、Flink CDC
1.1、什么是 Flink CDC?
CDC(Change Data Capture)也就是变更数据采集的意思,我们之前学的 Maxwell 就是一种 CDC 工具。今天要学的 Flink CDC 则是 Flink 自己开发的一款与 Flink 自身无缝集成的 CDC 工具,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。
目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。而Flink 正是集成了 Debezium 作为捕获数据变更的引擎,所以它可以充分发挥 Debezium 的能力。Flink 和 Debezium 最大的不同之处在于,Flink 并不强制依赖于 Kafka。
1.2、为什么要使用 Flink CDC?
之前我们监听 MySQL 的业务数据变更是通过 Maxwell 或者可以通过 Cannal ,它们都是通过监听 binlog 日志来实现数据同步的。
在实时数仓中,如果我们要使用上面的 Maxwell 或 Cannal 做业务数据同步,那必须通过下面这几步:
- mysql 开启 binlog
- Maxwell/Canal 同步 binlog 数据写入到 Kafka
- Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理
整体来看步骤很长,而且用到的组件也比较多,那能不能直接 Flink 一步到位,直接自己通过 binlog 来实现日志同步呢?所以 Flink CDC 这就出现了。
Flink CDC 数据格式
{
"before": {
"id": "PF1784570096901248",
"pay_order_no": null,
"out_no": "J1784570080435328",
"title": "充值办卡",
"from_user_id": "PG11111",
"from_account_id": "1286009802396288",
"user_id": "BO1707796995184000",
"account_id": "1707895210106496",
"amount": 13400,
"profit_state": 1,
"profit_time": 1686758315000,
"refund_state": 0,
"refund_time": null,
"add_time": 1686758315000,
"remark": "充值办卡",
"acct_circle": "PG11111",
"user_type": 92,
"from_user_type": 90,
"company_id": "PG11111",
"profit_mode": 1,
"type": 2,
"parent_id": null,
"oc_profit_id": "1784570096901248",
"keep_account_from_user_id": null,
"keep_account_from_bm_user_id": null,
"keep_account_user_id": null,
"keep_account_bm_user_id": null,
"biz_company_id": "PG11111"
},
"after": {
"id": "PF1784570096901248",
"pay_order_no": null,
"out_no": "J1784570080435328",
"title": "充值办卡",
"from_user_id": "PG11111",
"from_account_id": "1286009802396288",
"user_id": "BO1707796995184000",
"account_id": "1707895210106496",
"amount": 13400,
"profit_state": 1,
"profit_time": 1686758315000,
"refund_state": 0,
"refund_time": null,
"add_time": 1686758315000,
"remark": "充值办卡1",
"acct_circle": "PG11111",
"user_type": 92,
"from_user_type": 90,
"company_id": "PG11111",
"profit_mode": 1,
"type": 2,
"parent_id": null,
"oc_profit_id": "1784570096901248",
"keep_account_from_user_id": null,
"keep_account_from_bm_user_id": null,
"keep_account_user_id": null,
"keep_account_bm_user_id": null,
"biz_company_id": "PG11111"
},
"source": {
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1686734882000,
"snapshot": "false",
"db": "cloud_test",
"sequence": null,
"table": "acct_profit",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000514",
"pos": 650576218,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1686734882689,
"transaction": null
}
1.3、使用 Flink CDC
MySQL 首先得开启 binlog 功能,这里需要特别注意 MySQL 时区的设置(我们需要保证 Flink 的时间和数据库时间保持一致):
show variables like '%time_zone%';
# 如果失去是UTC,则需要时间校正
set time_zone='+8:00';
1.3.1、变更数据处理逻辑
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class CustomSink extends RichSinkFunction<String> {
// 自定义向下游写入
@Override
public void invoke(String json, Context context) throws Exception {
// 每次产生变更操作后对数据进行什么操作,比如写入到ES、Redis、MongoDB等
System.out.println(">>>" + json);
}
// 写入其它数据源需要创建连接
@Override
public void open(Configuration parameters) throws Exception {
}
// 关闭外部数据库连接
@Override
public void close() throws Exception {
}
}
1.3.2、构建 Flink CDC 监听程序
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> source = MySqlSource.builder()
.hostname("hadoop102")
.port(3306)
.databaseList("数据库名")
.tableList("表名") // 多个表之间用逗号切割
.username("root")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化器
.includeSchemaChanges(true) // 是否监听表结构的变化
.build();
// 启动本地 WEB-UI
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT,8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// 检查点时间间隔
env.enableCheckpointing(5000);
DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
.addSink(new CustomSink());
env.execute();
}
}