Flink创建TableEnvironment

在官网上,Flink创建TableEnvironment有两种方式:1.通过静态方法 TableEnvironment.create() 创建;2.从现有的 StreamExecutionEnvironment 创建一个 StreamTableEnvironment 与 DataStream API 互操作

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;


//1.通过静态方法 TableEnvironment.create() 创建
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()//默认为StreamingMode。可以不写
    //.inBatchMode()
    //.useOldPlanner()//1.14.4已过时
    //.useBlinkPlanner()//1.14.4已过时。默认BLINK
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

//2.从现有的 StreamExecutionEnvironment 创建一个 StreamTableEnvironment 与 DataStream API 互操作
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

而在写代码时会发现,通过静态方法 TableEnvironment.create() 创建时,create()方法有两种参数,一种就是官网上的EnvironmentSettings,另一种是Configuration。

查看源码会发现EnvironmentSettings最终还是会将EnvironmentSettings转为Configuration。且可以看到默认的RUNTIME_MODE是StreamingMode。

TableEnvironment.create(settings)-->TableEnvironmentImpl.create(settings)-->create(settings, settings.toConfiguration())
    /** Convert the environment setting to the {@link Configuration}. */
    public Configuration toConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(RUNTIME_MODE, isStreamingMode() ? STREAMING : BATCH);
        configuration.set(TABLE_PLANNER, PlannerType.BLINK);
        return configuration;
    }

同样,看EnvironmentSettings的构造方法也可以发现默认的RUNTIME_MODE是StreamingMode。

    private EnvironmentSettings(
            String planner,
            @Nullable String executor,
            String builtInCatalogName,
            String builtInDatabaseName,
            boolean isStreamingMode) {
        this.planner = planner;
        this.executor = executor;
        this.builtInCatalogName = builtInCatalogName;
        this.builtInDatabaseName = builtInDatabaseName;
        this.isStreamingMode = isStreamingMode;
    }

此外,我看的是1.14.4的源码,发现默认的planner已经是BLINK,而OLD未来将不会保留。

/**
 * Determine the type of the {@link Planner}. Except for the optimization, the different planner
 * also differs in the time semantic and so on.
 *
 * @deprecated The old planner has been removed in Flink 1.14. Since there is only one planner left
 *     (previously called the 'blink' planner), this class is obsolete and will be removed in future
 *     versions.
 */
@PublicEvolving
@Deprecated
public enum PlannerType {
    /** Blink planner is the up-to-date planner in Flink. */
    BLINK,

    /** Old planner is used before. It will not be maintained in the future. */
    OLD
}

相关推荐

  1. Flink创建TableEnvironment

    2024-03-12 06:46:01       37 阅读
  2. flink创建表报错

    2024-03-12 06:46:01       55 阅读
  3. flink on k8s几种创建方式

    2024-03-12 06:46:01       55 阅读
  4. <span style='color:red;'>Flink</span>

    Flink

    2024-03-12 06:46:01      55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-12 06:46:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-12 06:46:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-12 06:46:01       82 阅读
  4. Python语言-面向对象

    2024-03-12 06:46:01       91 阅读

热门阅读

  1. 【IVA】什么是IVA?

    2024-03-12 06:46:01       49 阅读
  2. 块级作用域、变量提升

    2024-03-12 06:46:01       39 阅读
  3. 列表循环多个el-form-item并校验

    2024-03-12 06:46:01       41 阅读
  4. PYTHON 120道题目详解(100-102)

    2024-03-12 06:46:01       47 阅读
  5. Golang-如何优雅的关闭一个Channel?

    2024-03-12 06:46:01       39 阅读
  6. Windows版Redis启用密码

    2024-03-12 06:46:01       38 阅读
  7. 正则表达式笔记+demo

    2024-03-12 06:46:01       45 阅读
  8. Leetcode 第388场周赛 问题和解法

    2024-03-12 06:46:01       45 阅读