kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

一、kafka生产者客户端

1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {
		//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容
		public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
		//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数
		public void onAcknowledgement(RecordMetadata metadata, Exception exception);
		public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0

broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {
		public static KafkaConsumer<String, String> kafkaConsumer;
		public static void main(String[] args) {
				Properties properties = new Properties();
				properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhos
				properties.put(KafkaConstant.GROUP_ID, "test01");
				properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");
				properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");
				properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.
				properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clas
				properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
				properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");
				properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
				kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);
				
				receiveMessage("rack02");
		}
		public static void receiveMessage(String topic) {
				TopicPartition topicPartition0 = new TopicPartition(topic, 0);
				kafkaConsumer.assign(Arrays.asList(topicPartition0));
				while(true) {
						// Kafka的消费者⼀次拉取⼀批的数据
						ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll
						//System.out.println("开始打印消息!");
						// 5.将将记录(record)的offset、key、value都打印出来
						for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
								// 主题
								String topicName = consumerRecord.topic();
								int partition = consumerRecord.partition();
								// offset:这条消息处于Kafka分区中的哪个位置
								long offset = consumerRecord.offset();
								// key\value
								String key = consumerRecord.key();
								String value = consumerRecord.value();
								System.out.println(String.format("topic: %s, partition: %s, offs
						}
				}
		}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

相关推荐

  1. 服务客户以及前后相关概念区分

    2024-01-10 06:12:07       26 阅读
  2. Kafka 基本概念术语

    2024-01-10 06:12:07       41 阅读
  3. Kafka客户实战

    2024-01-10 06:12:07       34 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-10 06:12:07       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-10 06:12:07       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-10 06:12:07       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-10 06:12:07       20 阅读

热门阅读

  1. Python数据类型转换

    2024-01-10 06:12:07       36 阅读
  2. #{}和${}的区别?

    2024-01-10 06:12:07       23 阅读
  3. 离线安装docker和docker-compose

    2024-01-10 06:12:07       40 阅读
  4. 深度学习中Epoch和Batch Size的关系

    2024-01-10 06:12:07       35 阅读
  5. 树莓派Debian系统中如何用mDNS广播自己的ip地址

    2024-01-10 06:12:07       34 阅读
  6. [力扣 Hot100]Day1 两数之和

    2024-01-10 06:12:07       41 阅读
  7. 【docker】docker-compose.yml 语法详解

    2024-01-10 06:12:07       34 阅读
  8. nginx upstream负载均衡模块

    2024-01-10 06:12:07       28 阅读
  9. xcode 14.3升级 pod升级

    2024-01-10 06:12:07       29 阅读
  10. Hive之set参数大全-2

    2024-01-10 06:12:07       27 阅读
  11. qt-day2

    qt-day2

    2024-01-10 06:12:07      31 阅读
  12. xcode-docC

    2024-01-10 06:12:07       35 阅读
  13. Hive之函数解析

    2024-01-10 06:12:07       28 阅读
  14. 与AI合作 -- 单例工厂2遗留的问题:bard的错误

    2024-01-10 06:12:07       33 阅读
  15. 【力扣100】【好题】155.最小栈

    2024-01-10 06:12:07       40 阅读
  16. ES6规范

    2024-01-10 06:12:07       28 阅读