水位线的传递
水位线传递原则:
1. 多个上游,同时给1个下游传递水位线,下游取哪个水位线:取最小的
2. 1个上游,同时给多个下游传递水位线,如何传:采用广播的方式
窗口
一种将无限数据切割成有限的数据块进行处理的方式。根据如何截取数据,分为时间窗口和计数窗口。如果数据做了keyby之后,对应key的数据间的窗口是隔离的。
按照窗口分配数据的规则分类:
- 滚动窗口:窗口是首尾相连的,窗口之间没有重叠,也不会有间隔
- 滑动窗口:窗口会出现重叠,数据有可能被同时分配到多个窗口中,滑动窗口的计算频率是由滑动步长决定的。
- 会话窗口:通过数据之间的gap来划分窗口,如果两条数据之间的间隔没有超过设定的gap距离,则划分到一个窗口中。否则重新开一个窗口。
- 全局窗口:所有数据都往一个窗口中放,窗口永远不会关闭,一般不会去用。
API的调用
- 窗口分配器
- 非keyby分区:所有数据不会进行Key的隔离,都往同一个窗口中放
- keyby分区:数据会按照key进行隔离,会为每个key分配对应的窗口
- 窗口函数计算
基本增量聚合函数:sum,min, max, maxby, minby
增量聚合函数:通过窗口分配器分配好窗口后,窗口中每来一条数据都要执行一次聚合处理, 等到触发窗口计算时,将聚合的结果输出
- reduce:两两聚合:输入输出数据类型一致。窗口中的第一个数据不执行聚合
- aggregate:两两聚合:第一个数据不执行聚合,输入类型和输出类型不一致
全窗口函数(全量聚合):
1. 将数据收集齐后,统一计算输出结果
2. 获取窗口信息,窗口开始时间,结束时间
3. 泛型:输入类型,输出类型,key的类型,窗口的类型
4. process(key,上下文,数据集合,采集器)
public class Flink08_AggregateFunction {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
SingleOutputStreamOperator<Event> ds = Flink06_EventSource.getEventSource(env)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
(event, ts) -> event.getTs()
)
);
ds.print("input");
//统计每10秒内,每个用户的UV,独立访问数,即10秒内访客的个数
ds.windowAll(
TumblingEventTimeWindows.of(Time.seconds(10))
).aggregate(
new AggregateFunction<Event, Set<String>, Integer>() {
/**
* 创建累加器对象,该方法只会被调用一次
* @return
*/
@Override
public Set<String> createAccumulator() {
return new HashSet<String>();
}
/**
* 累加过程,每来一条数据就调用一次
* @param value The value to add
* @param accumulator The accumulator to add the value to
* @return
*/
@Override
public Set<String> add(Event value, Set<String> accumulator) {
accumulator.add(value.getUser());
return accumulator;
}
/**
* 触发窗口计算时,调用一次
* @param accumulator The accumulator of the aggregation
* @return
*/
@Override
public Integer getResult(Set<String> accumulator) {
return accumulator.size();
}
@Override
public Set<String> merge(Set<String> a, Set<String> b) {
return null;
}
}
).print("aggregate");
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}