Reading Iceberg tables with Flink

1. Add the dependencies

The versions used here are Flink 1.14.6 and Scala 2.12.

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.14</artifactId>
            <!-- iceberg.version: an Iceberg release that publishes iceberg-flink-runtime-1.14 -->
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

2. Wrap catalog and table loading in a utility class

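Check the imports carefully: `Table` and `Catalog` below must come from Iceberg (`org.apache.iceberg`), not from the Flink Table API, which has classes with the same names.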

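import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.hive.HiveCatalog


object IcebergUtil {

  /**
   * Creates the Hive catalog and its CatalogLoader. When reading several tables,
   * call this only once and reuse the result.
   * @return the initialized HiveCatalog and a CatalogLoader for the same metastore
   */
  def hiveCatalogLoader(): (HiveCatalog, CatalogLoader) = {
    val hadoopConf = new Configuration()
    val hiveProp = new java.util.HashMap[String, String]()
    // HDFS location of the warehouse
    hiveProp.put("warehouse", "hdfs://ns1/user/hive/warehouse")
    // Hive metastore URIs; multiple addresses are comma-separated
    hiveProp.put("uri", "thrift://192.168.0.100:9083,thrift://192.168.0.101:9083")
    // Client-side catalog, used to load table metadata
    val catalog = new HiveCatalog()
    catalog.setConf(hadoopConf)
    catalog.initialize("hive", hiveProp)
    // Serializable loader that lets Flink tasks re-create the same catalog
    val catalogLoader = CatalogLoader.hive("hive", hadoopConf, hiveProp)
    (catalog, catalogLoader)
  }

  /**
   * Loads an Iceberg table and builds a TableLoader for it.
   * @return the Table and a TableLoader to pass to the Flink source
   */
  def tableLoad(catalog: Catalog, catalogLoader: CatalogLoader, dbName: String, tableName: String): (Table, TableLoader) = {
    val tableIdentifier = TableIdentifier.of(dbName, tableName)
    val table = catalog.loadTable(tableIdentifier)
    val tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier)
    (table, tableLoader)
  }

}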
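The utility above hard-codes the warehouse path and the metastore addresses. A minimal sketch of a parameterized variant, assuming the same Iceberg version as above, could take them as arguments and derive the `Catalog` from the `CatalogLoader` through `loadCatalog()`, so both share one name, Hadoop configuration and property map. The object `IcebergCatalogs` and its methods are hypothetical, not part of Iceberg.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.flink.CatalogLoader

// Hypothetical helper object: a parameterized variant of IcebergUtil.hiveCatalogLoader
object IcebergCatalogs {

  // Builds the catalog properties: the "warehouse" location and a comma-separated metastore "uri"
  def hiveProps(warehouse: String, metastoreUris: Seq[String]): java.util.Map[String, String] = {
    val props = new java.util.HashMap[String, String]()
    props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse)
    props.put(CatalogProperties.URI, metastoreUris.mkString(","))
    props
  }

  // Creates the serializable loader first, then builds the client-side catalog from it,
  // so both use the same name, Hadoop configuration and properties
  def hiveCatalog(name: String, props: java.util.Map[String, String]): (Catalog, CatalogLoader) = {
    val loader = CatalogLoader.hive(name, new Configuration(), props)
    (loader.loadCatalog(), loader)
  }
}
```

For example, `IcebergCatalogs.hiveCatalog("hive", IcebergCatalogs.hiveProps("hdfs://ns1/user/hive/warehouse", Seq("thrift://192.168.0.100:9083", "thrift://192.168.0.101:9083")))` reproduces the settings hard-coded in `IcebergUtil`, and the returned pair can be passed straight to `tableLoad`.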

3. Read the data

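Setting `streaming(false)` makes the source read the table in batch mode: it scans the current snapshot once and then finishes, instead of continuously watching for new commits.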

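import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.data.RowData
import org.apache.iceberg.flink.source.FlinkSource

  // env is the Java StreamExecutionEnvironment; with the Scala API pass scalaEnv.getJavaEnv
  private def getStream(iceberg: String)(env: StreamExecutionEnvironment): DataStream[RowData] = {
    val (catalog, hiveCatalogLoader) = IcebergUtil.hiveCatalogLoader()
    val (table, tableLoader) = IcebergUtil.tableLoad(catalog, hiveCatalogLoader, "iceberg_dw", iceberg)
    FlinkSource.forRowData().env(env)
      .table(table)
      .tableLoader(tableLoader).streaming(false).build() // bounded read of the current snapshot
  }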
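To contrast `streaming(false)` with the unbounded mode, the following sketch wraps both in one hypothetical helper object, `IcebergReads`. It assumes the Iceberg `FlinkSource` builder used above and relies on `build()` loading the table from the `TableLoader` when `.table(...)` is not set; `monitorInterval` controls how often the streaming source checks for new snapshots.

```scala
import java.time.Duration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.data.RowData
import org.apache.iceberg.flink.TableLoader
import org.apache.iceberg.flink.source.FlinkSource

// Hypothetical helper object contrasting the two read modes of FlinkSource
object IcebergReads {

  // streaming(false): bounded scan of the table's current snapshot;
  // the source finishes once all data files have been read
  def batch(env: StreamExecutionEnvironment, loader: TableLoader): DataStream[RowData] =
    FlinkSource.forRowData().env(env).tableLoader(loader).streaming(false).build()

  // streaming(true): unbounded source that reads the current snapshot and then
  // checks for newly committed snapshots every `interval`
  def streaming(env: StreamExecutionEnvironment, loader: TableLoader, interval: Duration): DataStream[RowData] =
    FlinkSource.forRowData().env(env).tableLoader(loader)
      .streaming(true)
      .monitorInterval(interval)
      .build()
}
```

For example, `IcebergReads.streaming(env, tableLoader, Duration.ofSeconds(10)).print()` followed by `env.execute()` keeps the job running and emits rows from each new commit, whereas a job built on the batch variant ends on its own after the scan. The 10-second interval is an arbitrary example value.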
