Kafka broker

1. zk中存储的kafka信息

/kafka/brokers/ids存储了在线的broker id。

/kafka/brokers/topics/xxx/partitions/n/state存储了Leader是谁以及isr队列

 /kafka/controller辅助Leader选举,每个broker都有一个controller,谁先在zk中注册上,谁就辅助Leader选举。

2. broker总体工作流程

1)每台broker启动后在zk中注册,即/kafka/borkers/ids

2)每台broker去抢占式注册controller,用于后面Leader选举

3)由注册的controller监听/kafka/borkers/ids节点变化

4)开始Leader选举,选举标准是以isr中存活为前提,以AR中排在前面的优先(AR是所有副本的集合,启动时会有一个固定的AR顺序,比如ar[1, 0, 2])

5)controller将选举出来的信息(Leader和isr信息)传到zk中,即/kafka/brokers/topics/xxx/partitions/n/state

6)其他broker的controller会从zk中同步相关信息

Kafka生产者发送数据到broker,数据在底层以Log方式(逻辑概念)存储,实际上是Segment(物理概念),一般1个Segment是1G,包含.log文件和.index文件,.index文件是索引,用于快速查询数据

7)如果Leader挂了,controller监听到节点变化,选举新的Leader,选举标准依然是以isr中存活为前提,以AR中排在前面的优先,最后更新Leader和isr队列信息

3.  新节点服役

新节点服役后,以前的topic所在的分区不会出现在新节点,即新节点不会分摊旧节点的存储压力。如果需要新节点参与进来,就需要进行一种类似于负载均衡的配置。先创建一个topic-to-move.json配置文件:

{
    "topics": [
        {"topic": "first"}
    ],
    "version": 1
}

生成一个负载均衡的计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

 上面一行是当前的分区分配,下面一行是建议的分区分配计划,创建副本存储计划increase-replication-factor.json,里面内容是上面得分建议计划。最后执行存储计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

 

还可以验证计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

查询这个topic的分区详情

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe

4.  退役旧节点

退役旧节点与服役新节点有一些类似,先创建一个topic-to-move.json配置文件,与服役新节点时一样,然后生成一个计划,只不过--broker-list 改为"0,1,2",接着执行计划,验证计划,都与服役新节点一样。

 最后在退役节点关闭kafka服务

bin/kafka-server-stop.sh

5.  Leader选举验证

创建四个分区四个副本的topic并查看:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu2 --partitions 4 --replications-factor 4

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe  --topic atguigu2

 

把3号broker停掉,那么isr队列中没有3,并且4号分区的Leader变为2

再把2号干掉

 

再恢复3号,发现Leader未变,仅isr队列信息中新增了3号

 

再恢复2号

再干掉1号

 

这样就验证了第二节讲的选举标准: 以isr中存活为前提,以AR中排在前面的优先

6. Leader和Follower故障处理细节

LEO:Log End Offset,每个副本的最后一个offset+1

HW:high watermark,高水位线,所有副本中最小的LEO,消费者能够看到的最大的offset就是HW - 1

1)如果Follower挂了,该Follower会立即被踢出isr,isr中其他Leader和Follower正常接受/同步数据,待该Follower恢复后,会读取上次的HW,将自己高于HW的数据丢弃,从HW开始与Leader同步,等到该Follower的LEO大于等于该Partition的HW,则重新加入isr队列。

2)如果Leader挂了, Leader会立即被踢出isr,并且会选出一个新的Leader,其余的Follower会将高于HW的数据丢弃,然后与新的Leader进行同步。此时只能保证数据的一致性,不能保证数据不丢失。

7. 手动调整分区副本

如果服务器的存储能力不同,希望将数据更多的存储在空间大的服务器上,那么就不应该按照Kafka分区副本的默认均匀分配,而是需要手动调整。创建4个分区,两个副本,都存在0号和1号broker上面。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic three --partitions 4 --replications-factor 2

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe  --topic three

 

 创建increase-replication-factor.json:

{
    "partitions": [
        {"topic": "three", "partitions": 0, "replicas": [0, 1]},
        {"topic": "three", "partitions": 1, "replicas": [0, 1]},
        {"topic": "three", "partitions": 2, "replicas": [1, 0]},
        {"topic": "three", "partitions": 3, "replicas": [1, 0]}
    ],
    "version": 1
}

执行存储计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

 最后查看

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe  --topic three

以上是减少副本,增加副本也是类似,先创建一个3个分区,1个副本的topic:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic four --partitions 3 --replications-factor 1

