kafka进阶笔记

如何使在虚拟机上的kafka和Windows本机idea产生交互?

首先保证Windows和虚拟机能进行基本的交互

首先需要确定虚拟机的ip地址,看网上的教程除了使用ifconfig命令查虚拟机ip外,最好还是Windows和虚拟机互相ping一下,能ping通才基本满足通讯条件(网上写可能虚拟机开了防火墙ping不通,我这个能直接ping通)。

虚拟机 ping Windows

Windows ping 虚拟机

确保虚拟机打开kafka的服务

思路依次是docker ps -a查看当前运行的容器

若zookeeper和kafka没有打开,使用docker start 容器id打开容器进程

使用docker exec -it kafka容器进程id /bin/sh进入到kafka容器

上述过程见以前写的描述(上面的)

然后打开消费者,持续开启消费者进程进行消费

在windows本地的idea创建maven项目,引入依赖,写代码

(1)创建Maven工程 kafka

(2)导入依赖

<dependencies>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>3.5.1</version>
         </dependency>
</dependencies>

(3)创建包名:com.tjut.kafka.producer

(4)编写不带回调函数的 API 代码(普通的异步发送方法)

 package com.tjut.kafka.producer;
 ​
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 ​
 import java.util.Properties;
 ​
 public class CustomProducer {
     public static void main(String[] args) {
         //创建kafka生产者对象
         //""  "hello"
         //属性配置
         Properties properties = new Properties();
         //连接集群
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.147.131:9092");
 ​
         //指定对应的key和value的序列化类型
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 ​
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 ​
         //发送数据
         for (int i=0;i<5;i++) {
             kafkaProducer.send(new ProducerRecord<>("first", "wxj"+i));
         }
         //关闭资源
         kafkaProducer.close();
 ​
     }
 }

初步结果

image-20231117173111700

(5)带回调函数的异步发送:指的是生产者发送的消息不最后进入broker,在DQueue中就会回调一些结果给生产者这方

image-20231117174607154

 package com.tjut.kafka.producer;
 ​
 import org.apache.kafka.clients.producer.*;
 import org.apache.kafka.common.serialization.StringSerializer;
 ​
 import java.util.Properties;
 ​
 public class CustomProducerCallBack {
     public static void main(String[] args) {
         //创建kafka生产者对象
         //""  "hello"
         //属性配置
         Properties properties = new Properties();
         //连接集群
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.147.131:9092");
 ​
         //指定对应的key和value的序列化类型
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 ​
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 ​
         //发送数据
         for (int i=10;i<15;i++) {
             //多了回调函数
             kafkaProducer.send(new ProducerRecord<>("first", "wxj" + i), new Callback() {
                 @Override
                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                     if(e==null){
                         System.out.println("主题:"+recordMetadata.topic());
                         System.out.println("分区:"+recordMetadata.partition());
                     }
                 }
             });
         }
         //关闭资源
         kafkaProducer.close();
     }
 }

效果:

IDEA控制台

image-20231117174743877

虚拟机:

image-20231117174806660

(6)同步发送,只需在异步发送的基础上,再调用一下 get()方法即可。

 package com.tjut.kafka.producer;
 ​
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 ​
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 ​
 public class CustomProducerSync {
     public static void main(String[] args) throws ExecutionException, InterruptedException {
         //创建kafka生产者对象
         //""  "hello"
         //属性配置
         Properties properties = new Properties();
         //连接集群
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.147.131:9092");
 ​
         //指定对应的key和value的序列化类型
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 ​
 ​
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 ​
         //发送数据
         for (int i=0;i<5;i++) {
             //多了get
             kafkaProducer.send(new ProducerRecord<>("first", "wxj"+i)).get();
         }
         //关闭资源
         kafkaProducer.close();
     }
 }

生产者优化

生产者分区

image-20231117175759015

image-20231117175812942

生产者如何提高吞吐量

image-20231117175947075

数据可靠性

image-20231117180205475

image-20231117180244005

注意:分区副本是包含Leader的! ISR也是包含Leader的!!挂了的分区就被踢出(不被包含在)ISR了。换个描述理解:ACK=-1且至少有一个Leader和一个follower且属于ISR的至少有一个Leader和一个follower

image-20231117180329956

数据去重

image-20231117180530894

image-20231117180648195

image-20231117185403377

image-20231117185717429

3)单个 Producer,使用事务保证消息的仅一次发送

 package com.tjut.kafka.producer;
 ​
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 ​
 import java.util.Properties;
 ​
 public class CustomProducerTransaction {
     public static void main(String[] args) {
         //创建kafka生产者对象
         //""  "hello"
         //属性配置
         Properties properties = new Properties();
         //连接集群
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.147.131:9092");
 ​
         //指定对应的key和value的序列化类型
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 ​
         //设置事务id
         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");
 ​
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 ​
         // 初始化事务
         kafkaProducer.initTransactions();
         // 开启事务
         kafkaProducer.beginTransaction();
 ​
         try {
             //发送数据
             for (int i=0;i<5;i++) {
                 kafkaProducer.send(new ProducerRecord<>("first", "wxj"+i));
             }
             int i = 1 / 0;
             //提交事务
             kafkaProducer.commitTransaction();
         }catch (Exception e){
             System.out.println("事务终止!!!");
             //终止事务
             kafkaProducer.abortTransaction();
         }finally {
             //关闭资源
             kafkaProducer.close();
         }
     }
 }

