FlinkSQL的联结和函数

联结查询

常规联结

  • Inner Join: 结果集取交集
  • Outer Join
    • left outer join: 左表的数据全取,右表的数据取与左表匹配的数据
    • right outer join:右表的数据全取,左表的数据取与右表匹配的数据
    • full outer join:左表和右表都取匹配的数据
  • 常规联结,会将流中的数据一直保留在状态中,需要考虑状态的清除,可以使用TTL
    • tableEnv.getConfig().getConfiguration().setLong(“table.exec.state.ttl”, 10000);

间隔联结 interval join

  • 以一条流中数据的时间为基准,
select
	t1.id,
	t1.vc,
	t2.vc
from t1, t2
where t1.id = t2.id

维表联结 lookUp join

一条流与外部的一张表(维度表)进行联结。

select 
	t1.id,
	t1.vc,
	t2.vc
from t1
join t2 for system_time as of t1.pt
	on t1.id = t2.id;

函数

  1. 系统函数
    • API方式:table(call(“UPPER”, $(“id”))).execute().print();
    • SQL方式:tableEnv.sqlQuery(“select UPPER(id), vc, ts from t1”);
  2. 自定义函数
    • Scalar Function: UDF 一进一出
      • 继承 ScalarFunction类,实现一个或多个eval()
      • 注册函数:TableEnv.createTemporyFunction(“MyUpper”, MyUpperFunction.class);
    • Table function: UDTF, 一进多出
      • 继承TableFunction类,有泛型,表示输出类型。flink中二元组建议使用Row类型
      • 实现一个或多个eval方法
      • 收集数据时直接使用collect()方法
      • 由于Row类型无法确定元素的个数,需要添加注解@FunctionHint(output=@DataTypeHint("ROW<word string, len int>"))
      • 输出结果需要使用侧写:left join lateral table(mySplit(id)) on true;
    • Aggregate function : UDAF, 多进一出
      • 写个类继承AggregateFunction<OUT, Tuple2<Integer, Integer>输入类型>
      • 实现3个方法:getValue(), createAccumulator(), accumulate()
      • 注册方法:tableEnv.createTemporyFunction()
    • Table aggregate Function: 多进多出 ,UDTAF
      • 写个类继承TableAggregateFunction<OUT, IN>
      • 实现3个方法:createAccumulator(), accumulate()
      • 该类型的方法无法使用SQL的方式来调用,SQL中没有该类型的方法,只能使用API的方式
      • .flatAggregate(call("myTop2"), $ ("vc") )

CataLog目录

  1. 默认库 tableEnv.getCurrentDatabase() => default_database
  2. 默认目录 tableEnv.getCurrentCataLog() => default_catalog, 名字为GenericInMemeryCatalog
  3. 目录包含库,库包含表,方便后期管理。
  4. 可以读取外部数据库的表,可以查询外部数据库的表
  5. JdbcCataLog不支持创建外部数据表
  6. HiveCatalog
    • 需要下载hive-set.xml文件到本地项目中
    • 启动metastore服务:nohup metastore --service &
    • 设置登录用户 setProperties("HADOOP_USER_NAME", "atguigu");
    • Flink只是将元数据存储到了Hive中

Module操作

Flink支持引入Hive中的优秀函数,比如Split(‘aa,bb,cc,dd’, ‘,’).

  • 创建 HiveModule hiveModule = new HiveModule();
  • 引入:tableEnv.loadModule(“hive”, hiveModule);
  • 查看Flink中Module: tableEnv.listModules(); 如果出现同名的函数,会按照当前module的顺序来使用
    • 调换顺序:tableEnv.useModules(“hive”, “core”);

SQL-Client

  1. 先启动Flink集群,独立部署模式或者Yarn模式
  2. bin/sql_client.sh
  3. 建立对应的表
  4. 查询表中数据
    • 显示模式
      • set ‘sql-client.execution.result-mode’ = ‘table’
      • set ‘sql-client.execution.result-mode’ = ‘tableau’
      • set ‘sql-client.execution.result-mode’ = ‘changelog’

相关推荐

  1. FlinkSQL联结函数

    2023-12-17 08:30:02       40 阅读
  2. <span style='color:red;'>FlinkSQL</span>

    FlinkSQL

    2023-12-17 08:30:02      30 阅读
  3. flinksql

    2023-12-17 08:30:02       12 阅读
  4. FlinkSQL State生命周期

    2023-12-17 08:30:02       11 阅读
  5. 【Flink】FlinkSQLDataGen连接器(测试利器)

    2023-12-17 08:30:02       30 阅读
  6. FlinkSql一个简单测试程序

    2023-12-17 08:30:02       25 阅读
  7. 边缘计算联邦学习联系

    2023-12-17 08:30:02       31 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-17 08:30:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-17 08:30:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-17 08:30:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-17 08:30:02       18 阅读

热门阅读

  1. 凑够五个字

    2023-12-17 08:30:02       43 阅读
  2. uniapp 消息队列 localstorage 消息队列用法

    2023-12-17 08:30:02       39 阅读
  3. Qt绘图控件的实现

    2023-12-17 08:30:02       36 阅读
  4. nlohmann-json使用

    2023-12-17 08:30:02       48 阅读
  5. 深拷贝和浅拷贝(js的问题)

    2023-12-17 08:30:02       41 阅读
  6. 复盘理解/实验报告梳理 数据结构PTA实验二

    2023-12-17 08:30:02       43 阅读
  7. [Django-05 ]自定义sql查询

    2023-12-17 08:30:02       36 阅读
  8. 微信小程序怎样给事件传值的

    2023-12-17 08:30:02       36 阅读
  9. huggingface使用与环境移植

    2023-12-17 08:30:02       39 阅读
  10. ubuntu22.04 怎么查看系统日志

    2023-12-17 08:30:02       35 阅读
  11. 敏捷开发-任务拆解、工作量评估和任务指派

    2023-12-17 08:30:02       35 阅读
  12. 什么是CI/CD?如何在PHP项目中实施CI/CD?

    2023-12-17 08:30:02       38 阅读