flink-cdc-学习笔记(一)

1.flink cdc简介

Flink 1.11 引入了 CDC.
Flink CDC 是一款基于 Flink 打造一系列数据库的连接器。Flink 是流处理的引擎,其主要消费的数据源是类似于一些点击的日志流、曝光流等数据,但在业务场景中,点击流的日志数据只是一部分,具有更大价值的数据隐藏在用户的业务数据库中。Flink CDC 弥补了 Flink 读取这些数据的缺陷,能够通过流式的方式读取数据库中的增量变更的日志。

1.1应用数据场景 CDC

1.日志文件数据(appendOn)
2.数据库数据(CRUD)

1.2同类型产品的对比
基于查询的CDC 基于日志的CDC
开源产品 sqoop,kafka jdbc,datax canal,flink-cdc
执行模式 batch streaming
捕获所有数据的变化 ×
低延迟,不增加数据库的负担 ×
不入侵业务代码(lastUpdate) ×
捕获删除事件 ×
更新记录状态 ×

经过以上对比,我们可以发现基于日志 CDC 有以下这几种优势:

能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失
每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势
无需入侵业务,业务解耦,无需更改业务模型
捕获删除事件和捕获旧记录的状态,在查询 CDC 中,周期的查询无法感知中间数据是否删除

ETL 常用解决方案 采集/计算/传输

传统方案:mysql->kafka-connect->kafka —>app/flink/storm—>ES/Pgsql
flink方案: mysql---->flink---->ES/Pgsql

flink-cdc-connectors 可以用来替换 Debezium+Kafka 的数据采集模块,从而实现 Flink SQL 采集+计算+传输(ETL)一体化,这样做的优点有以下:
开箱即用,简单易上手
减少维护的组件,简化实时链路,减轻部署成本
减小端到端延迟
Flink 自身支持 Exactly Once 的读取和计算
数据不落地,减少存储成本
支持全量和增量流式读取
binlog 采集位点可回溯*

2.flink-cdc-sql-client如何使用

启动服务
./bin/start-cluster.sh
关闭服务
./bin/stop-cluster.sh

启动客户端
./bin/sql-client.sh embedded
默认情况下,SQL 客户端将从 ./conf/sql-client-defaults.yaml 中读取配置。有关环境配置文件结构的更多信息,请参见配置部分。
在批处理环境下执行的查询只能用表格模式或者Tableau模式进行检索
CLI 为维护和可视化结果提供三种模式。
表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET execution.result-mode=table;
变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
SET execution.result-mode=changelog;
Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):
SET execution.result-mode=tableau;

环境配置文件

SQL 查询执行前需要配置相关环境变量。环境配置文件 定义了 catalog、table sources、table sinks、用户自定义函数和其他执行或部署所需属性。
详情见:<flinck-cdc-yaml-demo.md>

重启策略(Restart Strategies)

重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的全局重启策略相似,更细精度的重启配置可以在环境配置文件中声明。

依赖 (略)

自定义函数(略)

Catalogs(略)

分离的 SQL 查询(略)

SQL 视图(略)

临时表(Temporal Table)(略)

3.flink-cdc-connect 案例

1.Flink JDBC Connector:Flink 与数据库集成最佳实践
2.基于 Flink SQL CDC 的实时数据同步方案

4.使用脚本实现CDC数据同步

1.下载flink-cdc的二进制包flink-cdc-3.0.tar

2.编写一个任务脚本mysql-to-doris.yaml,内容如下.

source:
  type: mysql
  host: localhost
  port: 3306
  username: admin
  password: pass
  tables: db0.commodity, db1.user_table_[0-9]+, [app|web]_order_\.*

sink:
  type: doris
  fenodes: FE_IP:HTTP_PORT
  username: admin
  password: pass

pipeline:
  name: mysql-sync-doris
  parallelism: 4

3.提交任务到flink集群

# Submit Pipeline
$ ./bin/flink-cdc.sh mysql-to-doris.yaml
Pipeline "mysql-sync-doris" is submitted with Job ID "DEADBEEF".

5.springboot集成flink-cdc-mysql-connector实现CDC

1.[Flink SQL Client]创建一个cdc源头

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

2.项目里pom.xml引入cdc-connector的包

<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>

  <version>3.0.0</version>
</dependency>

3.编写实现类调用任务流程

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

通过下面的指令下载flink-cdc源码本地进行编译

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

这样flink-cdc就安装到本地maven库了.

参考资料

flink官网

Apache官方社区

wiki

文档

视频介绍
基于 Flink SQL CDC 的实时数据同步方案

阿里云-开发者社区

案例

基于 Flink SQL CDC 的实时数据同步方案
Flink JDBC Connector:Flink 与数据库集成最佳实践

相关推荐

  1. flink-cdc-学习笔记()

    2024-03-14 04:26:03       19 阅读
  2. <span style='color:red;'>Flink</span> <span style='color:red;'>CDC</span>

    Flink CDC

    2024-03-14 04:26:03      31 阅读
  3. <span style='color:red;'>Flink</span> <span style='color:red;'>CDC</span>

    Flink CDC

    2024-03-14 04:26:03      16 阅读
  4. flink-cdc使用小结

    2024-03-14 04:26:03       27 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-14 04:26:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-14 04:26:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-14 04:26:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-14 04:26:03       20 阅读

热门阅读

  1. 面试经典-1-合并两个有序数组

    2024-03-14 04:26:03       18 阅读
  2. 有来团队后台项目-解析3

    2024-03-14 04:26:03       19 阅读
  3. 1688中国站获得工厂档案信息 API

    2024-03-14 04:26:03       23 阅读
  4. 算法训练营day44(补),动态规划12

    2024-03-14 04:26:03       18 阅读
  5. 顺序表的构建(C++)---- 代码 + 注释

    2024-03-14 04:26:03       22 阅读
  6. dao5的Sia Khazamipour确认出席Hack.Summit() 2024

    2024-03-14 04:26:03       20 阅读
  7. SkiROS2:技能型机器人控制平台的探索之旅

    2024-03-14 04:26:03       24 阅读
  8. inversions

    2024-03-14 04:26:03       23 阅读
  9. Lua 如何在Lua中调用C/C++函数

    2024-03-14 04:26:03       22 阅读
  10. Unity3D 动态生成场景管理节点详解

    2024-03-14 04:26:03       21 阅读