1. 添加依赖包
这里使用的 Flink 版本是 1.14.6,Scala 版本是 2.12。
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.14</artifactId>
<!-- NOTE: <version> is missing here; specify an Iceberg release built for Flink 1.14 (e.g. 0.13.x / 1.x) -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
2. 封装工具类
注意检查依赖的包
import org.apache.iceberg.{Table}
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.hive.HiveCatalog
object IcebergUtil {

  /**
   * Builds a [[HiveCatalog]] together with a matching [[CatalogLoader]].
   * When reading multiple tables, call this once and reuse the result.
   *
   * The previous version hard-coded the metastore settings; they are now
   * default-valued parameters, so existing `hiveCatalogLoader()` callers
   * keep the exact same behavior.
   *
   * @param catalogName name the catalog is initialized under (default "hive")
   * @param warehouse   HDFS warehouse root directory
   * @param uri         comma-separated Hive metastore thrift URIs
   * @return the initialized HiveCatalog and a CatalogLoader built from the
   *         same properties
   */
  def hiveCatalogLoader(
      catalogName: String = "hive",
      warehouse: String = "hdfs://ns1/user/hive/warehouse",
      uri: String = "thrift://192.168.0.100:9083,thrift://192.168.0.101:9083"
  ): (HiveCatalog, CatalogLoader) = {
    val hiveProps = new java.util.HashMap[String, String]()
    hiveProps.put("warehouse", warehouse)
    hiveProps.put("uri", uri)
    val catalog = new HiveCatalog()
    catalog.initialize(catalogName, hiveProps)
    // NOTE(review): CatalogLoader is the handle Flink ships to its tasks,
    // while the HiveCatalog instance is used on the client side — confirm
    // against the Iceberg version actually pulled in.
    val catalogLoader =
      CatalogLoader.hive(catalogName, new org.apache.hadoop.conf.Configuration(), hiveProps)
    (catalog, catalogLoader)
  }

  /**
   * Loads an Iceberg table plus a [[TableLoader]] for the same identifier.
   *
   * @param catalog       client-side catalog used to load the table metadata
   * @param catalogLoader loader used to build the task-side TableLoader
   * @param dbName        database (namespace) name
   * @param tableName     table name
   * @return the loaded Table and a TableLoader pointing at it
   */
  def tableLoad(catalog: Catalog, catalogLoader: CatalogLoader, dbName: String, tableName: String): (Table, TableLoader) = {
    val tableId = TableIdentifier.of(dbName, tableName)
    val table = catalog.loadTable(tableId)
    val tableLoader = TableLoader.fromCatalog(catalogLoader, tableId)
    (table, tableLoader)
  }
}
3. 读取数据
这里设置streaming(false) 将按照批次读取。
/**
 * Builds a DataStream of RowData over the given Iceberg table in the
 * `iceberg_dw` database. `streaming(false)` makes the source bounded,
 * i.e. the table is read once as a batch.
 *
 * @param iceberg table name within `iceberg_dw`
 * @param env     Flink streaming environment the source is attached to
 */
private def getStream(iceberg: String)(env: StreamExecutionEnvironment): datastream.DataStream[RowData] = {
  val (hiveCatalog, loader) = IcebergUtil.hiveCatalogLoader()
  val (icebergTable, icebergTableLoader) =
    IcebergUtil.tableLoad(hiveCatalog, loader, "iceberg_dw", iceberg)
  val source = FlinkSource
    .forRowData()
    .env(env)
    .table(icebergTable)
    .tableLoader(icebergTableLoader)
    .streaming(false) // bounded read: consume the current snapshot as a batch
  source.build()
}