8. Flink: examples of generating watermarks at the source and after the source

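1. Generating watermarks at the source (AtSourceGenerateWatermark)
Note: starting with Flink 1.17, the FLIP-27 source framework supports split-level watermark alignment.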

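import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;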

public class _02_AtSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("my-broker")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

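        // The watermark strategy is passed to fromSource, so watermarks are generated inside the source.
        // No timestamp assigner is set, so the timestamp carried by each Kafka record is used as the event time.
        DataStreamSource<String> source = env.fromSource(kafkaSource
                , WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                , "kafka_source"
                , TypeInformation.of(new TypeHint<String>() {
                }));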

        source.print();

        env.execute();
    }
}
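
To make it clearer what forBoundedOutOfOrderness(Duration.ofSeconds(3)) does with the timestamps it sees, here is a minimal plain-Java sketch of the arithmetic. It is not Flink's own class: the name BoundedOutOfOrdernessSketch and its methods are hypothetical and chosen only for illustration. It assumes the rule that the watermark trails the largest timestamp seen so far by the allowed delay plus one millisecond.

```java
import java.time.Duration;

/** Plain-Java illustration of bounded-out-of-orderness watermarks (hypothetical class, not part of Flink). */
class BoundedOutOfOrdernessSketch {
    private final long delayMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.delayMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark equals Long.MIN_VALUE without underflowing.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    /** Records an event timestamp; late events never move the maximum backwards. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = largest timestamp seen so far - allowed delay - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(3));
        // The third event arrives out of order and leaves the watermark unchanged.
        long[] timestamps = {10_000L, 12_000L, 11_000L, 15_000L};
        for (long ts : timestamps) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a 3-second bound, an event stamped 11,000 that arrives after one stamped 12,000 is still ahead of the watermark (8,999), so it is not treated as late.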

2. Generating watermarks after the source

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class _03_AfterSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        // Each input line is expected to hold three comma-separated fields: an int, a string, and a long.
        SingleOutputStreamOperator<_01_MyEvent> eventMap = source.map(new MapFunction<String, _01_MyEvent>() {
            @Override
            public _01_MyEvent map(String value) throws Exception {
                String[] fields = value.split(",");
                return new _01_MyEvent(Integer.parseInt(fields[0]),
                        fields[1],
                        Long.parseLong(fields[2]));
            }
        });

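        // Watermarks are assigned downstream of the source: tolerate events up to 5 seconds out of order
        // and read the event time from each event.
        SingleOutputStreamOperator<_01_MyEvent> timestampsAndWatermarks = eventMap.assignTimestampsAndWatermarks(
                WatermarkStrategy.<_01_MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))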
                .withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {
                    @Override
                    public long extractTimestamp(_01_MyEvent element, long recordTimestamp) {
                        return element.getEventTime();
                    }
                }));

        timestampsAndWatermarks.print();

        env.execute();
    }
}
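
The listing above relies on a _01_MyEvent class that is not shown in this section. As a rough guide to what such a class and its input lines might look like, here is a hypothetical stand-in. Only the constructor argument types (int, String, long) and the getEventTime() getter come from the code above; the class name MyEventSketch, the field names id and name, and the parse helper are assumptions made for illustration.

```java
/** Hypothetical stand-in for _01_MyEvent; field names other than eventTime are assumptions. */
class MyEventSketch {
    private final int id;
    private final String name;
    private final long eventTime; // event time in epoch milliseconds (assumed unit)

    MyEventSketch(int id, String name, long eventTime) {
        this.id = id;
        this.name = name;
        this.eventTime = eventTime;
    }

    int getId() { return id; }

    String getName() { return name; }

    long getEventTime() { return eventTime; }

    /** Parses one comma-separated line, mirroring the MapFunction in the job above. */
    static MyEventSketch parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 comma-separated fields but got: " + line);
        }
        return new MyEventSketch(Integer.parseInt(fields[0].trim()),
                fields[1].trim(),
                Long.parseLong(fields[2].trim()));
    }

    @Override
    public String toString() {
        return "MyEventSketch{id=" + id + ", name='" + name + "', eventTime=" + eventTime + "}";
    }

    public static void main(String[] args) {
        // A sample line of the kind one might type into the socket on port 8888.
        MyEventSketch event = parse("1,click,1714439763000");
        System.out.println(event);
    }
}
```

Unlike the MapFunction in the job, this sketch rejects lines that do not have exactly three fields, which can make malformed socket input easier to spot.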
