49、Flink 的数据源的 SplitReader API 详解

SplitReader API
a)概述

核心的 SourceReader API 是完全异步的,但实际上,大多数 Sources 都会使用阻塞的操作,例如客户端(如 KafkaConsumer)的 poll() 阻塞调用,或者分布式文件系统(HDFS, S3等)的阻塞I/O操作;为了使其与异步 Source API 兼容,这些阻塞(同步)操作需要在单独的线程中进行,并在之后将数据提交给 reader 的异步线程。

SplitReader 是基于同步读取/轮询的 Source 的高级(high-level)API,例如 file source 和 Kafka source 的实现等

核心是上面提到的 SourceReaderBase 类,其使用 SplitReader 并创建提取器(fetcher)线程来运行 SplitReader,该实现支持不同的线程处理模型。

b)SplitReader

SplitReader API 只有以下三个方法:

  • 阻塞式的提取 fetch() 方法,返回值为 RecordsWithSplitIds。
  • 非阻塞式处理分片变动 handleSplitsChanges() 方法。
  • 非阻塞式的唤醒 wakeUp() 方法,用于唤醒阻塞中的提取操作。

SplitReader 仅需要关注从外部系统读取记录,因此比 SourceReader 简单得多。

c)SourceReaderBase

常见的 SourceReader 实现方式如下:

  • 有一个线程池以阻塞的方式从外部系统提取分片。
  • 解决内部提取线程与其他方法调用(如 pollNext(ReaderOutput))之间的同步。
  • 维护每个分片的水印(watermark)以保证水印对齐。
  • 维护每个分片的状态以进行 Checkpoint。

为了减少开发新的 SourceReader 所需的工作,Flink 提供了 SourceReaderBase 类作为 SourceReader 的基本实现。 SourceReaderBase 已经实现了上述需求,要重新编写新的 SourceReader,只需要让 SourceReader 继承 SourceReaderBase,而后完善一些方法并实现 SplitReader。

d)SplitFetcherManager

SourceReaderBase 支持几个开箱即用的线程模型,取决于 SplitFetcherManager 的行为模式; SplitFetcherManager 创建和维护一个分片提取器(SplitFetchers)池,同时每个分片提取器使用一个 SplitReader 进行提取,它还决定如何分配分片给分片提取器。

如下所示,一个 SplitFetcherManager 可能有固定数量的线程,每个线程对分配给 SourceReader 的一些分片进行抓取。

在这里插入图片描述

以下代码片段实现了此线程模型。

/**
 * 一个SplitFetcherManager,它具有固定数量的分片提取器,
 * 并根据分片ID的哈希值将分片分配给分片提取器。
 */
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
        extends SplitFetcherManager<E, SplitT> {
    private final int numFetchers;

    public FixedSizeSplitFetcherManager(
            int numFetchers,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
        this.numFetchers = numFetchers;
        // 创建 numFetchers 个分片提取器.
        for (int i = 0; i < numFetchers; i++) {
            startFetcher(createSplitFetcher());
        }
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // 根据它们所属的提取器将分片聚集在一起。
        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
        splitsToAdd.forEach(split -> {
            int ownerFetcherIndex = split.hashCode() % numFetchers;
            splitsByFetcherIndex
                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
                    .add(split);
        });
        // 将分片分配给它们所属的提取器。
        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
        });
    }
}

使用这种线程模型的SourceReader可以像下面这样创建:

public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public FixedFetcherSizeSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new FixedSizeSplitFetcherManager<>(
                        config.getInteger(SourceConfig.NUM_FETCHERS),
                        elementsQueue,
                        splitFetcherSupplier),
                recordEmitter,
                config,
                context);
    }

    @Override
    protected void onSplitFinished(Map<String, SplitStateT> finishedSplitIds) {
        // 在回调过程中对完成的分片进行处理。
    }

    @Override
    protected SplitStateT initializedState(SplitT split) {
        ...
    }

    @Override
    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
        ...
    }
}

SourceReader 的实现还可以在 SplitFetcherManagerSourceReaderBase 的基础上编写自己的线程模型。

相关推荐

  1. 45Flink Process Function 详解

    2024-06-11 11:10:03       24 阅读
  2. 48Flink Data Source API 详解

    2024-06-11 11:10:03       32 阅读
  3. 25、Flink 支持数据类型及序列化详解

    2024-06-11 11:10:03       26 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-11 11:10:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-11 11:10:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-11 11:10:03       87 阅读
  4. Python语言-面向对象

    2024-06-11 11:10:03       96 阅读

热门阅读

  1. go 基础笔记

    2024-06-11 11:10:03       30 阅读
  2. OPAMC架构介绍

    2024-06-11 11:10:03       26 阅读
  3. NOR flash和NAND flash的区别

    2024-06-11 11:10:03       33 阅读
  4. 数据仓库技术及应用(Hive调优)

    2024-06-11 11:10:03       32 阅读
  5. 现代 C++的高效并发编程模式

    2024-06-11 11:10:03       32 阅读
  6. 2024.6.10刷题记录

    2024-06-11 11:10:03       37 阅读
  7. 三分的空间至关重要

    2024-06-11 11:10:03       29 阅读
  8. 【烟花game】

    2024-06-11 11:10:03       32 阅读
  9. 【DevOps】什么是 pfSense?免费构建SDWAN

    2024-06-11 11:10:03       32 阅读