Kafka(三)Producer第二篇

一,生产者架构


生产者客户端由两个线程协调运行,分别为主线程和Sender线程(发送线程)。
  • 主线程:KafkaProducer创建消息,通过拦截器、序列化器和分区器之后缓存到消息收集器RecordAccumulator中;
  • Sender线程:从RecordAccumulator中获取消息并发送到Kafka集群;
1,RecordAccumulator
  • RecordAccumulator用来缓存消息以便Sender 线程批量发送,进而减少网络传输的资源消耗;
  • 消息会被追加到RecordAccumulator的某个双端队列中, 每个partition都维护了一个双端队列;
  • 双端队列内容是Deque<ProducerBatch>,ProducerBatch包含一至多个ProducerRecord;
2,主线程写入消息到RecordAccumulator
消息在发送之前会缓存在java.io.ByteBuffer的内存区域。RecordAccumulator的内部有一个BufferPool,用来实现ByteBuffer的复用,以实现缓存的高效利用。 BufferPool只对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个大小由batch.size参数指定;
消息流入RecordAccumulator过程:
1,先寻找与partition对应的双端队列(如果没有则新建);
2,从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建);
3,判断ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则创建一个新的ProducerBatch;
4,新建ProducerBatch时,评估这条消息的大小是否超过batch.size参数的大小:
    a,如果不超过,那么就以 batch.size 的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;
    b,如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用
3,Sender线程读取RecordAccumulator并发送
1. Sender从RecordAccumulator中获取消息,会将原本<分区,Deque<ProducerBatch>>的形式转变成<Node,List<ProducerBatch>,其中Node表示Kafka集群的broker节点
2. Sender将消息进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node;
3,Sender线程发送Request之前,请求还会保存到InFlightRequests(保存的形式为 Map<NodeId,Deque<Request>>,缓存了已经发出去但还没有收到响应的请求)中;
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。负载最小是通过每个Node在InFlightRequests中 还未确认的请求决定的,未确认的请求越多则认为负载越大。
选择leastLoadedNode发送请求可以使它能够尽快发出。

二,元数据的更新


元数据是指Kafka集群的元数据,包括主题、分区、副本分布、哪些副本在AR、ISR等集合、集群中有哪些节点、控制器节点又是哪一个等等。
元数据的更新操作是由Sender线程发起的,对客户端的外部使用者不可见。

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-09 18:36:09       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-09 18:36:09       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-09 18:36:09       58 阅读
  4. Python语言-面向对象

    2024-07-09 18:36:09       69 阅读

热门阅读

  1. AIGC学习笔记—LLM(前言)

    2024-07-09 18:36:09       29 阅读
  2. 【Spring Boot】thymeleaf模板引擎

    2024-07-09 18:36:09       26 阅读
  3. SpringBoot Mybatis-Plus 日志带参数

    2024-07-09 18:36:09       25 阅读
  4. 测试绩效评估

    2024-07-09 18:36:09       23 阅读
  5. 【Datagear】使用参数时的If语法

    2024-07-09 18:36:09       22 阅读
  6. 实现基于Elasticsearch的搜索服务

    2024-07-09 18:36:09       27 阅读
  7. 【网络协议】ISIS

    2024-07-09 18:36:09       23 阅读
  8. 第三章 设计模式(2023版本IDEA)

    2024-07-09 18:36:09       23 阅读
  9. 命令模式在金融业务中的应用及其框架实现

    2024-07-09 18:36:09       27 阅读
  10. 【C语言】标识符大通关!

    2024-07-09 18:36:09       30 阅读
  11. Python面试题-8

    2024-07-09 18:36:09       25 阅读
  12. HPE ProLiant MicroServer Gen8加装显卡

    2024-07-09 18:36:09       23 阅读