45、Flink 的 Process Function 详解

Process Function
1.概述

ProcessFunction 是底层的数据流处理操作,可访问所有(非循环)流应用程序的基本模块。

  • 事件 (数据流中的元素)
  • 状态(容错、一致、仅在 keyed stream 上)
  • 定时器(事件时间和处理时间,仅在 keyed stream 上)

ProcessFunction 可以被认为是一个可以访问 keyed State 和定时器的 FlatMapFunction,它通过对输入流中接收到的每个元素进行调用来处理事件。

对于状态,ProcessFunction 允许访问 keyed State,可通过 RuntimeContext 访问。

定时器支持处理时间和事件时间,对函数 processElement(…)的每次调用都会获得一个 Context 对象,该对象可以访问元素的事件时间戳和 TimerService;TimerService 可用于注册处理时间和事件时间定时器;对于事件时间定时器,当 watermark 到达或超过定时器的时间戳时,会调用 onTimer(…),在该调用过程中,所有状态的作用域再次限定为创建定时器时使用的 key,从而允许定时器操作 keyed State。

注意:如果要访问 keyed State 和定时器,必须在 keyed stream 上应用 ProcessFunction。

stream.keyBy(...).process(new MyProcessFunction());
2.底层 Joins

对于两个输入的底层 Join 可以使用 CoProcessFunction 或 KeyedCoProcessFunction,通过调用 processElement1(…)和processElement2(…)分别处理两个输入。

底层 Join 流程如下

为一个输入或两个输入创建状态对象;

从输入中接收元素时更新状态;

从另一个输入接收元素后,查询状态并产生 Join 结果;

示例:将客户数据加入金融交易,同时保留客户数据的状态;如果关心在面对无序事件时具有完整和确定性的 Join,那么当客户数据流的watermark 超过交易时间时,可以使用定时器评估并触发交易的 Join。

3.案例

示例:KeyedProcessFunction 维护每个 key 的 count,并在一分钟后(事件时间)发出一个 key/count,而不更新该 key。

  • count、key 和上次修改时间戳存储在 ValueState 中;
  • 对于每条记录,KeyedProcessFunction 递增计数器并设置最后一次修改的时间戳;
  • 一分钟后(事件时间)触发定时器;
  • 定时器触发时,会根据存储计数的最后修改时间检查回调的事件时间戳,并在它们匹配的情况下发出 key/计数。

注意:也可以用会话窗口实现,此处使用 KeyedProcessFunction。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(value -> value.f0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction 
        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(OpenContext openContext) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
4.KeyedProcessFunction

KeyedProcessFunction 作为 ProcessFunction 的扩展,在其 onTimer() 方法中提供了对定时器 key 的访问。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
    K key = ctx.getCurrentKey();
    // ...
}
5.定时器
a)概述

处理时间和事件时间定时器都由 TimerService 内部维护,并排队等待执行。

TimerService 按 key 和时间戳消除重复的定时器,即每个 key 和时间戳最多有一个定时器,如果为同一时间戳注册了多个计时器,那么 onTimer() 方法将只被调用一次。

Flink 会同步 onTimer() 和 processElement() 的调用,无需担心同时修改状态。

b)容错

定时器是容错的,并与应用程序的状态一起进行 Checkpoint,如果发生故障恢复或从保存点启动应用程序,则会恢复定时器。

当应用程序从故障中恢复或从保存点启动时,本应在恢复前启动的检查点中的处理时间定时器将立即启动。

定时器使用异步检查点,除了 RocksDB 后端/与增量快照/与基于堆的定时器的组合,但是大量的定时器会增加检查点时间,因为定时器是检查点状态的一部分。

c)合并定时器

由于 Flink 为每个 key 和时间戳只维护一个定时器,可以通过降低定时器的精度来合并定时器以减少定时器的数量

对于1秒(事件或处理时间)的定时器精度,可以将目标时间四舍五入到整秒,定时器最多会提前1秒触发,但不会晚于要求的毫秒精度,每个键和秒最多有一个计时器。

long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);

由于事件时间定时器只在 watermark 到达时触发,可以使用当前 watermark 来注册定时器,并将其与下一个 watermark 合并。

long coalescedTime = ctx.timerService().currentWatermark() + 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);

定时器也可以按如下方式停止和移除

停止处理时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

停止事件时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

如果没有注册具有给定时间戳的定时器,则停止定时器无效。

相关推荐

  1. FlinkProcessFunction用法

    2024-06-05 20:36:01       36 阅读
  2. 45Flink Process Function 详解

    2024-06-05 20:36:01       9 阅读
  3. 48Flink Data Source API 详解

    2024-06-05 20:36:01       9 阅读
  4. flink分别使用FilterMap和ProcessFunction实现去重逻辑

    2024-06-05 20:36:01       33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-05 20:36:01       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-05 20:36:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-05 20:36:01       20 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-05 20:36:01       20 阅读

热门阅读

  1. SpringBoot历史版本信息

    2024-06-05 20:36:01       9 阅读
  2. 【实用技巧】Unity的Text组件实用技巧

    2024-06-05 20:36:01       8 阅读
  3. GPT-4o:人工智能新纪元的启航者

    2024-06-05 20:36:01       8 阅读
  4. 如何评价GPT-4o?(要点精简)

    2024-06-05 20:36:01       8 阅读
  5. 排序---快速排序

    2024-06-05 20:36:01       7 阅读
  6. Python没什么?深度解析Python的无限可能与挑战

    2024-06-05 20:36:01       9 阅读
  7. React.forwardRef 使用

    2024-06-05 20:36:01       9 阅读
  8. h5相机功能

    2024-06-05 20:36:01       8 阅读
  9. 机器人编程课有什么东西:探索编程的奇妙世界

    2024-06-05 20:36:01       10 阅读
  10. 如何使用 Apache 和 Nginx 创建临时和永久重定向

    2024-06-05 20:36:01       9 阅读