Flink usage

1. Maven dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_cdc_more</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>flink_demo02</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        <flink.version>1.15.4</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
            <scope>compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>

2. log4j configuration

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM
        "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
<log4j:configuration debug="false">
    <appender name="myConsole" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %p %m%n"/>
        </layout>
        <!-- Filter: only pass events between INFO and ERROR to the console -->
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="INFO"/>
            <param name="levelMax" value="ERROR"/>
            <param name="AcceptOnMatch" value="true"/>
        </filter>
    </appender>

    <appender name="myFile" class="org.apache.log4j.RollingFileAppender">
        <param name="File" value="logs/flink.log"/><!-- 设置日志输出文件名 -->
        <!-- 设置是否在重新启动服务时,在原有日志的基础添加新日志 -->
        <param name="Append" value="true"/>
        <param name="MaxBackupIndex" value="5"/>

        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %p (%c:%L) %m%n"/>
        </layout>
    </appender>

    <!-- Root logger settings -->
    <root>
        <priority value="debug"/>
        <appender-ref ref="myConsole"/>
        <appender-ref ref="myFile"/>
    </root>
</log4j:configuration>
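
Logging goes through the slf4j-log4j12 binding from the POM above. As a quick sanity check, a minimal sketch (the class name LogSmokeTest is hypothetical): with the root logger at debug and the console filter starting at INFO, the debug line should appear only in logs/flink.log, while the info line reaches both appenders.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogSmokeTest {
        private static final Logger log = LoggerFactory.getLogger(LogSmokeTest.class);

        public static void main(String[] args) {
            log.info("goes to the console and to logs/flink.log");
            log.debug("goes only to logs/flink.log (the console filter starts at INFO)");
        }
    }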

3. Usage

3.1 Printing data from a specified MySQL table

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class FlinkSyncMysql {

        private static final Logger log = LoggerFactory.getLogger(FlinkSyncMysql.class);

        public static void main(String[] args) throws Exception {
            printTable();
        }

        public static void printTable() {
            try {
                // source connection info
                String userName = "xxxxxx";
                String srcHost = "192.168.xx.xx";
                MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                        .hostname(srcHost).port(3306).username(userName).password("xxxxxx")
                        .databaseList("apshop") // database to capture
                        .tableList("apshop.weshop_user") // to sync every table in the database, set tableList to ".*"
                        .deserializer(new JsonDebeziumDeserializationSchema()) // convert each SourceRecord to a JSON string
                        .build();

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                DataStreamSource<String> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),
                        "MySQL Source").setParallelism(4); // source parallelism: 4
                mySQLSource.print().setParallelism(1); // sink parallelism: 1
                env.execute("Print MySQL");
            } catch (Exception e) {
                log.error("printTable failed", e);
            }
        }
    }
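
The snippet above uses the connector's default startup behavior and no checkpointing. A variant sketch (the class name is hypothetical; host and credentials are placeholders like above): enabling checkpointing lets the CDC source persist its binlog offset so a restarted job resumes reading instead of starting over, and startupOptions makes the read mode explicit.

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FlinkSyncMysqlCheckpointed {
        public static void main(String[] args) throws Exception {
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("192.168.xx.xx").port(3306)
                    .username("xxxxxx").password("xxxxxx")
                    .databaseList("apshop")
                    .tableList("apshop.weshop_user")
                    // initial(): snapshot the table first, then read the binlog;
                    // latest(): skip the snapshot and read only new changes
                    .startupOptions(StartupOptions.initial())
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // checkpoints persist the binlog offset, so a restarted job resumes from it
            env.enableCheckpointing(3000);
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                    .print();
            env.execute("Print MySQL (checkpointed)");
        }
    }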

3.2 MySQL table synchronization

    // additional imports for this method:
    // import org.apache.flink.table.api.TableResult;
    // import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    private static void mysql2Mysql() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT,\n" +
                        " name STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = '192.168.xx.xx',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = 'xxxxxx',\n" +
                        " 'database-name' = 'demo',\n" +
                        // 'table-name' accepts a regex; this one matches person, person1, 1person1, ...
                        " 'table-name' = '[0-9]?person[0-9]?'\n" +
                        ")";
        String sinkDDL =
                "CREATE TABLE test_cdc (" +
                        " id INT," +
                        " name STRING," +
                        " PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'jdbc'," +
                        " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
                        " 'url' = 'jdbc:mysql://192.168.xx.xx:3306/demo?serverTimezone=UTC&useSSL=false'," +
                        " 'username' = 'root'," +
                        " 'password' = 'xxxxxx'," +
                        " 'table-name' = 'person_sum'" +
                        ")";

        // straight pass-through sync; no transformation is applied
        String transformDmlSQL = "insert into test_cdc select * from mysql_binlog";

        System.out.println(sourceDDL);
        System.out.println(sinkDDL);
        System.out.println(transformDmlSQL);

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql already submits the INSERT job; a separate env.execute() call
        // would fail here with "No operators defined in streaming topology"
        TableResult result = tableEnv.executeSql(transformDmlSQL);
        // print() blocks on the streaming job, keeping the process alive while
        // flink-cdc takes the snapshot and then tails the binlog
        result.print();
    }
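
Each tableEnv.executeSql("insert ...") call submits its own job, so syncing several tables this way launches several jobs. A minimal sketch using a Table API statement set to bundle multiple INSERTs into one job (the class name is hypothetical; sourceDDL and sinkDDL are the CREATE TABLE strings from 3.2, and any extra table names would be illustrative):

    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class MultiInsertSketch {
        // sourceDDL and sinkDDL are the CREATE TABLE strings shown in 3.2
        public static TableResult syncInOneJob(StreamTableEnvironment tableEnv,
                                               String sourceDDL, String sinkDDL) {
            tableEnv.executeSql(sourceDDL);
            tableEnv.executeSql(sinkDDL);

            StatementSet statementSet = tableEnv.createStatementSet();
            statementSet.addInsertSql("INSERT INTO test_cdc SELECT * FROM mysql_binlog");
            // further addInsertSql(...) calls join the same job
            return statementSet.execute(); // submits one job covering every INSERT added above
        }
    }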
