Kafka Implementation: Distribution

Contents

1. Preface

2. Consumer Offset Tracking

3. ZooKeeper Directories

3.1. Notation

3.2. Broker Node Registry

3.3. Broker Topic Registry

3.4. Cluster Id

3.5. Broker node registration


1. Preface

    Kafka exposes the consumer client parameter partition.assignment.strategy to configure how the partitions of subscribed topics are assigned to consumers. By default this parameter is set to org.apache.kafka.clients.consumer.RangeAssignor, i.e. the RangeAssignor strategy is used. Kafka also ships two other strategies: RoundRobinAssignor and StickyAssignor.
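
The core idea behind the default RangeAssignor can be sketched in a few lines of Python. This is an illustrative re-implementation of the per-topic range logic, not Kafka's actual Java code: consumers are sorted, each gets a contiguous block of floor(P/C) partitions, and the first P mod C consumers get one extra.

```python
# Illustrative sketch of RangeAssignor's per-topic logic (not Kafka's Java code).
def range_assign(consumers, num_partitions):
    """Return {consumer: [partition, ...]} for one topic."""
    members = sorted(consumers)
    per_member, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        # The first `extra` members each take one additional partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment

print(range_assign(["c0", "c1"], 5))
# {'c0': [0, 1, 2], 'c1': [3, 4]}
```

With 5 partitions and 2 consumers, c0 receives partitions 0–2 and c1 receives 3–4, which is exactly the skew (earlier consumers get more) that RoundRobinAssignor and StickyAssignor try to avoid across topics.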

2. Consumer Offset Tracking

    A Kafka consumer tracks the maximum offset it has consumed in each partition and can commit offsets so that it can resume from them after a restart. Kafka offers the option to store all the offsets for a given consumer group in a designated broker (for that group) called the group coordinator: every consumer instance in that group sends its offset commits and fetches to that coordinator. Consumer groups are assigned to coordinators based on their group names. A consumer can look up its coordinator by issuing a FindCoordinatorRequest to any Kafka broker and reading the FindCoordinatorResponse, which contains the coordinator details. The consumer can then commit or fetch offsets from the coordinator broker. If the coordinator moves, the consumer must rediscover it. Offset commits can be performed automatically or manually by the consumer instance.
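
The mapping from group name to coordinator can be sketched as follows. A group's offsets live in one partition of the internal offsets topic, chosen by hashing the group id, and the leader of that partition is the coordinator. This Python sketch reproduces Java's String.hashCode; the real broker code differs in minor details (e.g. how it takes the absolute value), so treat this as illustrative.

```python
# Illustrative: which partition of __consumer_offsets holds a group's offsets.
def java_string_hash(s):
    """Reproduce Java's String.hashCode (signed 32-bit arithmetic)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def coordinator_partition(group_id, num_offsets_partitions=50):
    # 50 is the default value of offsets.topic.num.partitions.
    return abs(java_string_hash(group_id)) % num_offsets_partitions

print(coordinator_partition("my-group"))
```

The broker that leads this partition of __consumer_offsets answers the group's FindCoordinatorRequest, which is why coordinator placement follows offsets-topic leadership.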

    When the group coordinator receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. The broker sends a successful offset commit response to the consumer only after all replicas of the offsets topic have received the offsets. If the offsets fail to replicate within a configurable timeout, the commit fails and the consumer may retry it after backing off. The brokers periodically compact the offsets topic, since only the most recent offset commit per partition needs to be kept. The coordinator also caches the offsets in an in-memory table in order to serve offset fetches quickly.
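
Compaction works here because each commit record is keyed by (group, topic, partition) and only the newest value per key matters. A minimal simulation of that invariant:

```python
# Illustrative: log compaction keeps only the latest offset per key.
def compact(commits):
    """commits: iterable of (group, topic, partition, offset) in log order."""
    latest = {}
    for group, topic, partition, offset in commits:
        # A later record for the same key supersedes the earlier one.
        latest[(group, topic, partition)] = offset
    return latest

log = [
    ("g1", "orders", 0, 10),
    ("g1", "orders", 0, 25),   # supersedes the previous commit
    ("g1", "orders", 1, 7),
]
print(compact(log))  # {('g1', 'orders', 0): 25, ('g1', 'orders', 1): 7}
```

This is also exactly the shape of the coordinator's in-memory cache: a map from key to the last committed offset.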

    When the coordinator receives an offset fetch request, it simply returns the last committed offset vector from its offsets cache. If the coordinator was just started, or if it just became the coordinator for a new set of consumer groups (by becoming leader for a partition of the offsets topic), it may need to load that offsets topic partition into the cache first. In that case the offset fetch fails with a CoordinatorLoadInProgressException, and the consumer may retry the OffsetFetchRequest after backing off.
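
The client-side behavior described above is a plain retry-with-backoff loop. The sketch below simulates it with a stand-in exception class and a fake fetch function (both hypothetical names; the real client uses Kafka's protocol classes):

```python
import time

class CoordinatorLoadInProgress(Exception):
    """Stand-in for Kafka's CoordinatorLoadInProgressException."""

def fetch_offsets_with_retry(fetch, retries=5, backoff_s=0.01):
    """Retry an offset fetch while the coordinator is still loading."""
    for attempt in range(retries):
        try:
            return fetch()
        except CoordinatorLoadInProgress:
            time.sleep(backoff_s * (2 ** attempt))  # exponential backoff
    raise TimeoutError("coordinator still loading after retries")

# Simulated coordinator that finishes loading on the third attempt:
state = {"calls": 0}
def fake_fetch():
    state["calls"] += 1
    if state["calls"] < 3:
        raise CoordinatorLoadInProgress()
    return {("orders", 0): 25}

print(fetch_offsets_with_retry(fake_fetch))  # {('orders', 0): 25}
```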

3. ZooKeeper Directories

    The following describes the ZooKeeper structures and algorithms used for coordination between consumers and brokers.

3.1. Notation

    When an element in a path is denoted [xyz], the value of xyz is not fixed; there is in fact a ZooKeeper znode for each possible value of xyz. For example, /topics/[topic] is a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given, such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4. An arrow -> indicates the contents of a znode: for example, /hello -> world denotes a znode /hello containing the value "world".

3.2. Broker Node Registry

/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)

    This is the list of all broker nodes currently present. Each provides a unique logical broker id that identifies it to consumers and must be given as part of its configuration. On startup, a broker registers itself by creating a znode with its logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (for instance, because two servers were configured with the same broker id) results in an error.
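
The duplicate-id rule can be modeled with an in-memory registry. This is a simulation only: a real broker gets a node-already-exists error from ZooKeeper when it tries to create /brokers/ids/[id]; the class and exception names here are hypothetical.

```python
# In-memory simulation of broker-id registration under /brokers/ids.
class NodeExistsError(Exception):
    pass

class BrokerRegistry:
    def __init__(self):
        self._ids = {}

    def register(self, broker_id, host, port):
        # Mirrors ZooKeeper rejecting creation of an existing znode.
        if broker_id in self._ids:
            raise NodeExistsError(f"/brokers/ids/{broker_id} already exists")
        self._ids[broker_id] = {"host": host, "port": port}

registry = BrokerRegistry()
registry.register(0, "kafka-a", 9092)
try:
    registry.register(0, "kafka-b", 9092)  # second server, same broker id
except NodeExistsError as e:
    print("rejected:", e)
```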

    Since the broker registers itself in ZooKeeper using an ephemeral znode, the registration is dynamic: the znode disappears if the broker shuts down or dies, thereby notifying consumers that it is no longer available.

3.3. Broker Topic Registry

/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)

    Each broker registers itself under the topics it maintains and stores the number of partitions for each of those topics.
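
The state znode shown above holds JSON, so a client can read it to discover the current leader and in-sync replica set for a partition. The field values below are illustrative:

```python
# Parse an (illustrative) partition-state znode payload.
import json

state_json = (
    '{"controller_epoch": 3, "leader": 1, "version": 1,'
    ' "leader_epoch": 7, "isr": [1, 2]}'
)
state = json.loads(state_json)
print("leader:", state["leader"], "isr:", state["isr"])  # leader: 1 isr: [1, 2]
```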

3.4. Cluster Id

    The cluster id is a unique and immutable identifier assigned to a Kafka cluster. It can be at most 22 characters long, and the allowed characters are defined by the regular expression [a-zA-Z0-9_\-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.
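
One way to produce an id matching these constraints (illustrative, not necessarily Kafka's exact code path): URL-safe Base64 over 16 random bytes, with the padding stripped, yields exactly 22 characters drawn from [a-zA-Z0-9_-].

```python
import base64
import os
import re

def generate_cluster_id():
    # 16 bytes -> 24 Base64 chars ending in "==" -> 22 chars after stripping.
    return base64.urlsafe_b64encode(os.urandom(16)).rstrip(b"=").decode("ascii")

cid = generate_cluster_id()
assert len(cid) == 22 and re.fullmatch(r"[a-zA-Z0-9_\-]+", cid)
print(cid)
```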

    Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. During startup, the broker tries to read the cluster id from the /cluster/id znode. If the znode does not exist, the broker generates a new cluster id and creates the znode with it.
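
The get-or-create flow reduces to a few lines when ZooKeeper is replaced by an in-memory dict (a real broker uses a znode create, which can race with other brokers; that race is not modeled here):

```python
# Illustrative get-or-create of the /cluster/id value.
def get_or_create_cluster_id(store, generate):
    if "/cluster/id" not in store:
        store["/cluster/id"] = generate()  # first startup: create the id
    return store["/cluster/id"]           # later startups: reuse it

zk = {}
first = get_or_create_cluster_id(zk, lambda: "abc123")
second = get_or_create_cluster_id(zk, lambda: "other")  # existing id wins
print(first, second)  # abc123 abc123
```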

3.5. Broker node registration

    Broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes its host name and port. The broker also registers the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
