Kafka Producer发送消息流程之分区器和数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        //当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数
        if (record.partition() != null)
            return record.partition();

        // 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据
        if (partitioner != null) {
            int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }


        // 如果没有分区器的情况
        if (serializedKey != null && !partitionerIgnoreKeys) {
            // hash the keyBytes to choose a partition
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else {
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }


// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // Ensure the key is a non-null string
        if (key == null || !(key instanceof String)) {
            throw new IllegalArgumentException("Key must be a non-null String");
        }

        // Parse the key as an integer
        int keyInt;
        try {
            keyInt = Integer.parseInt((String) key);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Key must be a numeric string", e);
        }

        // Determine the partition based on the key's odd/even nature
        if (keyInt % 2 == 0) {
            return 1; // Even keys go to partition 2
        } else {
            return 0; // Odd keys go to partition 0
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {
    public static void main(String[] args) throws InterruptedException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //指定拦截器
        config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
        //指定分区器
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test1",
                    "key"+i,
                    "我是你爹"+i
            );
            //发送record
            producer.send(record);
            Thread.sleep(500);
        }

        //关闭producer
        producer.close();
    }
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

相关推荐

  1. JVM垃圾收集CMS垃圾收集G1垃圾收集

    2024-07-17 04:34:03       43 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-17 04:34:03       70 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-17 04:34:03       74 阅读
  3. 在Django里面运行非项目文件

    2024-07-17 04:34:03       62 阅读
  4. Python语言-面向对象

    2024-07-17 04:34:03       72 阅读

热门阅读

  1. ARP协议

    2024-07-17 04:34:03       27 阅读
  2. 基于Go1.19的站点模板爬虫

    2024-07-17 04:34:03       26 阅读
  3. 刷题Day54|99. 岛屿数量、100. 岛屿的最大面积

    2024-07-17 04:34:03       26 阅读
  4. 日耗100和100W投手思维的区别

    2024-07-17 04:34:03       22 阅读
  5. C语言经典程序100案例

    2024-07-17 04:34:03       19 阅读
  6. 【数据结构】顺序表

    2024-07-17 04:34:03       21 阅读
  7. 类和对象(2

    2024-07-17 04:34:03       29 阅读
  8. Elasticsearch:6.0及其ES-Head插件安装

    2024-07-17 04:34:03       25 阅读
  9. 【架构-20】引擎和库

    2024-07-17 04:34:03       26 阅读
  10. 如何在vue3中实现动态路由

    2024-07-17 04:34:03       24 阅读
  11. JWT 认证校验 从理论到实战

    2024-07-17 04:34:03       28 阅读
  12. vue3 学习笔记12 -- 插槽的使用

    2024-07-17 04:34:03       25 阅读
  13. PHP 包含

    2024-07-17 04:34:03       19 阅读
  14. 扫地机器人如何解决室内空气污染问题

    2024-07-17 04:34:03       20 阅读