【Kafka】Kafka Producer 分区-05

1. 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

see 2.4.1http://t.csdnimg.cn/m1O9u

2. 分区策略

2.1 默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

策略如下:

  1. 如果记录中指定了分区,使用指定的分区。
    这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
  2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
    如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
  3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
    如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

public class DefaultPartitioner implements Partitioner {
    // 初始化方法
    @Override
    public void configure(Map<String, ?> configs) {
        // 配置代码
    }

    // 计算分区的方法
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 分区计算逻辑
    }

    // 清理资源的方法
    @Override
    public void close() {
        // 资源清理代码
    }
}

该实现中包括了三个主要的方法:

  • configure(Map<String, ?> configs):用于初始化和配置分区器。
  • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
  • close():用于在分区器关闭时清理资源。

在这里插入图片描述

案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

		// 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
            kafkaProducer.send(new ProducerRecord<>("first",
                    1, "", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
            kafkaProducer.send(new ProducerRecord<>("first",
                    "a", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

3. 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器

  1. 需求
    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

  2. 实现步骤
    定义类实现 Partitioner 接口
    重写 partition()方法

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 1. 实现接口 Partitioner
 * 2. 实现 3 个方法:partition,close,configure
 * 3. 编写 partition 方法,返回分区号
 */
public class MyPartitioner implements Partitioner {
    /**
     * 返回信息对应的分区
     *
     * @param topic 主题
     * @param key 消息的 key
     * @param keyBytes 消息的 key 序列化后的字节数组
     * @param value 消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[]
            keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 atguigu
        if (msgValue.contains("atguigu")) {
            partition = 0;
        } else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atgui
                gu.kafka.producer.MyPartitioner");
                KafkaProducer < String, String > kafkaProducer = new
                        KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {

            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

测试
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

相关推荐

  1. 2024-05-08 postgres-调试及分析-记录

    2024-06-15 23:12:07       12 阅读
  2. 学习笔记——交通安全分析05

    2024-06-15 23:12:07       9 阅读
  3. <span style='color:red;'>02</span>.<span style='color:red;'>05</span>

    02.05

    2024-06-15 23:12:07      33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-15 23:12:07       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-15 23:12:07       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-15 23:12:07       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-15 23:12:07       18 阅读

热门阅读

  1. 技术周总结2024.06.10~06.16

    2024-06-15 23:12:07       6 阅读
  2. 【LVGL v8.3】切换界面时内存变化分析

    2024-06-15 23:12:07       7 阅读
  3. 支持向量机(SVM)中核函数的本质意义

    2024-06-15 23:12:07       6 阅读
  4. 前端进阶-js查漏补缺

    2024-06-15 23:12:07       4 阅读
  5. 78978

    2024-06-15 23:12:07       5 阅读
  6. git常用指令及bug解决(更新自用)

    2024-06-15 23:12:07       7 阅读
  7. JDBC简介

    2024-06-15 23:12:07       6 阅读