flink 最后一个窗口一直没有新数据,窗口不关闭问题

flink 最后一个窗口一直没有新数据,窗口不关闭问题

自定义实现 WatermarkStrategy接口

代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{
   

        private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);

        @Override
        public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
   
            return new WatermarkGenerator<JSONObject>() {
   
                private long maxWatermark;

                @Override
                public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
   
                    maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
                    state.f0 = System.currentTimeMillis();
                    System.out.println("maxWatermark is " + maxWatermark);
                    state.f1 = false;
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
   
                    //乱序时间
                    long outOfTime = 3000L;
                    if (maxWatermark - outOfTime <=0){
   
                    } else {
   
                        // 10s内没有数据则关闭当前窗口
                        System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));
                        System.out.println("state.f1:" + state.f1);
                        if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
   
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));
                            state.f1 = true;
                            System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));
                        } else {
   
                            System.out.println("正常发送水印");
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
                        }
                    }
                }
            };
        }
    }

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
参考:https://blog.csdn.net/lr131425/article/details/127422833

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-18 06:58:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-18 06:58:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-18 06:58:03       82 阅读
  4. Python语言-面向对象

    2024-01-18 06:58:03       91 阅读

热门阅读

  1. GO基础进阶篇 (十三)、泛型

    2024-01-18 06:58:03       52 阅读
  2. 服务器租用和托管有哪些注意事项?

    2024-01-18 06:58:03       49 阅读
  3. 基于博弈树的开源五子棋AI教程[7 多线程搜索]

    2024-01-18 06:58:03       41 阅读
  4. npm换源

    npm换源

    2024-01-18 06:58:03      52 阅读
  5. 【Spring Boot 3】【Redis】集成Jedis

    2024-01-18 06:58:03       44 阅读
  6. npm-yarn

    2024-01-18 06:58:03       56 阅读
  7. 国内环境 GitHub 拉取仓库速度慢的缓解方案

    2024-01-18 06:58:03       59 阅读