Iceberg从入门到精通系列之二十四:Spark Structured Streaming

Iceberg从入门到精通系列之二十四:Spark Structured Streaming

Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。 Spark DSv2 是一个不断发展的 API,在 Spark 版本中提供不同级别的支持。

一、Streaming Reads

Iceberg 支持处理从历史时间戳开始的 Spark 结构化流作业中的增量数据:

val df = spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", Long.toString(streamStartTimestamp))
    .load("database.table_name")

Iceberg 仅支持从追加快照中读取数据。覆盖快照无法处理,默认会引发异常。通过设置streaming-skip-overwrite-snapshots=true 可以忽略覆盖。同样,删除快照默认会引发异常,通过设置streaming-skip-delete-snapshots=true可以忽略删除。

二、Streaming Writes

要将流式查询中的值写入 Iceberg 表,请使用 DataStreamWriter:

data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")

如果您使用的是 Spark 3.0 或更早版本,则需要使用 .option(“path”, “database.table_name”).start(),而不是 .toTable(“database.table_name”)。

对于基于目录的 Hadoop 目录:

data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", "hdfs://nn:8020/path/to/table") 
    .option("checkpointLocation", checkpointPath)
    .start()

Iceberg 支持追加和完整输出模式:

  • append:将每个微批次的行追加到表中
  • complete:替换每个微批次的表内容

在开始流式查询之前,请确保您创建了表。请参阅 SQL 创建表文档以了解如何创建 Iceberg 表。

Iceberg 不支持实验性连续处理,因为它不提供“提交”输出的接口。

三、Partitioned table

Iceberg 需要在写入数据之前按每个任务的分区对数据进行排序。在 Spark 中,任务按 Spark 分区进行分割。针对分区表。对于批量查询,建议您进行显式排序来满足要求(请参阅此处),但该方法会带来额外的延迟,因为重新分区和排序被视为流工作负载的繁重操作。为了避免额外的延迟,您可以启用扇出编写器来消除这一要求。

data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")

扇出写入器打开每个分区值的文件,并且在写入任务完成之前不会关闭这些文件。避免使用扇出写入器进行批量写入,因为对输出行进行显式排序对于批量工作负载来说成本较低。

四、流表的维护

流式写入可以快速创建新的表版本,创建大量表元数据来跟踪这些版本。强烈建议通过调整提交率、使旧快照过期以及自动清理元数据文件来维护元数据。

调整提交率

高提交率会产生数据文件、清单和快照,从而导致额外的维护。建议触发间隔至少为 1 分钟,并根据需要增加间隔。

使旧快照过期

写入表的每个批次都会生成一个新快照。 Iceberg 跟踪表元数据中的快照,直到它们过期。快照会随着频繁提交而快速积累,因此强烈建议定期维护流式查询写入的表。快照过期是删除元数据和任何不再需要的数据文件的过程。默认情况下,该过程将使超过五天的快照过期。

压缩数据文件
从流处理写入的数据量通常很小,这可能会导致表元数据跟踪大量小文件。将小文件压缩为大文件可以减少表所需的元数据,并提高查询效率。 Iceberg 和 Spark 附带了 rewrite_data_files 过程。

重写清单
为了优化流工作负载的写入延迟,Iceberg 可以使用不会自动压缩清单的“快速”附加写入新快照。这可能会导致大量小的清单文件。 Iceberg可以重写清单文件的数量来提高查询性能。 Iceberg 和 Spark 附带了 rewrite_manifests 过程。

最近更新

  1. TCP协议是安全的吗?

    2024-02-03 11:28:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-03 11:28:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-03 11:28:02       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-03 11:28:02       20 阅读

热门阅读

  1. Redis常用数据类型--List

    2024-02-03 11:28:02       31 阅读
  2. 大端和小端模式介绍

    2024-02-03 11:28:02       34 阅读
  3. Ubuntu-docker下的MySQL部署(持续更新)

    2024-02-03 11:28:02       30 阅读
  4. LeetCode 24天

    2024-02-03 11:28:02       31 阅读
  5. 【计算机安全】相关整理

    2024-02-03 11:28:02       29 阅读
  6. URL重写

    URL重写

    2024-02-03 11:28:02      34 阅读
  7. L1-054 福到了

    2024-02-03 11:28:02       30 阅读
  8. 完整的带日志的FFmpeg的命令

    2024-02-03 11:28:02       34 阅读