一、表转换为 DataStream
public class TableTransformDataStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.connect(new FileSystem().path("./sensor.txt"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE()))
.createTemporaryTable("sensor");
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable.select("id, temperature").where("id = 'sensor_1'");
Table aggTable = sensorTable.groupBy("id").select("id, id.count as cnt, temperature.avg as avgTemp");
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
DataStream<Tuple2<Boolean, Row>> aggStream = tableEnv.toRetractStream(aggTable, Row.class);
resultStream.print("result");
aggStream.print("agg");
env.execute();
}
}
二、DataStream 转换为表
public class DataStreamTransformTable {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<String> inputStream = env.readTextFile("./sensor.txt");
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
Table table = tableEnv.fromDataStream(dataStream);
env.execute();
}
}
三、查看执行计划
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);