详解 Flink Table API 和 Flink SQL 之入门介绍

一、介绍

  • Flink 提供了对于“表”处理的支持,这就是更高层级的应用 API,在 Flink 中被称为 Table API 和 SQL
  • Table API 是基于“表”(Table)的一套 API,它是内嵌在 Java、Scala 等语言中的一种声明式领域特定语言(DSL),也就是专门为处理表而设计的
  • Flink SQL 是基于 Apache Calcite 实现了对 SQL 的支持
  • Table API 和 SQL 最初并不完善,在 Flink 1.9 版本合并阿里巴巴内部版本 Blink 之后发生了非常大的改变,此后也一直处在快速开发和完善的过程中,直到 Flink 1.12 版本才基本上做到了功能上的完善。而即使是在目前最新的 1.13 版本中, Table API 和 SQL 也依然不算稳定,接口用法还在不停调整和更新

二、快速入门

1. 引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

<!-- 或者引入阿里的 blink 版本依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

2. 案例

public class TestTableAPIAndSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //读取数据并转换成 POJO
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //1. 创建 TableAPI 执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        //2. 基于 DataStream 创建表
        Table dataTable = tableEnv.fromDataStream(dataStream);
        
        //3. 调用 TableAPI
        Table resultTable = dataTable.select("id, temperature").where("id = 'sensor_1'");
        
        //4. 使用 Flink SQL 语法
        //4.1 基于 Table 注册一个查询视图
        tableEnv.createTemporaryView("sensor", dataTable);
        //4.2 对注册的视图进行 SQL 操作
        String sql = "select id, temperature from sensor where id = 'sensor_1'";
        Table resultSqlTable = tableEnv.sqlQuery(sql);
        
        //5.打印输出
        tableEnv.toAppendStream(resultTable, Row.class).print("table");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
        
        env.execute();
    }
}

三、程序结构

TableAPI 和 SQL 程序的整体结构与 DataStream API 非常相似,也可以分为读取数据源(Source)、转换(Transform)和输出数据(Sink)三部分,TableAPI 的输入输出操作不需要额外定义,只需要将用于输入和输出的表定义出来,然后进行转换查询就可以

//1.创建表的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);  

//2.创建一张表,用于读取数据源
tableEnv.connect(...).createTemporaryTable("inputTable");

//3.通过 TableAPI或SQL进行转换查询
Table result = tableEnv.from("inputTable").select(...);
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");

//4.注册一张表,用于输出结果
tableEnv.connect(...).createTemporaryTable("outputTable");
result.insertInto("outputTable");

相关推荐

  1. 详解 Flink Table API Flink SQL 入门介绍

    2024-06-10 22:10:04       27 阅读
  2. VsCode的介绍入门详细讲解

    2024-06-10 22:10:04       65 阅读
  3. 详解 Flink CDC 的介绍入门案例

    2024-06-10 22:10:04       20 阅读
  4. <span style='color:red;'>FlinkSQL</span>

    FlinkSQL

    2024-06-10 22:10:04      53 阅读
  5. flinksql

    2024-06-10 22:10:04       34 阅读
  6. FlinkSQL的联结函数

    2024-06-10 22:10:04       63 阅读
  7. Memcached介绍详解

    2024-06-10 22:10:04       35 阅读
  8. Memcached介绍详解

    2024-06-10 22:10:04       26 阅读
  9. Memcached介绍详解

    2024-06-10 22:10:04       23 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-10 22:10:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-10 22:10:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-10 22:10:04       82 阅读
  4. Python语言-面向对象

    2024-06-10 22:10:04       91 阅读

热门阅读

  1. 【C++重载——<<&>>&==&[]&=……】

    2024-06-10 22:10:04       30 阅读
  2. 程序设计与算法(三)C++:第四章poj代码

    2024-06-10 22:10:04       26 阅读
  3. selenium的使用教程

    2024-06-10 22:10:04       34 阅读
  4. 编译与链接

    2024-06-10 22:10:04       32 阅读
  5. Vue 路由实现组件切换

    2024-06-10 22:10:04       29 阅读
  6. C++设计模式---工厂模式

    2024-06-10 22:10:04       23 阅读
  7. 使用Spring Boot设计对象存储系统

    2024-06-10 22:10:04       30 阅读