Merge-On-Read

基本介绍

Iceberg的Merge-On-Read

Merge-On-Read,顾名思义,就是在读取的时候进行合并,是与Copy-On-Write相反的一种模式

在Iceberg中,Merge-On-Read同样用于行级更新,整体过程如下

当更新数据时,Iceberg不再Copy一份旧数据,而是直接将更新数据写入独立的一个文件用来标识需要删除的数据

这种模式减少了写入时的合并操作,但是加重了读取数据时的合并操作,因此适合写多读少的场景

Merge-On-Read类型

Iceberg的Merge-On-Read有两种类型,对应使用两种不同的方法来定位需要删除的数据,分别是:EqualityDelete和PositionDelete

EqualityDelete:等值删除。这种模式下,delete文件记录的内容跟数据文件一样,是行级的数据。进行读取合并时,使用指定的列做等值判断,进行数据的删除合并

PositionDelete:位置删除。这种模式下,delete文件记录的内容是需要删除的数据位置,即数据文件地址和数据行号。进行读取合并时,基于数据地址进行删除合并

目前Iceberg没有进行EqualityDelete和PositionDelete选择的配置,使用哪种模式就看API实现了哪个接口。比如配置Spark使用Merge-On-Read时,使用的就是PositionDelete

case MERGE_ON_READ:

    return new SparkPositionDeltaOperation(spark, table, branch, info, isolationLevel);

测试表现

执行如下测试

CREATE TABLE icedb.morTable (

id bigint COMMENT 'unique id',

data string)

USING iceberg TBLPROPERTIES ('format-version'='2', 'write.update.mode' = 'merge-on-read');

insert into icedb.morTable values (1, 'name1'),(99, 'name99');

insert into icedb.morTable values (2, 'name1'),(88, 'name88');

update icedb.morTable set data = 'update' WHERE id =1;
  1. snapshot:与Copy-On-Write表现一致
  2. manifest-list:这里存在差异,产生了一条content=1的数据,也就是被标记为deletes,其他三条数据都是content=0
  3. manifest:这里总共四个manifest(2条insert分别产生一个,一个update产生两个),这里四个manifest的status都为1(0=existing、1=added、2=deleted);注意的是data_file中的content字段,指向deletes文件的是1,其他为0(0=data, 1=position deletes, 2=equality deletes)
  4. datafile:主要关注update产生的两个文件,一个是正常的数据文件,存的是update后的数据(1, 'update' );另一个是position delete文件,存的是需要删除的文件地址和行号{"file_path": "hdfs://nameservice/spark/icedb.db/morTable/data/00000-0-52cbb7ca-6a2b-4f87-a2ba-fefa2470d5b8-00001.parquet", "pos": 0}

Spark Update更新流程

整体流程与Copy-On-Write是一样的,两者都是Iceberg用来做行级更新的,只是具体的实现过程不一样

扫描过程

第一步同样是做旧数据的扫描,根据update传入的where条件扫描数据

不同的是,构建BatchScan扫描器的时候,Merge-On-Read比Copy-On-Write少了一项设置:ignoreResiduals()

ignoreResiduals()的作用在Copy-On-Write中已经介绍过,是设置扫描过滤条件只应用到文件。也就是说,设置了此项的Copy-On-Write扫描的时候返回的是文件粒度的数据,会把不需要更新的数据也读出来重写一遍;而没有设置此项的Merge-On-Read只读取完全匹配的数据,并不会读取不需要更新的数据行来进行重写

这个的具体应用是在文件扫描任务BaseFileScanTask当中,构建的时候会设置这个参数

BaseFileScanTask baseFileScanTask =

    new BaseFileScanTask(

         dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());

RewriteUpdateTable

Merge-On-Read走buildWriteDeltaPlan分支,生成的计划树如下

