flink: 自定义表函数的用法

package cn.edu.tju.demo3;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * Demo job showing how to call a user-defined table function from both the
 * Table API ({@code joinLateral}) and SQL ({@code LATERAL TABLE}).
 *
 * <p>The job reads comma-separated lines of the form {@code ts,info,val} from a
 * socket, converts them to {@link DataInfo} records, expands every record with
 * {@link MyTableFunction} and prints both result tables.
 *
 * <p>NOTE(review): the string-expression Table API calls, {@code registerFunction}
 * and {@code toAppendStream} used below are believed to be deprecated in newer
 * Flink releases; they are kept unchanged because the target Flink version is
 * not visible from this file — confirm before upgrading.
 */
public class Test49 {
    /** Socket source host. Placeholder value: replace with the machine running {@code nc -lk 9999}. */
    private static final String HOST_NAME = "xx.xx.xx.xx";
    /** Socket source port. */
    private static final int PORT = 9999;
    /** Record delimiter of the socket source: one record per line. */
    private static final String DELIMITER = "\n";
    /** Field separator inside one input line. */
    private static final String FIELD_SEPARATOR = ",";
    /** Minimum number of fields ({@code ts}, {@code info}, {@code val}) a line must contain. */
    private static final int FIELD_COUNT = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> socketDataInfo = env.socketTextStream(HOST_NAME, PORT, DELIMITER);

        // Drop blank/malformed lines first: an exception thrown inside map() would
        // otherwise fail the whole job on the first bad line typed into nc.
        SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo
                .filter(Test49::isWellFormed)
                .map(new MapFunction<String, DataInfo>() {
                    @Override
                    public DataInfo map(String value) throws Exception {
                        return parseLine(value);
                    }
                });

        Table dataTable = tableEnv.fromDataStream(dataInfoStream, "ts,info,val");
        tableEnv.registerFunction("myTableFunction", new MyTableFunction());

        // Table API variant: lateral join against the table function.
        Table resultTable = dataTable.select("ts,info,val")
                .joinLateral("myTableFunction(val) as(a,b)")
                .select("ts,info,a,b");

        // SQL variant: the same lateral join expressed as a query over a temporary view.
        tableEnv.createTemporaryView("dataInfo", dataTable);
        Table resultTableSql = tableEnv.sqlQuery(
                "select ts,info,val,a,b from dataInfo,LATERAL TABLE(myTableFunction(val)) as res (a,b)"
        );

        tableEnv.toAppendStream(resultTable, Row.class).print();
        tableEnv.toAppendStream(resultTableSql, Row.class).print("sql");

        env.execute("my job");
    }

    /**
     * Checks whether a raw input line can be converted by {@link #parseLine(String)}.
     * Blank lines are skipped silently; other rejected lines are reported on stderr.
     *
     * @param line raw line received from the socket, may be {@code null}
     * @return {@code true} if the line has at least {@value #FIELD_COUNT} fields and
     *         its first and third fields are valid numbers
     */
    private static boolean isWellFormed(String line) {
        if (line == null || line.trim().isEmpty()) {
            return false;
        }
        String[] fields = line.split(FIELD_SEPARATOR);
        if (fields.length < FIELD_COUNT) {
            System.err.println("Skipping malformed record (expected ts,info,val): " + line);
            return false;
        }
        try {
            Long.parseLong(fields[0].trim());
            Double.parseDouble(fields[2].trim());
            return true;
        } catch (NumberFormatException e) {
            System.err.println("Skipping malformed record (non-numeric ts or val): " + line);
            return false;
        }
    }

    /**
     * Converts one validated input line into a {@link DataInfo}.
     * Fields beyond the third are ignored; the {@code info} field is kept verbatim.
     *
     * @param line a line accepted by {@link #isWellFormed(String)}
     * @return the parsed record
     */
    private static DataInfo parseLine(String line) {
        String[] fields = line.split(FIELD_SEPARATOR);
        // Numeric fields are trimmed so surrounding whitespace does not break Long.parseLong.
        long ts = Long.parseLong(fields[0].trim());
        double val = Double.parseDouble(fields[2].trim());
        return new DataInfo(ts, fields[1], val);
    }

    /**
     * One input record. Kept as a mutable bean (public no-arg constructor plus
     * getters/setters) rather than an immutable type; presumably this is what lets
     * {@code fromDataStream} map the fields by name — verify against the Flink
     * version in use before changing its shape.
     */
    public static class DataInfo {
        private long ts;
        private String info;
        private double val;

        /** No-arg constructor. */
        public DataInfo() {
        }

        /**
         * @param ts   value of the first input field
         * @param info value of the second input field
         * @param val  value of the third input field
         */
        public DataInfo(long ts, String info, double val) {
            this.ts = ts;
            this.info = info;
            this.val = val;
        }

        public long getTs() {
            return ts;
        }

        public void setTs(long ts) {
            this.ts = ts;
        }

        public String getInfo() {
            return info;
        }

        public void setInfo(String info) {
            this.info = info;
        }

        public double getVal() {
            return val;
        }

        public void setVal(double val) {
            this.val = val;
        }

        @Override
        public String toString() {
            return "DataInfo{" +
                    "ts=" + ts +
                    ", info='" + info + '\'' +
                    ", val='" + val + '\'' +
                    '}';
        }
    }

    /**
     * User-defined table function. A table function must declare a public method
     * named {@code eval}. For every input value this one emits a single row
     * {@code (d, floor(d))}.
     */
    public static class MyTableFunction extends TableFunction<Tuple2<Double, Double>> {
        /**
         * @param d input value; emitted unchanged as the first column and rounded
         *          down with {@link Math#floor(double)} as the second column
         */
        public void eval(double d) {
            collect(new Tuple2<>(d, Math.floor(d)));
        }
    }
}

运行程序前,先在 HOST_NAME 所指向的机器上启动监听端口:

nc -lk 9999

然后在 nc 终端中输入(每行一条记录,格式为 ts,info,val):

1690000001,ffff,87.12

执行结果
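(原文此处为运行结果截图,未能保留。)按代码逻辑,输入上述一行后控制台会打印两条记录:Table API 结果的字段 ts,info,a,b 依次为 1690000001、ffff、87.12、87.0;SQL 结果带有前缀 "sql>",字段 ts,info,val,a,b 依次为 1690000001、ffff、87.12、87.12、87.0。具体的行显示格式取决于所用的 Flink 版本。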

相关推荐

  1. 定义指令基本

    2024-03-16 16:04:02       12 阅读
  2. flink定义函数如何从崩溃中恢复数据

    2024-03-16 16:04:02       14 阅读
  3. Flink SQL 定义函数 - 字符串拆分

    2024-03-16 16:04:02       13 阅读
  4. Flink中ProcessFunction

    2024-03-16 16:04:02       35 阅读
  5. 定义函数使用

    2024-03-16 16:04:02       15 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-16 16:04:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-16 16:04:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-16 16:04:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-16 16:04:02       18 阅读

热门阅读

  1. 交叉编译代码

    2024-03-16 16:04:02       20 阅读
  2. ChatGPT升级版:如何借助ChatGPT高效撰写学术论文

    2024-03-16 16:04:02       23 阅读
  3. windows各个版本安装SSH

    2024-03-16 16:04:02       23 阅读
  4. CMake官方教程8--自定义命令和生成文件

    2024-03-16 16:04:02       19 阅读
  5. C#面:throw 和throw ex 的区别

    2024-03-16 16:04:02       18 阅读
  6. OpenXR 超详细spec--Chapter 2 基本原理

    2024-03-16 16:04:02       20 阅读
  7. 01、CMD与Hello world

    2024-03-16 16:04:02       18 阅读
  8. leetcode112.路径总和

    2024-03-16 16:04:02       19 阅读
  9. react面试题总结

    2024-03-16 16:04:02       18 阅读
  10. c语言:一颗红心两手准备(scanf函数的返回值)

    2024-03-16 16:04:02       18 阅读