【大数据面试题】005 谈一谈 Flink Watermark 水印

一步一个脚印,一天一道面试题。

感觉我现在很难把水印描述的很好,但,完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。

在实时处理任务时,由于网络延迟,人工异常,各种问题,数据往往会出现乱序,不按照我们的预期到达处理框架。
WaterMark 水印,就是为了一定程度的解决数据,延迟乱序问题的。

使用 WaterMark 一般有以下几个步骤:

  • 定义时间特性
    (Flink 1.12 已废弃,默认使用 事件时间)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 设置 Watermark 策略,赋值事件时间
        // 分配时间戳和水位线
        DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f0));

话不多说,直接给个 Watermark 水印样例代码。


public class SimpleWatermarkExample {
   
    public static void main(String[] args) throws Exception {
   
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 socket 文本流接收数据
        DataStream<String> input = env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1));

        // 解析输入的数据
        DataStream<Tuple2<Long, Integer>> parsedStream = input
                .map(new MapFunction<String, Tuple2<Long, Integer>>() {
   
                    @Override
                    public Tuple2<Long, Integer> map(String value) throws Exception {
   
                        String[] parts = value.split(",");
                        return new Tuple2<>(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));
                    }
                });

        // 分配时间戳和水位线
        DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f0));

        // 使用窗口函数统计每10秒内的最大值
        DataStream<String> maxValues = withTimestampsAndWatermarks
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new WindowFunction<Tuple2<Long, Integer>, String, TimeWindow>() {
   
                    @Override
                    public void apply(TimeWindow window, Iterable<Tuple2<Long, Integer>> values, Collector<String> out) throws Exception {
   
                        int maxValue = Integer.MIN_VALUE;
                        for (Tuple2<Long, Integer> value : values) {
   
                            maxValue = Math.max(maxValue, value.f1);
                        }
                        out.collect("Window: " + window + " Max Value: " + maxValue);
                    }
                });

        // 打印结果
        maxValues.print();

        // 执行程序
        env.execute("Simple Flink Watermark Example");
    }
}

相关推荐

  1. 数据面试005 Flink Watermark 水印

    2024-02-08 07:24:01       50 阅读
  2. 数据面试007 Flink 背压

    2024-02-08 07:24:01       50 阅读
  3. 数据面试008 Flink Slot 与 并行度

    2024-02-08 07:24:01       44 阅读
  4. [AIGC 数据基础]hive浅

    2024-02-08 07:24:01       53 阅读
  5. 003、浅Neo4j的数据模型

    2024-02-08 07:24:01       30 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-08 07:24:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-08 07:24:01       101 阅读
  3. 在Django里面运行非项目文件

    2024-02-08 07:24:01       82 阅读
  4. Python语言-面向对象

    2024-02-08 07:24:01       91 阅读

热门阅读

  1. FolkMQ “单线程“消息中间件(开源) v1.0.32 发布

    2024-02-08 07:24:01       52 阅读
  2. [AIGC] 开源流程引擎哪个好,如何选型?

    2024-02-08 07:24:01       46 阅读
  3. 1.2 Verilog 简介及发展历史

    2024-02-08 07:24:01       62 阅读
  4. visual studio注册码

    2024-02-08 07:24:01       54 阅读
  5. pydantic了解学习

    2024-02-08 07:24:01       48 阅读
  6. ThreadLocal在项目中的简单使用

    2024-02-08 07:24:01       55 阅读
  7. Cpp-3

    2024-02-08 07:24:01       54 阅读
  8. 贪心算法之找零钱

    2024-02-08 07:24:01       58 阅读