窗口的聚合和水位线的传递

水位线的传递

水位线传递原则:
1. 多个上游,同时给1个下游传递水位线,下游取哪个水位线:取最小的
2. 1个上游,同时给多个下游传递水位线,如何传:采用广播的方式

在这里插入图片描述

窗口

一种将无限数据切割成有限的数据块进行处理的方式。根据如何截取数据,分为时间窗口和计数窗口。如果数据做了keyby之后,对应key的数据间的窗口是隔离的。

按照窗口分配数据的规则分类:

  1. 滚动窗口:窗口是首尾相连的,窗口之间没有重叠,也不会有间隔
  2. 滑动窗口:窗口会出现重叠,数据有可能被同时分配到多个窗口中,滑动窗口的计算频率是由滑动步长决定的。
  3. 会话窗口:通过数据之间的gap来划分窗口,如果两条数据之间的间隔没有超过设定的gap距离,则划分到一个窗口中。否则重新开一个窗口。
  4. 全局窗口:所有数据都往一个窗口中放,窗口永远不会关闭,一般不会去用。

API的调用

  1. 窗口分配器
    • 非keyby分区:所有数据不会进行Key的隔离,都往同一个窗口中放
    • keyby分区:数据会按照key进行隔离,会为每个key分配对应的窗口
  2. 窗口函数计算
    • 基本增量聚合函数:sum,min, max, maxby, minby

    • 增量聚合函数:通过窗口分配器分配好窗口后,窗口中每来一条数据都要执行一次聚合处理, 等到触发窗口计算时,将聚合的结果输出

      • reduce:两两聚合:输入输出数据类型一致。窗口中的第一个数据不执行聚合
      • aggregate:两两聚合:第一个数据不执行聚合,输入类型和输出类型不一致
    • 全窗口函数(全量聚合):
      1. 将数据收集齐后,统一计算输出结果
      2. 获取窗口信息,窗口开始时间,结束时间
      3. 泛型:输入类型,输出类型,key的类型,窗口的类型
      4. process(key,上下文,数据集合,采集器)

public class Flink08_AggregateFunction {
   
    public static void main(String[] args) {
   
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> ds = Flink06_EventSource.getEventSource(env)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(
                                        (event, ts) -> event.getTs()
                                )
                );

        ds.print("input");

        //统计每10秒内,每个用户的UV,独立访问数,即10秒内访客的个数
       ds.windowAll(
               TumblingEventTimeWindows.of(Time.seconds(10))
       ).aggregate(
               new AggregateFunction<Event, Set<String>, Integer>() {
   
                   /**
                    * 创建累加器对象,该方法只会被调用一次
                    * @return
                    */
                   @Override
                   public Set<String> createAccumulator() {
   
                       return new HashSet<String>();
                   }

                   /**
                    * 累加过程,每来一条数据就调用一次
                    * @param value The value to add
                    * @param accumulator The accumulator to add the value to
                    * @return
                    */
                   @Override
                   public Set<String> add(Event value, Set<String> accumulator) {
   
                       accumulator.add(value.getUser());
                       return accumulator;
                   }

                   /**
                    * 触发窗口计算时,调用一次
                    * @param accumulator The accumulator of the aggregation
                    * @return
                    */
                   @Override
                   public Integer getResult(Set<String> accumulator) {
   
                       return accumulator.size();
                   }

                   @Override
                   public Set<String> merge(Set<String> a, Set<String> b) {
   
                       return null;
                   }
               }
       ).print("aggregate");

        try {
   
            env.execute();
        } catch (Exception e) {
   
            throw new RuntimeException(e);
        }
    }
}

相关推荐

  1. [AIGC] 深入理解Flink中窗口水位线定时器

    2023-12-09 19:46:09       51 阅读
  2. C++ 中值传递引用传递区别?

    2023-12-09 19:46:09       23 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-09 19:46:09       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-09 19:46:09       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-09 19:46:09       82 阅读
  4. Python语言-面向对象

    2023-12-09 19:46:09       91 阅读

热门阅读

  1. ES6 箭头函数

    2023-12-09 19:46:09       58 阅读
  2. React拖拽实践

    2023-12-09 19:46:09       72 阅读
  3. 【LeetCode】2620. 计数器

    2023-12-09 19:46:09       59 阅读
  4. 面试-算法

    2023-12-09 19:46:09       54 阅读
  5. 大一C语言作业 12.8

    2023-12-09 19:46:09       52 阅读
  6. 如何配置git的ssh密钥

    2023-12-09 19:46:09       60 阅读
  7. 决策树 ID3 算法

    2023-12-09 19:46:09       48 阅读
  8. day4 移出倒数第n个节点

    2023-12-09 19:46:09       57 阅读