1. Maven 依赖配置(pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>flink_cdc_more</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>flink_demo02</name>
  <url>http://maven.apache.org</url>

  <properties>
    <!-- Fixed: this property was declared twice in the original POM. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <!-- Wire java.version to the compiler; previously it was declared but unused. -->
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    <flink.version>1.15.4</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!-- <scope>provided</scope> when running inside a Flink cluster -->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <!-- <scope>provided</scope> when running inside a Flink cluster -->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Kafka connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- JDBC connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- MySQL CDC connector -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.29</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>
</project>
2. log4j 配置(src/main/resources/log4j.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM
        "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
<!-- Fixed: the xmlns:log4j declaration was missing, which makes the document
     ill-formed for any namespace-aware XML parser. -->
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">

  <!-- Console output. -->
  <appender name="myConsole" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %p %m%n"/>
    </layout>
    <!-- Filter restricting the levels this appender emits (INFO..ERROR). -->
    <filter class="org.apache.log4j.varia.LevelRangeFilter">
      <param name="levelMin" value="INFO"/>
      <param name="levelMax" value="ERROR"/>
      <param name="AcceptOnMatch" value="true"/>
    </filter>
  </appender>

  <!-- Rolling log file. -->
  <appender name="myFile" class="org.apache.log4j.RollingFileAppender">
    <param name="File" value="logs/flink.log"/><!-- output file name -->
    <!-- Append to the existing log on restart instead of truncating it. -->
    <param name="Append" value="true"/>
    <param name="MaxBackupIndex" value="5"/>
    <layout class="org.apache.log4j.PatternLayout">
      <!-- Fixed: log4j 1.x has no %thread conversion character (that is a
           Logback/log4j2 pattern); it parses as %t + literal "hread".
           The log4j 1.x thread-name conversion character is %t. -->
      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %p (%c:%L) %m%n"/>
    </layout>
  </appender>

  <!-- Root logger configuration. -->
  <root>
    <priority value="debug"/>
    <appender-ref ref="myConsole"/>
    <appender-ref ref="myFile"/>
  </root>
</log4j:configuration>
3.使用
3.1 打印 MySQL 指定表的变更数据
private static final Logger log = LoggerFactory.getLogger(FlinkSyncMysql.class);

public static void main(String[] args) throws Exception {
    printTable();
}

/**
 * Builds a MySQL CDC source for {@code apshop.weshop_user} and prints each
 * change event (serialized as a JSON string) to stdout via a Flink job.
 * Failures are logged instead of propagated.
 */
public static void printTable() {
    try {
        // Source-side connection info (placeholders — replace with real credentials).
        String userName = "xxxxxx";
        String srcHost = "192.168.xx.xx";
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(srcHost)
                .port(3306)
                .username(userName)
                .password("xxxxxx")
                .databaseList("apshop")          // database(s) to capture
                .tableList("apshop.weshop_user") // to sync the whole database, set tableList to ".*"
                .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord -> JSON string
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> mySQLSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4); // source parallelism: 4
        mySQLSource.print().setParallelism(1); // print-sink parallelism: 1
        env.execute("Print MySQL");
    } catch (Exception e) {
        // Fixed: use the declared SLF4J logger instead of e.printStackTrace(),
        // so the failure goes through the configured log4j appenders.
        log.error("Flink CDC print job failed", e);
    }
}
3.2mysql表同步
/**
 * Continuously replicates the MySQL tables matching 'demo.[0-9]?person[0-9]?'
 * into the MySQL table 'demo.person_sum' using Flink SQL: a 'mysql-cdc' source
 * table, a 'jdbc' sink table, and one INSERT ... SELECT between them.
 *
 * @throws Exception if the Table API job submission fails
 */
private static void mysql2Mysql() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // CDC source table: reads the MySQL binlog of the matching tables.
    String sourceDDL =
            "CREATE TABLE mysql_binlog (\n" +
            "  id INT,\n" +
            "  name STRING,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'mysql-cdc',\n" +
            "  'hostname' = '192.168.xx.xx',\n" +
            "  'port' = '3306',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'xxxxxx',\n" +
            "  'database-name' = 'demo',\n" +
            "  'table-name' = '[0-9]?person[0-9]?'\n" +
            ")";

    // JDBC sink table: upserts by primary key into person_sum.
    String sinkDDL =
            "CREATE TABLE test_cdc (" +
            "  id INT," +
            "  name STRING," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
            "  'url' = 'jdbc:mysql://192.168.xx.xx:3306/demo?serverTimezone=UTC&useSSL=false'," +
            "  'username' = 'root'," +
            "  'password' = 'xxxxxx'," +
            "  'table-name' = 'person_sum'" +
            ")";

    // Simple pass-through pipeline (no aggregation yet).
    String transformDmlSQL = "insert into test_cdc select * from mysql_binlog";
    System.out.println(sourceDDL);
    System.out.println(sinkDDL);
    System.out.println(transformDmlSQL);

    tableEnv.executeSql(sourceDDL);
    tableEnv.executeSql(sinkDDL);
    // executeSql(INSERT ...) submits the streaming job asynchronously by itself.
    TableResult result = tableEnv.executeSql(transformDmlSQL);

    // Blocks printing the submission result (job ID) after the job is accepted.
    result.print();
    // Fixed: the original also called env.execute(...), but no DataStream
    // operators were registered on `env` — the job was already submitted by
    // executeSql — so env.execute threw
    // "IllegalStateException: No operators defined in streaming topology".
}