控制台输出:

image-20231117190416689

数据有序

image-20231117190458109

数据乱序

image-20231117190608038

Kafka Broker

Zookeeper 存储的 Kafka 信息

image-20231117190749237

Kafka Broker 总体工作流程

image-20231117190847810

Broker重要参数

image-20231117191133068

image-20231117191218026

节点服役和退役

略,感觉是和大数据相关的问题

Kafka 副本

副本基本信息

(1)Kafka副本作用:提高数据可靠性。

(2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

(3)Kafka中副本分为:Leader和Follower. Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。

(4)Kafka分区中的所有副本统称为AR(Assigned Repllicas).

AR =ISR +OSR

ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms 参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader.

OSR,表示Follower与Leader副本同步时,延迟过多的副本。

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper 的。

image-20231117191822210

(0)执行下面命令的前提是,我需要有多个broker分区,可以参考教程kafka基本操作(二) linux环境下多个broker-阿里云开发者社区 (aliyun.com)

基本思路是复制配置文件,修改里面的端口号

image-20231117192431747

 cp server.properties server9093.properties 
 cp server.properties server9094.properties 
 cp server.properties server9095.properties 

image-20231117192659231

然后进去文件里修改

 vim server9093.properties 
 //修改server9093.properties 
 broker.id=2
 port=9093
 ​
 vim server9094.properties 
 //修改server9094.properties 
 broker.id=3
 port=9094
 ​
 vim server9095.properties 
 //修改server9095.properties 
 broker.id=4
 port=9095

插眼

(1)创建一个新的 topic,4 个分区,4 个副本

 kafka-topics.sh --bootstrap-server  127.0.0.1:9092 --create --topic test1 --partitions 4 --replication-factor 4

Leader 和 Follower 故障处理细节

image-20231117195725283

image-20231117195735378

消费者

基本实现

独立消费者案例(订阅主题)

image-20231117202538540

package com.tjut.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class CustomerConsumer {
    public static void main(String[] args) {
        //创建kafka消费者对象
        //""  "hello"
        //属性配置
        Properties properties = new Properties();
        //连接集群,必须
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.147.131:9092");

        //指定对应的key和value的反序列化类型,必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 注册要消费的主题(可以消费多个主题)
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first");
        // 消息订阅 topic name is “customerCountries”
        kafkaConsumer.subscribe(topics);

        // 拉取数据打印
        while (true) {
            // 设置 1s 中消费一批数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

运行结果

ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 38, CreateTime = 1700223239574, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = wxj0)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 39, CreateTime = 1700223239588, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = wxj1)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 40, CreateTime = 1700223239590, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = wxj2)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 41, CreateTime = 1700223239590, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = wxj3)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1700223239590, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = wxj4)

独立消费者案例(订阅分区)

核心代码

// 消费某个主题的某个分区数据
 ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
 topicPartitions.add(new TopicPartition("first", 0));
 kafkaConsumer.assign(topicPartitions);

消费者组案例

image-20231117202948732

基本思想:复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中 的两个消费者。

分区的分配以及再平衡

image-20231117203213722

image-20231117203705749

image-20231117203726852

Range 以及再平衡

image-20231117203749551

image-20231117204108838

RoundRobin 以及再平衡

image-20231117204216076

image-20231117204243775

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分 区不变化。

image-20231117204341177

offset 位移

消费者事务

image-20231117204512726

数据积压(消费者如何提高吞吐量)

image-20231117204624764

相关推荐

  1. Bash script笔记

    2023-12-08 07:46:03       57 阅读
  2. kafka面试题(基础--高

    2023-12-08 07:46:03       29 阅读
  3. Vue笔记(五)路由

    2023-12-08 07:46:03       57 阅读
  4. golang学习笔记——数据结构

    2023-12-08 07:46:03       57 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-08 07:46:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-08 07:46:03       101 阅读
  3. 在Django里面运行非项目文件

    2023-12-08 07:46:03       82 阅读
  4. Python语言-面向对象

    2023-12-08 07:46:03       91 阅读

热门阅读

  1. 【.NET Core】Linq查询运算符(二)

    2023-12-08 07:46:03       45 阅读
  2. 设置Ubuntu或树莓派系统,允许root用户ssh方式连接

    2023-12-08 07:46:03       60 阅读
  3. Ubuntu 20.04 安装Orthanc

    2023-12-08 07:46:03       54 阅读
  4. extern”C”的作用及注意事项

    2023-12-08 07:46:03       60 阅读
  5. 微信小程序uni-app:常用Form表单组件使用示例

    2023-12-08 07:46:03       61 阅读
  6. Boost:asio多io_service,多线程run

    2023-12-08 07:46:03       48 阅读
  7. 使用单例模式+观察者模式实现参数配置实时更新

    2023-12-08 07:46:03       45 阅读
  8. 芯知识 | 什么是单片机语音芯片?

    2023-12-08 07:46:03       69 阅读
  9. GO设计模式——11、装饰器模式(结构型)

    2023-12-08 07:46:03       55 阅读
  10. TrustZone概述

    2023-12-08 07:46:03       57 阅读