flink: table api之窗口函数的用法

package cn.edu.tju.demo2;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;

public class Test42 {
    private static String HOST_NAME = "xx.xx.xx.xx";
    private static int PORT = 9999;
    private static String DELIMITER ="\n";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);



        DataStream<String> socketDataInfo =  env.socketTextStream(HOST_NAME, PORT, DELIMITER);
        SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {
            @Override
            public DataInfo map(String value) throws Exception {

                String[] stringList = value.split(",");
                DataInfo dataInfo = new DataInfo(Long.parseLong(
                        stringList[0]), stringList[1], Double.parseDouble(stringList[2]));
                return dataInfo;
            }
        });



        Table dataTable = tableEnv.fromDataStream(dataInfoStream,"ts,info,val,pt.proctime");
        //10.rows
        Table resultTable = dataTable.window(Tumble.over("30.seconds").on("pt").as("w"))
                .groupBy("w,info").select("info, val.avg, w.end");

        tableEnv.toAppendStream(resultTable, Row.class).print("result");



        env.execute("my job");

    }

    public static class DataInfo{
        private long ts;
        private String info;
        private double val;

        public long getTs() {
            return ts;
        }

        public void setTs(long ts) {
            this.ts = ts;
        }

        public String getInfo() {
            return info;
        }

        public void setInfo(String info) {
            this.info = info;
        }

        public double getVal() {
            return val;
        }

        public void setVal(double val) {
            this.val = val;
        }

        @Override
        public String toString() {
            return "DataInfo{" +
                    "ts=" + ts +
                    ", info='" + info + '\'' +
                    ", val='" + val + '\'' +
                    '}';
        }

        public DataInfo(long ts, String info, double val) {
            this.ts = ts;
            this.info = info;
            this.val = val;
        }

        public DataInfo() {

        }
    }
}

nc -lk 9999
输入:

1689999832,ff,35
1689999833,ff,38

得到结果(稍等30秒)
在这里插入图片描述

相关推荐

  1. selenium中switch_to.window切换窗口

    2024-03-13 15:14:04       36 阅读
  2. ExcelVLOOKUP函数

    2024-03-13 15:14:04       27 阅读
  3. MATLAB中cell函数

    2024-03-13 15:14:04       63 阅读
  4. Excel中INDIRECT函数

    2024-03-13 15:14:04       63 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-13 15:14:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-13 15:14:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-13 15:14:04       82 阅读
  4. Python语言-面向对象

    2024-03-13 15:14:04       91 阅读

热门阅读

  1. 如何配置极狐GitLab Runner Cache 缓存

    2024-03-13 15:14:04       43 阅读
  2. 如何排查 IMKit 用户头像无法加载问题

    2024-03-13 15:14:04       45 阅读
  3. 【云原生】关于解耦和平台化的一些思考

    2024-03-13 15:14:04       41 阅读
  4. 手机天猫等级怎么查

    2024-03-13 15:14:04       41 阅读
  5. Redis 中的字符串数据结构详解及字符串命令

    2024-03-13 15:14:04       42 阅读
  6. 编写Linux的SHELL脚本设置环境变量遇到的那些坑

    2024-03-13 15:14:04       40 阅读
  7. Stable Diffusion如何生成高质量的图-prompt写法介绍

    2024-03-13 15:14:04       40 阅读
  8. LeetCode刷题--- 摆动序列

    2024-03-13 15:14:04       45 阅读
  9. 人事面试提问技巧全攻略

    2024-03-13 15:14:04       42 阅读
  10. TCP并发模型 || select || poll || epoll

    2024-03-13 15:14:04       40 阅读