创建increase-replication-factor.json:

{
    "partitions": [
        {"topic": "four", "partitions": 0, "replicas": [0, 1, 2]},
        {"topic": "four", "partitions": 1, "replicas": [0, 1, 2]},
        {"topic": "four", "partitions": 2, "replicas": [0, 1, 2]}
    ],
    "version": 1
}

执行计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

8. Leader Partition自动平衡

在Leader选举验证小节中,如果2号和3号节点都挂了,然后又恢复,则Leader过于集中在0号和1号节点,而Kafka生产者和消费者都是只对Leader操作,所以0号和1号的压力会很大,造成负载不均衡。 未解决该问题,Kafka会自动再平衡,auto.leader.rebalance.enable默认设为true。

什么时机会触发再平衡呢?一个参考指标是broker的不平衡率,leader.imbalance.per.broker.percentage,默认是10%,另一个指标是负载检查的间隔时间,leader.imbalance.check.interval.seconds,默认是300秒。

不平衡率的计算:

实际生产环境中,不一定需要开启再平衡,因为上述例子中其实已经相对平衡了,但是根据规则,需要触发再平衡,因此会需要消耗大量资源。 

9. 文件存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个log文件,该log文件存储的就是Kafka生产者的数据。生产的数据不断地追加到log文件中,为防止log文件过大导致检索数据慢,Kafka采取了分片和索引的机制:每个partition分为多个segment,每个segment包括.index文件(偏移量索引文件)、.log文件(日志文件)、.timeindex文件(时间戳索引文件)。这些文件位于一个文件夹中,文件夹命名规则:topic名称+分区号。index和log文件的命名是以当前segment的第一条数据的offset来命名。

log文件和index文件详解:

 

10. 文件清除策略

Kafka数据默认保存7天,7天后数据自动删除或者压缩。可通过如下参数修改保存时间(从上到下优先级依次增高):

log.retention.hours

log.retention.minutes

log.retention.ms

默认检查数据是否超期的间隔时间是5分钟,可通过参数log.retention.check.interval.ms进行修改。

如果是删除数据,log.cleanup.policy=delete,基于时间删除是默认打开的,以segment中最大的时间戳作为该文件的时间戳。而基于空间大小进行删除是默认关闭的(log.retention.bytes=-1),即数据超过阈值,删除最早的数据。

如果是压缩数据,log.cleanup.policy=compact,此时对于相同key的不同value值,只保留最新的。(与之前的snappy压缩概念不同)

注意,压缩后的offset可能不是连续的,比如上图没有 offset 6,如果从offset 6开始消费,则会从7开始消费。

11. 高效读写

1)Kafka本身是分布式集群,采用分区,并行度高

2)读数据采用稀疏索引,可以快读定位数据

3)顺序写磁盘,数据以追加的方式写到log文件,这比随机写的速度要快很多,因为省去了大量的磁头寻址时间

4)采用页缓存和零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和消费者处理。Broker应用层不关心存储的数据,因此就不用走应用层,传输效率高。(传统数据复制方式:从磁盘中读取文件到内核缓冲区,内核读取缓冲区数据复制到用户缓冲区,用户缓冲区的数据复制到socket缓冲区,socket缓冲区数据发送到网卡,再到消费者)

页缓存:Kafka重度依赖Linux提供的页缓存功能。当上层有写操作时,操作系统只是将数据写入页缓存。当读操作发生时,从页缓存中读,如果找不到,再从磁盘中读。页缓存是把尽可能多的空闲内存当做磁盘内存来用。

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-03-24 23:30:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-24 23:30:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-24 23:30:01       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-24 23:30:01       20 阅读

热门阅读

  1. (c/c++)——线程的基础使用

    2024-03-24 23:30:01       17 阅读
  2. rust - 将bitmap位图文件另存为png格式

    2024-03-24 23:30:01       17 阅读
  3. PostgreSQL与MySQL对比

    2024-03-24 23:30:01       19 阅读
  4. jvm底层

    jvm底层

    2024-03-24 23:30:01      15 阅读
  5. python

    2024-03-24 23:30:01       17 阅读
  6. 【机器学习-09】特征工程

    2024-03-24 23:30:01       16 阅读
  7. js和jsp的区别

    2024-03-24 23:30:01       16 阅读
  8. 组织碳管理--常见问题解答FAQ

    2024-03-24 23:30:01       19 阅读
  9. 【LeetCode-45.跳跃游戏】

    2024-03-24 23:30:01       15 阅读
  10. react native 总结

    2024-03-24 23:30:01       20 阅读