Flink: Implementing Deduplication with FlatMap and ProcessFunction

Background

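Deduplicating data is a very common task in day-to-day work. For example, we may want to keep only the first occurrence of a record and ignore every later duplicate. This post implements that logic in Flink in two ways: with a RichFlatMapFunction and with a KeyedProcessFunction.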

Deduplication with FlatMap and ProcessFunction

Deduplication with RichFlatMap

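import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.util.Collector;

public class DuplicateRichFlatMap extends RichFlatMapFunction<WikipediaEditEvent, WikipediaEditEvent> {

    // Keyed state: one flag per key, set once the first record for that key has been emitted
    private transient ValueState<Boolean> duplicateInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        duplicateInput = getRuntimeContext().getState(
                new ValueStateDescriptor<Boolean>("duplicate", Types.BOOLEAN));
    }

    @Override
    public void flatMap(WikipediaEditEvent in, Collector<WikipediaEditEvent> collector) throws Exception {
        // null means no record has been seen yet for the current key
        if (duplicateInput.value() == null) {
            collector.collect(in);
            duplicateInput.update(true);
        }
        // Later records with the same key fall through and are dropped
    }
}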

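The key to this implementation is keyed state: a ValueState<Boolean> that Flink keeps separately for each key, which is why the function must run on a KeyedStream. The first record for a key finds the state empty (null), is emitted, and sets the flag; every later record with the same key finds the flag set and is dropped. Because the job below keys the stream by user, this keeps the first edit per user. Note that this state never expires, so it grows with the number of distinct keys.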
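To make these state semantics concrete without a Flink cluster, here is a minimal plain-Java simulation. It assumes a simplified event type: the hypothetical `Edit` record stands in for WikipediaEditEvent, and a HashMap stands in for Flink's per-key ValueState. It sketches the idea only; it is not how Flink stores state internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (no Flink dependency) of the keyed-ValueState dedup
 * in DuplicateRichFlatMap. The map models "one Boolean flag per key".
 */
public class KeyedDedupSketch {

    /** Hypothetical stand-in for WikipediaEditEvent, keeping only two fields. */
    record Edit(String user, String title) {}

    /** Emits the first event per user and drops later ones, like the flatMap above. */
    static List<Edit> dedupByUser(List<Edit> input) {
        Map<String, Boolean> seenState = new HashMap<>(); // key -> "already emitted" flag
        List<Edit> out = new ArrayList<>();
        for (Edit e : input) {
            // Equivalent of duplicateInput.value() == null for the current key
            if (seenState.get(e.user()) == null) {
                out.add(e);                    // collector.collect(in)
                seenState.put(e.user(), true); // duplicateInput.update(true)
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Edit> input = List.of(
                new Edit("alice", "A"),
                new Edit("bob", "B"),
                new Edit("alice", "C"),
                new Edit("bob", "D"),
                new Edit("carol", "E"));
        // Keeps alice/A, bob/B and carol/E; alice/C and bob/D are duplicates by key
        dedupByUser(input).forEach(System.out::println);
    }
}
```

Note that "duplicate" here means "same key", not "identical record": two different edits by the same user are treated as duplicates because the stream is keyed by user.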

Deduplication with ProcessFunction

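import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.util.Collector;

public class DupliacateProcessFunction extends KeyedProcessFunction<String, WikipediaEditEvent, WikipediaEditEvent> {

    // Keyed state: one flag per key, with a TTL so it does not live forever
    private transient ValueState<Boolean> duplicateInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> stateDescriptor =
                new ValueStateDescriptor<>("previousInput", Types.BOOLEAN);
        // State TTL: a key's flag expires 1 day after it was created or last written
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Expired values are never returned, even if not yet physically removed
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Incremental cleanup (heap state backend): check up to 100 entries per state access
                .cleanupIncrementally(100, false)
                .build();
        stateDescriptor.enableTimeToLive(ttlConfig);
        duplicateInput = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(WikipediaEditEvent in, Context context, Collector<WikipediaEditEvent> collector)
            throws Exception {
        // Emit only the first record per key (or the first one after the flag has expired)
        if (duplicateInput.value() == null) {
            collector.collect(in);
            duplicateInput.update(true);
        }
    }
}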

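The key code here is again a keyed ValueState. The difference is the state TTL: each key's flag expires one day after it was written (OnCreateAndWrite), and an expired flag reads as null (NeverReturnExpired), so the state no longer grows without bound. The trade-off is that a key that shows up again after the flag has expired is emitted again.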
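The effect of these TTL settings can also be simulated in plain Java. This is a simplified sketch under stated assumptions: the hypothetical `TtlDedupSketch` class takes timestamps as input instead of reading Flink's processing-time clock, and it models only the visibility rules (OnCreateAndWrite, NeverReturnExpired). It ignores incremental cleanup, which affects when expired entries are physically removed, not whether they are visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation of the TTL behaviour configured in DupliacateProcessFunction.
 * Hypothetical helper: timestamps are passed in explicitly (milliseconds)
 * so the example is deterministic; Flink itself uses processing time.
 */
public class TtlDedupSketch {

    /** Hypothetical event: a key plus the time it arrives. */
    record Event(String key, long timestampMs) {}

    private final long ttlMs;
    private final Map<String, Long> lastWriteMs = new HashMap<>(); // key -> time the flag was written

    TtlDedupSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Mirrors duplicateInput.value(): null if the flag is absent or expired. */
    private Boolean value(String key, long nowMs) {
        Long written = lastWriteMs.get(key);
        if (written == null || nowMs - written >= ttlMs) {
            return null; // NeverReturnExpired: an expired flag is treated as absent
        }
        return true;
    }

    /** Emits an event only when its key has no live flag, then (re)writes the flag. */
    List<Event> process(List<Event> input) {
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            if (value(e.key(), e.timestampMs()) == null) {
                out.add(e);
                // OnCreateAndWrite: only this write resets the TTL clock, reads do not
                lastWriteMs.put(e.key(), e.timestampMs());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long oneDayMs = 24L * 60 * 60 * 1000;
        TtlDedupSketch dedup = new TtlDedupSketch(oneDayMs);
        List<Event> out = dedup.process(List.of(
                new Event("alice", 0L),
                new Event("alice", 1_000L),   // within one day: dropped
                new Event("bob", 5_000L),
                new Event("alice", oneDayMs))); // flag expired: emitted again
        out.forEach(System.out::println);
    }
}
```

Because only writes refresh the timestamp, a key that keeps arriving is still emitted roughly once per TTL period; the duplicates dropped in between do not extend the flag's lifetime.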

The job that runs the computation:

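import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

public class DuplicateJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // Deprecated since Flink 1.12; nothing in this job depends on it
        see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // RandomStringSource is the author's custom source emitting WikipediaEditEvent records (not shown)
        DataStream<WikipediaEditEvent> edits = see.addSource(new RandomStringSource());

        // Key by user: the deduplication state is kept per user
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits.keyBy(new KeySelector<WikipediaEditEvent, String>() {
            @Override
            public String getKey(WikipediaEditEvent event) {
                return event.getUser();
            }
        });

        // Deduplicate with RichFlatMap
        DataStream<WikipediaEditEvent> result = keyedEdits.flatMap(new DuplicateRichFlatMap());
        // Deduplicate with ProcessFunction (use instead of the line above)
//        DataStream<WikipediaEditEvent> result = keyedEdits.process(new DupliacateProcessFunction());

        result.print();
        see.execute();
    }
}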
