flink: 使用table API 从kafka的一个topic读取数据并写入另一个topic

package cn.edu.tju.demo2;

import cn.edu.tju.demo.Test29;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.util.Map;

public class Test39 {
    // Kafka broker / ZooKeeper addresses (placeholders — replace with real hosts).
    // Made final: these are constants and should not be reassignable.
    private static final String KAFKA_HOST = "xx.xx.xx.xx:9092";
    private static final String ZOOKEEPER_HOST = "xx.xx.xx.xx:2181";

    /**
     * Flink Table API job: reads CSV records {@code (ts BIGINT, info STRING, val DOUBLE)}
     * from the Kafka topic {@code testTopic}, projects the {@code (info, val)} columns,
     * and writes the result as CSV to the Kafka topic {@code result}.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task — keeps the CSV output ordering simple for the demo.
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table "data_info": topic "testTopic", CSV format.
        // NOTE(review): the connect()/descriptor API is deprecated as of Flink 1.11 —
        // the DDL form executeSql("CREATE TABLE ... WITH ('connector'='kafka', ...)")
        // is the recommended replacement.
        tableEnv.connect(new Kafka().version("universal").topic("testTopic")
                        .property("zookeeper.connect", ZOOKEEPER_HOST)
                        .property("bootstrap.servers", KAFKA_HOST))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("ts", DataTypes.BIGINT())
                        .field("info", DataTypes.STRING())
                        .field("val", DataTypes.DOUBLE()))
                .createTemporaryTable("data_info");

        // Sink table "result_info": topic "result", CSV format,
        // schema (info STRING, maxVal DOUBLE).
        tableEnv.connect(new Kafka().version("universal").topic("result")
                        .property("zookeeper.connect", ZOOKEEPER_HOST)
                        .property("bootstrap.servers", KAFKA_HOST))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("info", DataTypes.STRING())
                        .field("maxVal", DataTypes.DOUBLE()))
                .createTemporaryTable("result_info");

        // Alternative aggregating query (kept as a reference for the demo):
        //   select info, max(val) as maxVal from data_info group by info

        // Plain projection; the selected columns are written to the sink
        // positionally (val -> maxVal), so the name mismatch is intentional here.
        Table selectTable = tableEnv.from("data_info").select("info,val");
        selectTable.insertInto("result_info");

        // insertInto()/execute() are deprecated in 1.11 but still functional;
        // this submits the streaming job.
        tableEnv.execute("my job");
    }
}

kafka为2.6
完整pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-proj</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>

<!--        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>-->

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.22.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.12</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.30</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>





    </dependencies>

</project>

向testTopic输入数据:

1689999833,demo3,66.77

可以在result topic看到数据:

demo3,66.77

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-12 06:46:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-12 06:46:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-12 06:46:04       82 阅读
  4. Python语言-面向对象

    2024-03-12 06:46:04       91 阅读

热门阅读

  1. Flink创建TableEnvironment

    2024-03-12 06:46:04       36 阅读
  2. 【IVA】什么是IVA?

    2024-03-12 06:46:04       49 阅读
  3. 块级作用域、变量提升

    2024-03-12 06:46:04       39 阅读
  4. 列表循环多个el-form-item并校验

    2024-03-12 06:46:04       41 阅读
  5. PYTHON 120道题目详解(100-102)

    2024-03-12 06:46:04       47 阅读
  6. Golang-如何优雅的关闭一个Channel?

    2024-03-12 06:46:04       39 阅读
  7. Windows版Redis启用密码

    2024-03-12 06:46:04       38 阅读
  8. 正则表达式笔记+demo

    2024-03-12 06:46:04       45 阅读
  9. Leetcode 第388场周赛 问题和解法

    2024-03-12 06:46:04       45 阅读