flink table view datastream互转

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convert

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataType


object streamPOJO2table {
  case class outer(f1:String,f2:Inner)
  case class outerV1(f1:String,f2:Inner,f3:Int)
  case class Inner(f3:String,f4:Int)

  def main(args: Array[String]): Unit = {
    // flink1.13 流处理环境初始化
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)
    import org.apache.flink.streaming.api.scala._
    val ds1: DataStream[outer] = env.fromElements(
      outer("a",Inner("b",2))
      ,outer("d",Inner("e",4))
    )


    val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()
    /*
    +----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+
     */



//    table1
//      .print()
    /*
    5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]
     */


    tEnv.createTemporaryView("view1", ds1)

    val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")
    tableResult1.print()
    /*
    +----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
     */


//
    val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()

//    println(t1.getResolvedSchema)
    /*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(
  `f1` STRING,
  `f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,
  `f3` INT NOT NULL
)
     */

    println("---- 1 -------")
    // tableResult转datastream
    val o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()

    println("---- 2 -------")

    tEnv.executeSql(
      """
        |select
        |f1
        |,f2.f3
        |,f2.f4
        |from view1
        |""".stripMargin)
//      .print()
    /*
    +----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+
     */

    tEnv.executeSql(
        """
          |select
          |f1
          |,(f2.f3,f2.f4)
          |from view1
          |""".stripMargin)
//      .print()


    env.execute("jobName1")
  }

}

相关推荐

  1. Date与LocalDate

    2024-01-05 10:10:10       35 阅读
  2. QT : Bson\Json

    2024-01-05 10:10:10       43 阅读
  3. 【maskjson】文件

    2024-01-05 10:10:10       53 阅读
  4. 【MATLAB】 RGB和YCbCr

    2024-01-05 10:10:10       40 阅读
  5. vue json字符串和Hex

    2024-01-05 10:10:10       14 阅读
  6. python字节串和数字

    2024-01-05 10:10:10       12 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-05 10:10:10       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-05 10:10:10       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-05 10:10:10       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-05 10:10:10       20 阅读

热门阅读

  1. flink如何写入es

    2024-01-05 10:10:10       42 阅读
  2. 【Linux】不常用命令记录

    2024-01-05 10:10:10       40 阅读
  3. Openharmony hdc和adb指令对应

    2024-01-05 10:10:10       41 阅读
  4. K-均值聚类(K-means clustering)

    2024-01-05 10:10:10       35 阅读
  5. 腾讯云导入导出镜像官方文档

    2024-01-05 10:10:10       44 阅读
  6. (每日持续更新)jdk api之BufferedReader基础

    2024-01-05 10:10:10       39 阅读
  7. 云原生系列Go语言篇-标准库 Part 1

    2024-01-05 10:10:10       33 阅读
  8. OpenCV 安装概述

    2024-01-05 10:10:10       34 阅读