WriteIcebergDelta RelationV2[id#0L, data#1] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

+- UpdateRows[__row_operation#8, id#9L, data#10, _file#11, _pos#12L, _spec_id#13, _partition#14]

   +- Filter (id#0L = cast(1 as bigint))

      +- RelationV2[id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

RowLevelCommandScanRelationPushDown

这步走第一个分支,rewritePlan为WriteIcebergDelta的分支,最终的计划树为

WriteIcebergDelta RelationV2[id#105L, data#106] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

+- UpdateRows[__row_operation#113, id#114L, data#115, _file#116, _pos#117L, _spec_id#118, _partition#119]

   +- Project [id#105L, data#106, _file#109, _pos#110L, _spec_id#107, _partition#108]

      +- Filter (isnotnull(id#105L) AND (id#105L = 1))

         +- RelationV2[id#105L, data#106, _file#109, _pos#110L, _spec_id#107, _partition#108] spark_catalog.icedb.morTable

ExtendedV2Writes

这一步走的是WriteIcebergDelta分支,最终的计划树为

UpdateIcebergTable [assignment(id#0L, id#0L), assignment(data#1, update2)], (id#0L = 1)

:- RelationV2[id#0L, data#1] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

+- WriteIcebergDelta

   +- UpdateRows[__row_operation#8, id#9L, data#10, _file#11, _pos#12L, _spec_id#13, _partition#14]

      +- Project [id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3]

         +- Filter (isnotnull(id#0L) AND (id#0L = 1))

            +- RelationV2[id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3] spark_catalog.icedb.morTab

WriteDeltaExec

最后同样进入ExtendedDataSourceV2Strategy,此处走WriteIcebergDelta分支,生成WriteDeltaExec(3.4这个类Spark集成了,所以不在Iceberg里了

WriteDeltaExec与ReplaceDataExec一样是V2ExistingTableWriteExec的子类,不同的是WriteDeltaExec里对writingTask进行了特殊的实现,分别是DeltaWritingSparkTask和DeltaWithMetadataWritingSparkTask

两个writingTask的实现类基本相同,核心逻辑是根据不同的操作类型(insert、update、delete)调用DeltaWriter的对应接口

这里的DeltaWriter就是前面SparkPositionDeltaOperation创建的SparkPositionDeltaWrite,目前测试看update语句走的是insert+delete的流程

insert就是正常的流程,SparkPositionDeltaWrite的delete接口接收数据,这个数据是包含两个字段filePath和position,最终把这个字段写入delete类型的文件当中,写delete文件的时候在result的字段会记录delete列表,最终commit提交的时候就有对应关系

看前面的计划树,返回的数据schema当中包含了(_file#4, _pos#5L),这个就是delete这边数据的来源

定位信息

前面说过,最终写入delete文件的是file、pos两个字段,这两个字段在构建计划树的时候已经放在了schema上,来源是RewriteUpdateTable.buildWriteDeltaPlan,也就是RewriteUpdateTable的SupportsDelta分支

分支会重写计划,重写计划的时候有一步resolveRowIdAttrs,这一步会提取相应字段,让最终的数据源返回上加上file和position信息

读取数据

首先在创建ManifestGroup的时候,会去构建deleteIndexBuilder,deleteIndexBuilder用于查询标记为delete的文件,其输入是deleteManifests。

Iceberg元数据可以单独获取对应的类型的Manifest,具体操作在build的时候,build当中会单独对EqualityDelete和PositionDelete做操作,操作相同,只是找的文件类型不同

最后在createFileScanTasks构建扫描任务的时候,会把DeleteFile信息放入扫描任务

最终生效在读取文件的时候。以Spark为例,在BatchDataReader当中,基于上诉生成的扫描任务,首先获取要读取的文件,然后获取对应的delete文件封装SparkDeleteFilter,最终传入reader函数

真正生效的地方在ColumnarBatchReader当中,调用读取的时候会略过传入的delete对应的点,这样就只读取有效函数,忽略delete数据

具体的逻辑是在spark里,首先传入delete文件当中的position单独读取要删除的数据;然后在spark的逻辑里,从数据集当中把读到的这个数据给删除掉,在BufferedRowIterator的next当中,有删除逻辑

public InternalRow next() {

    return currentRows.remove();

  }

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-23 01:32:05       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-23 01:32:05       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-23 01:32:05       45 阅读
  4. Python语言-面向对象

    2024-07-23 01:32:05       55 阅读

热门阅读

  1. 云计算安全技术介绍

    2024-07-23 01:32:05       16 阅读
  2. 杭电第一场

    2024-07-23 01:32:05       15 阅读
  3. Cow coupons

    2024-07-23 01:32:05       14 阅读
  4. ros2--服务接口

    2024-07-23 01:32:05       13 阅读
  5. C/C++内存管理笔记

    2024-07-23 01:32:05       13 阅读
  6. GraphRAG的实践

    2024-07-23 01:32:05       10 阅读
  7. 简单三步实现跨境多种支付

    2024-07-23 01:32:05       11 阅读
  8. 二维数组与指针

    2024-07-23 01:32:05       14 阅读
  9. Nougat - 学术文档PDF解析(LaTeX数学、表格)

    2024-07-23 01:32:05       16 阅读
  10. linux发送邮件实测

    2024-07-23 01:32:05       16 阅读