Flink CDC

1、Flink CDC

1.1、什么是 Flink CDC?

        CDC(Change Data Capture)也就是变更数据采集的意思,我们之前学的 Maxwell 就是一种 CDC 工具。今天要学的 Flink CDC 则是 Flink 自己开发的一款与 Flink 自身无缝集成的 CDC 工具,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。

        目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。而Flink 正是集成了 Debezium 作为捕获数据变更的引擎,所以它可以充分发挥 Debezium 的能力。Flink 和 Debezium 最大的不同之处在于,Flink 并不强制依赖于 Kafka。

1.2、为什么要使用 Flink CDC?

        之前我们监听 MySQL 的业务数据变更是通过 Maxwell 或者可以通过 Cannal ,它们都是通过监听 binlog 日志来实现数据同步的。

        在实时数仓中,如果我们要使用上面的 Maxwell 或 Cannal 做业务数据同步,那必须通过下面这几步:

  • mysql 开启 binlog
  • Maxwell/Canal 同步 binlog 数据写入到 Kafka
  • Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理

整体来看步骤很长,而且用到的组件也比较多,那能不能直接 Flink 一步到位,直接自己通过 binlog 来实现日志同步呢?所以 Flink CDC 这就出现了。

Flink CDC 数据格式

{
    "before": {
        "id": "PF1784570096901248",
        "pay_order_no": null,
        "out_no": "J1784570080435328",
        "title": "充值办卡",
        "from_user_id": "PG11111",
        "from_account_id": "1286009802396288",
        "user_id": "BO1707796995184000",
        "account_id": "1707895210106496",
        "amount": 13400,
        "profit_state": 1,
        "profit_time": 1686758315000,
        "refund_state": 0,
        "refund_time": null,
        "add_time": 1686758315000,
        "remark": "充值办卡",
        "acct_circle": "PG11111",
        "user_type": 92,
        "from_user_type": 90,
        "company_id": "PG11111",
        "profit_mode": 1,
        "type": 2,
        "parent_id": null,
        "oc_profit_id": "1784570096901248",
        "keep_account_from_user_id": null,
        "keep_account_from_bm_user_id": null,
        "keep_account_user_id": null,
        "keep_account_bm_user_id": null,
        "biz_company_id": "PG11111"
    },
    "after": {
        "id": "PF1784570096901248",
        "pay_order_no": null,
        "out_no": "J1784570080435328",
        "title": "充值办卡",
        "from_user_id": "PG11111",
        "from_account_id": "1286009802396288",
        "user_id": "BO1707796995184000",
        "account_id": "1707895210106496",
        "amount": 13400,
        "profit_state": 1,
        "profit_time": 1686758315000,
        "refund_state": 0,
        "refund_time": null,
        "add_time": 1686758315000,
        "remark": "充值办卡1",
        "acct_circle": "PG11111",
        "user_type": 92,
        "from_user_type": 90,
        "company_id": "PG11111",
        "profit_mode": 1,
        "type": 2,
        "parent_id": null,
        "oc_profit_id": "1784570096901248",
        "keep_account_from_user_id": null,
        "keep_account_from_bm_user_id": null,
        "keep_account_user_id": null,
        "keep_account_bm_user_id": null,
        "biz_company_id": "PG11111"
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1686734882000,
        "snapshot": "false",
        "db": "cloud_test",
        "sequence": null,
        "table": "acct_profit",
        "server_id": 1,
        "gtid": null,
        "file": "mysql-bin.000514",
        "pos": 650576218,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "u",
    "ts_ms": 1686734882689,
    "transaction": null
}

1.3、使用 Flink CDC 

MySQL 首先得开启 binlog 功能,这里需要特别注意 MySQL 时区的设置(我们需要保证 Flink 的时间和数据库时间保持一致):

show variables like '%time_zone%';
# 如果失去是UTC,则需要时间校正
set time_zone='+8:00';

1.3.1、变更数据处理逻辑

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomSink extends RichSinkFunction<String> {

    // 自定义向下游写入
    @Override
    public void invoke(String json, Context context) throws Exception {
        // 每次产生变更操作后对数据进行什么操作,比如写入到ES、Redis、MongoDB等
        System.out.println(">>>" + json);
    }

    // 写入其它数据源需要创建连接
    @Override
    public void open(Configuration parameters) throws Exception {

    }
    // 关闭外部数据库连接
    @Override
    public void close() throws Exception {

    }
}

1.3.2、构建 Flink CDC 监听程序

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("数据库名")
                .tableList("表名")    // 多个表之间用逗号切割
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())  // 反序列化器
                .includeSchemaChanges(true) // 是否监听表结构的变化
                .build();

        // 启动本地 WEB-UI
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT,8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // 检查点时间间隔
        env.enableCheckpointing(5000);
        DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new CustomSink());

        env.execute();
    }
}

相关推荐

  1. <span style='color:red;'>flinkCDC</span>

    flinkCDC

    2024-04-08 07:12:03      12 阅读
  2. 垃圾的flinkcdc

    2024-04-08 07:12:03       10 阅读
  3. FlinkCDC快速搭建实现数据监控

    2024-04-08 07:12:03       22 阅读
  4. FlinkCDC基础篇章1-安装使用

    2024-04-08 07:12:03       15 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-08 07:12:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-08 07:12:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-08 07:12:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-08 07:12:03       18 阅读

热门阅读

  1. 【LintCode】448 · 二叉查找树的中序后继

    2024-04-08 07:12:03       15 阅读
  2. 代码随想录 day24 回溯算法

    2024-04-08 07:12:03       10 阅读
  3. MXNet安装

    2024-04-08 07:12:03       15 阅读
  4. RIP协议

    2024-04-08 07:12:03       13 阅读
  5. redis和ElasticSearch和MongoDB应用场景,如何选择

    2024-04-08 07:12:03       11 阅读
  6. Docker 入门

    2024-04-08 07:12:03       11 阅读
  7. Git(8)之分支间同步特定提交

    2024-04-08 07:12:03       10 阅读
  8. 【Linux】在 Linux 上模拟网络故障

    2024-04-08 07:12:03       11 阅读