在Flink SQL中使用watermark进阶功能

摘自官网

在Flink SQL中使用watermark进阶功能

在Flink1.18中对Watermark的很多进阶功能(比如watermark对齐)通过datastream api很容易使用。在sql中使用不是很方便,在Flink1.18中对这些功能进行扩展。在sql中也能使用这些功能。

只有实现了SupportsWatermarkPushDown接口的源连接器(source connector)(比如kafka、pulsar)才可以使用这些进阶功能。这些进阶的功能都可以使用dynamic table options或OPTIONS hint进行配置,如果用户同时使用dynamic table options或OPTIONS hint进行配置,那么OPTIONS hint配置的值会优先生效。如果用户在sql的多个地方使用了OPTIONS hint,那么SQL中出现的第一个hint会优先生效

功能I配置Watermark发射方式 

Flink中watermark有两种发射方式:

  • on-periodic: 周期性发射
  • on-event: 每条事件数据发射一次watermark

在Datastream中用户可以通过WatermarkGenerator接口来自己决定使用哪种watermark。在sql中watermark的使用默认是周期性发射的方式,默认周期是200ms。这个周期可以通过pipeline.auto-watermark-interval的方式修改。如果需要每条数据都发射一次watermark。可以在source表中设置。

CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.emit.strategy'='on-event', --这里设置
  ...
);


select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */

功能II. 配置数据源的空闲超时时间 

使用场景:如果数据源的某个分片在一段时间未发送事件数据。那么WatermarkGenerator就不会获取任何数据去生成watermark。在这种情况下,如果其他某些分区仍然在发送事件数据就会出现问题。因为下游算子watermark的计算方式是取所有上游并行数据源watermark的最小值,由于空闲的分片/分区没有计算新的watermark,任务的watermark将不会发生变化

可以设置数据源的空闲超时时间,一个分区/分片在超时时间没有发送事件数据就会被标记为空闲,下游计算新的watermark的时候将会忽略这个空闲sourse,从而让watermark继续推进。

全局的超时时间:table.exec.source.idle-timeout

单个数据源的超时时间:

-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.idle-timeout'='1min',
  ...
);

select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */

如果全局的超时时间和单个数据源的超时时间都设置了,那么会优先启用单个数据源的超时时间

功能III. Watermark对齐

使用场景:同一个数据源的不同分区/分片之间可能出现消费速度不一样的情况,不同数据源之间的消费速度也可能不一样。假如下游有一些有状态的算子,这些算子可能需要在状态中缓存更多那些消费更快的数据,等待那些消费慢的数据,状态可能会变得很大;也会导致严重的乱序。影响窗口计算。使用Watermark对齐确保源表的某个分片/分块/分区的watermark不会比其他分片/分块/分区增加太快。

缺点:影响性能

CREATE TABLE user_actions (
...
user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s',
...
);

scan.watermark.alignment.group配置对齐组名称,在同一个组的数据源将会对齐
scan.watermark.alignment.max-drift配置分片/分块/分区允许偏离对齐时间的最大范围
scan.watermark.alignment.update-interval配置计算对齐时间的频率,非必需,默认是1s


select ... from source_table /*+ OPTIONS('scan.watermark.alignment.group'='alignment-group-1', 'scan.watermark.alignment.max-drift'='1min', 'scan.watermark.alignment.update-interval'='1s') */

 如果源连接器(source connector)未实现FLIP-217,并且使用了watermark对齐的功能,那么任务运行会抛出异常,用户可以设置pipeline.watermark-alignment.allow-unaligned-source-splitstrue来禁用源分片的WaterMark对齐功能,此时,只有当分片数量等于源并行度的时候,watermark对齐功能才能正常工作。

相关推荐

  1. Flink SQL使用watermark功能

    2024-03-25 05:18:01       44 阅读
  2. SQLMap使用

    2024-03-25 05:18:01       46 阅读
  3. ES使用

    2024-03-25 05:18:01       46 阅读
  4. [Python] Python使用正则表达式

    2024-03-25 05:18:01       57 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-25 05:18:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-25 05:18:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-25 05:18:01       82 阅读
  4. Python语言-面向对象

    2024-03-25 05:18:01       91 阅读

热门阅读

  1. 使用docker搭建dockge

    2024-03-25 05:18:01       40 阅读
  2. 自学python指导教程

    2024-03-25 05:18:01       35 阅读
  3. Nodejs版本管理工具nvm

    2024-03-25 05:18:01       42 阅读
  4. Chinese-LLaMA-Alpaca-2模型量化部署&测试

    2024-03-25 05:18:01       34 阅读
  5. 【Python】复习12:标准库与第三方库

    2024-03-25 05:18:01       40 阅读
  6. Postgresql中常见的执行计划解释

    2024-03-25 05:18:01       37 阅读
  7. vue3模板引用介绍

    2024-03-25 05:18:01       50 阅读
  8. 数据结构面试常见问题

    2024-03-25 05:18:01       44 阅读
  9. Day 30回溯06

    2024-03-25 05:18:01       46 阅读
  10. flink的MaxOutOfOrderness 和 Allowedlateness 区别

    2024-03-25 05:18:01       46 阅读
  11. 【Swift】如何让实例对象像函数一样使用

    2024-03-25 05:18:01       43 阅读