详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换

一、表转换为 DataStream

/**
	Table API 中表到 DataStream 有两种模式:
	1.追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景。
	2.撤回模式(Retract Mode):用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据,Delete)
*/
public class TableTransformDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        //注册表
        tableEnv.connect(new FileSystem().path("./sensor.txt"))
            .withFormat(new Csv())
            .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temperature", DataTypes.DOUBLE()))
            .createTemporaryTable("sensor");
        
        //获取表
        Table sensorTable = tableEnv.from("sensor");
        
        //简单查询
        Table resultTable = sensorTable.select("id, temperature").where("id = 'sensor_1'");
        
        //聚合查询统计
        Table aggTable = sensorTable.groupBy("id").select("id, id.count as cnt, temperature.avg as avgTemp");
        
        DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

        DataStream<Tuple2<Boolean, Row>> aggStream = tableEnv.toRetractStream(aggTable, Row.class);

        resultStream.print("result");
        aggStream.print("agg");
        
        env.execute();
    }
}

二、DataStream 转换为表

public class DataStreamTransformTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //使用 fromDataStream 将 DataStream 转换为表
        Table table = tableEnv.fromDataStream(dataStream);
        //Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");
        
        env.execute();
    }
}

三、查看执行计划

/**
    Table API 提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或 TableEnvironment.explain() 方法完成的。
    explain 方法会返回一个字符串,描述三个计划:
        1.未优化的逻辑查询计划
        2.优化后的逻辑查询计划
        3.实际执行计划
*/
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

相关推荐

  1. FlinkDataStream API转换算子

    2024-06-11 20:24:05       56 阅读
  2. FlinkSQL联结函数

    2024-06-11 20:24:05       63 阅读
  3. 69、Flink DataStream Connector Kafka 连接器详解

    2024-06-11 20:24:05       21 阅读
  4. 70、Flink DataStream Connector JDBC 连接器详解

    2024-06-11 20:24:05       24 阅读
  5. Flink SqlFlink DataStream区别及使用场景

    2024-06-11 20:24:05       21 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-11 20:24:05       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-11 20:24:05       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-11 20:24:05       87 阅读
  4. Python语言-面向对象

    2024-06-11 20:24:05       96 阅读

热门阅读

  1. 02. fastLed 基本用法

    2024-06-11 20:24:05       22 阅读
  2. angular2网页前端执行流程

    2024-06-11 20:24:05       31 阅读
  3. 制作手机IOS苹果ipa应用的重签名工具

    2024-06-11 20:24:05       30 阅读
  4. golang生成根证书,服务端证书,用于 tls

    2024-06-11 20:24:05       31 阅读
  5. WEB前端三大主流框架

    2024-06-11 20:24:05       33 阅读
  6. Docker面试整理-如何进行Docker镜像的构建和发布?

    2024-06-11 20:24:05       32 阅读
  7. es6基础语法

    2024-06-11 20:24:05       22 阅读
  8. React框架基础教程

    2024-06-11 20:24:05       28 阅读