大数据队列Kafka

了解什么是kafka之前,首先要了解一下什么是消息队列

一丶kafka的基本概述

消息队列:MQ介绍

    • 定义

      • 官方定义:消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

      • 简单点说:消息队列MQ用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存

    • 功能

      • 基于队列的方式,实现消息传递的数据缓存【长久】

        • 队列的特点:顺序:先进先出

    • 应用场景

      • 用于所有需要实现实时、高性能、高吞吐、高可靠的消息传递架构中

      • 大数据应用中:作为唯一的实时数据存储平台

        • 实时数据采集:生产写入Kafka

        • 实时数据处理:消费读取Kafka

    • 优点

      • 实现了架构解耦

        • 需求:C也需要A的数据

        • 如果不构建消息队列:A =》 B

          • 停止A,修改A的代码,添加发送C的代码

          • 高耦合度的

        • 如果构建了消息队列:A =》 MQ 《= B

          • 直接让C从MQ中取即可

          • 低耦合度的

      • 保证了最终一致性

        • 最终可以保证实现最初的需求

      • 实现异步,提供传输性能

        • A给B和C都发一份数据

        • 不做消息队列

          • A发送:10s

          • B接受:1000s

          • C接受:1000s

          • 总共:2020s

        • 做了消息队列

          • A:10s

          • B和C并行接受:1000s

          • 总共:1010s

      • 限流削峰:合理根据成本来控制资源

    • 缺点

      • 增加了消息队列,架构运维更加复杂

        • 必须保证消息队列是高可靠的

        • 如果消息队列故障,整个所有系统都瘫痪了

        • | 保证消息队列即使机器出现故障,消息队列也能正常运行 =》 分布式

      • 数据保证更加复杂,必须保证生产安全和消费安全

        • 数据安全

          • 数据在传输过程中:不丢失、不重复

  • 小结

    • 消息队列的功能、特点是什么?

    • 功能:实现两个系统或者两个模块之间的数据缓存,解决高并发读写

      • 优点:架构解耦、异步模式能提高并发性能

      • 缺点:架构和安全维护更加麻烦

消息队列:同步与异步

    • 同步的概念

      • 流程

        • step1:用户提交请求

        • step2:后台处理请求

        • step3:将处理的结果返回给用户,用户继续下一步操作

      • 特点:用户看到的结果就是我处理好的结果,等待看到结果以后,进行下一步

      • 场景:去银行存钱、转账等,必须看到真正处理的结果才能表示成功,实现立即一致性

      • 优点:结果肯定是准确的

      • 缺点:性能问题

    • 异步的概念

      • 流程

        • step1:用户提交请求

        • step2:后台将请求放入消息队列,等待处理,返回给用户一个临时结果,用户不管这次的结果是什么,直接进行下一步

        • step3:用户看到临时的结果,真正的请求在后台等待处理

      • 特点:用户看到的结果并不是我们已经处理的结果

      • 场景:用户暂时不需要关心真正处理结果的场景下,只要保证这个最终结果是用户想要的结果即可,实现最终一致性

      • 优点:性能更高

      • 缺点:可能结果误差

    • 数据传递的同步与异步

      • A给B发送消息:基于UDP协议

        • A首先给B发送一条数据

        • A不管B有没有收到,继续发送下一条

        • 优点:快

        • 缺点:数据容易丢失

        • 异步过程

      • A给B发送消息:基于TCP协议

        • A首先给B发送一条数据

        • A会等待B告诉A收到了这条消息,A才会发送下一条

        • 优点:安全

        • 缺点:性能相对差一些

        • 同步过程

  • 小结

    • 同步:提交和处理是同步操作,立即就能看到结果,立即一致性

    • 异步:提交和处理是异步操作,最终得到一个处理的结果,最终一致性

消息队列:点对点模式

  • 实施

    • 角色

      • 生产者:负责往消息队列中写数据的

      • 消息队列:负责缓存传递的数据

      • 消费者:负责从消息队列中读取数据的

    • 流程

      • step1:生产者要往消息队列中写数据

      • step2:消费者从消息队列中读数据

      • step3:消费者消费成功以后,会返回一个确认ack给消息队列,消息队列会将消费成功的数据删除

  • 小结

    • 特点:数据只能被一个消费者使用,消费成功以后数据就会被删除,无法实现消费数据的共享

消息队列:订阅发布模式

  • 实施

    • 角色

      • 生产者

      • 消息队列

      • 消费者

      • Topic:主题,用于划分存储不同业务的数据

    • 流程

      • step1:生产者往消息队列中生产数据,将数据写入对应的主题中

      • step2:消费者可以订阅主题,如果主题中出现新的数据,消费就可以立即消费

    • 特点:一个消费者可以订阅多个主题,一个主题可以被多个消费者订阅

      • 消费成功以后,不会立即主动删除数据

      • 可以实现数据共享

  • 小结

    • 什么是发布订阅模式?

      • 发布:生产者不断将最新的数据生产写入消息队列的主题中

      • 订阅:消费者只要订阅了主题,就能立即获取最新的数据

      • 类似于微信公众号

Kafka的介绍及特点

  • 实施

    • 官网:kafka.apache.org

      • 领英公司基于Scala语言开发的工具

      • Scala语言:基于JVM之上的语言

        val inputRdd = sc.textFile("new Path")
        val wcRdd = inputRdd
            .flatMap(_.trim.split(" "))
            .map((_,1))
            .reduceByKey(_+_)
        wcRdd.saveAsTextFile
        
    • 功能

      • 分布式流式数据实时存储:分布式存储

        • 实时消息队列存储,工作中主要使用的功能

      • 分布式流式计算:分布式计算:KafkaStream

        • 这个功能一般不用

    • 定义

      • 分布式的基于订阅发布模式的高吞吐高性能的实时消息队列系统

    • 应用场景

      • 实时场景

      • 目前:只要做实时大数据,都必用Kafka

        • 离线数据仓库:Hive

        • 实时数据仓库:Kafka

      • Kafka生产者:数据采集的工具

      • Kafka消费者:实时计算的程序

    • 特点

      • 高性能:实时的对数据进行实时读写

        • Kafka也使用内存

      • 高并发:分布式并行读写

        • 分布式架构

      • 高吞吐:使用分布式磁盘存储

        • Kafka也基于磁盘

      • 高可靠:分布式主从架构

      • 高安全性:数据安全保障机制

        • 内存 + 磁盘:副本

        • 这个内存非常特殊:操作系统级别,即使Kafka服务故障,数据依旧存在,只有机器故障才受影响

      • 高灵活性:根据需求,随意添加生产者和消费者

        • 异步模式

  • 小结

    • Kafka在大数据中专门用于实现实时的数据存储,实现大数据实时计算

存储结构

  • MySQL:数据库、表、行数据【列】

  • HDFS:目录、文件 / 块、行数据

  • Redis:数据库、分片【小集群】、KV

Kafka概念:Producer、Broker、Consumer

  • 实施

    • Broker:Kafka是一个分布式集群,多台机器构成,每台Kafka的节点就是一个Broker

    • Producer:生产者

      • 负责将数据写入Kafka中,工作中一般生成都是数据采集工具

      • 本质:==Kafka写入数据的客户端==

        • Kafka的每条数据格式:KV格式

    • Consumer:消费者

      • 负责从Kafka中消费数据

      • 本质:==Kafka读取数据的客户端==

        • 消费数据:主要消费的数据是V

    • Consumer Group:==Kafka中必须以消费者组的形式从Kafka中消费数据==

      • 消费者组到kafka消费数据

      • 任何一个消费者必须属于某一个消费者组

      • 一个消费者组中可以有多个消费者:多个消费者共同并行消费数据,提高消费性能

        • 消费者组中多个消费者消费的数据是不一样的

        • 整个消费者组中所有消费者消费的数据加在一起是一份完整的数据

  • 小结

    • 生产者 =》 Kafka 集群【多个Broker】 《= 消费者组【消费者】

Kafka概念:Topic、Partition

  • 实施

    • Topic:数据主题,用于区分不同的数据,对数据进行分类

      • 类似于MySQL中会将数据划分到不同的表:不同的数据存储在不同的表中

      • Kafka是分布式存储

      • Topic就是分布式的概念:一个Topic可以划分多个分区Partition,每个不同分区存储在不同的Kafka节点上

        • 写入Topic的数据实现分布式存储

      • 问题:生产者写入一条KV结构数据,这条数据写入这个Topic的哪个分区由分区规则来决定,分区规则是什么呢?

        • 有多种分区规则:不同场景对应的分区规则不一样

    • Partition:数据分区,用于实现Topic的分布式存储,对Topic的数据进行划分

      • 每个分区存储在不同的Kafka节点Broker上

      • 写入Topic:根据一定的规则决定写入哪个具体的分区

  • 小结

    • 什么是Topic,什么是Partition?

    • Topic:类似于数据库或者表的概念,用于对数据进行分类,不同业务的数据放入不同Topic

      • Kafka的存储是分布式存储

      • 数据都是读写Topic

      • Topic就是分布式存储

      • Partition:一个Topic可以划分多个Partition,写入Topic的数据可以存储在不同的Partition中

        • 不同Partition可以存储在不同的Kafka节点上

Kafka概念:分区副本机制

  • 实施

    • 问题1:Kafka中的每个Topic的每个分区存储在不同的节点上,如果某个节点故障,怎么保证集群正常可用?

      • Kafka选用了==副本机制==来保证数据的安全性

        • 如果某台机器故障,其他机器还有这个分区的副本,其他机器的副本照样可以对外提供客户端读写

      • Kafka每一个分区都可以有多个副本

        • 类似于HDFS的副本机制,一个块构建多个副本

      • 注意:Kafka中一个分区的副本个数最多只能等于机器的个数,相同分区的副本不允许放在同一台机器,没有意义

    • 问题2:一个分区有多个副本,读写这个分区的数据时候,到底读写哪个分区副本呢?

      • Kafka将一个分区的多个副本,划分为两种角色

      • Leader副本:负责对外提供读写

        • 生产者和消费者只对leader副本进行读写

      • Follower副本

        • 与Leader同步数据

        • 如果leader故障,从follower新的leader副本对外提供读写

  • 小结

    • Kafka怎么保证分区数据安全?

      • 副本机制,一个分区可以有多个副本,相同分区的副本不能存储在同一台机器

    • Kakfa如何决定分区副本的读写?

      • 每个分区的副本划分为两种角色

      • leader:对外提供读写

      • follower:与Leader同步数据,如果Leader故障,从Follower选举一个新的Leader

Kafka概念:Segment

    • 定义:对每个分区的数据进行了更细的划分,先写入的数据会先生成一对Segment文件,存储到一定条件以后,后面数据写入另外一对Segment文件,每个文件就叫Segment文件对

    • 内容:每个Segment对应一对【两个】文件

      • xxxxxxxxx.log:存储数据

      • xxxxxxxxx.index:对应.log的文件的数据索引

    • 设计:为了加快数据检索的效率,将数据按照规则写入不同文件,以后可以根据规则快速的定位数据所在的文件,读取对应的小的segment文件,不用读取所有数据文件

    • 举例

      • 如果分区第一次写入数据,会产生第一个segment

        00000000000000000000000.log
        00000000000000000000000.index
        00000000000000000000000.timeindex
      • 当文件越来越大,存储的数据越来越多,影响读的性能,会再构建一个新的segment,老的segment不再被写入

        00000000000000000000000.log         
        00000000000000000000000.index
        00000000000000000000000.timeindex
        ​
        00000000000000000199999.log
        00000000000000000199999.index
        00000000000000000199999.timeindex
        ​
        00000000000002000000000.log
        00000000000002000000000.index
        00000000000002000000000.timeindex
      • Segment文件的名字就是这个Segment记录的offset的最小值

        • 消费者消费数据是根据offset进行消费的

        • 消费者1:想消费分区1:39999这个offset开始消费

        • 先根据文件文件名来判断我要的offset在哪个文件中

  • 小结

    • 什么是Segment?

      • 功能:对分区内部的数据进行了划分

      • 规则:先写入的数据先写入第一个Segment,达到一定条件,数据就写入新的Segment对中

      • 实现:每个Segment中包含两种文件

        • xxxxxx.log:数据

        • xxxxxx.index:对应.log文件的索引

      • 设计:为了加快查询效率

Kafka概念:Offset

  • 定义Kafka中所有消费者数据的读取都是按照Offset来读取数据,每条数据在自己分区中的偏移量
    • 先写入的offset就越小

      • 第一条数据的offset就为0

      • 第二条数据的offset就为1

      • ……

    • 消息队列:先进先出

    • 写入分区的顺序就是offset偏移量,==Offset是分区级别的==,每个分区的offset独立管理,都从0开始

    • 生成:生产者往Kafka中写入数据,写入某个分区

      • 每个分区单独管理一套Offset【分区】,offset从0开始对每条数据进行编号

      • Kafka写入数据也是按照KV来写入数据

        #Kafka中一条数据存储的结构
        offset      Key             Value
    • 功能:基于offset来指定数据的顺序,消费时候按照offset顺序来读取

      • 消费者消费Topic分区中的数据是按照offset进行顺序消费的

      • 怎么保证不丢失不重复:只要保证消费者每次按照offset的顺序消费即可

        • 如果没有Offset

          • 从头取一遍:数据重复

          • 从最新的去:数据丢失

  • 小结

    • Offset用于标记分区中的每条数据,消费者根据上一次消费的offset对分区继续进行消费,保证顺序

    • 实现保证数据不丢失不重复

二丶kafka的部署使用

Kafka集群架构

  • 目标了解Kafka集群架构及角色功能

  • 路径

    • Kafka集群有哪些角色?

    • Kafka每个角色的功能是什么?

    • Zookeeper在架构中的作用是什么?

  • 架构角色

      • Kafka:分布式主从架构,实现消息队列的构建

      • Zookeeper:辅助选举Controller、元数据存储

    • Kafka中的每个角色以及对应的功能

      • 分布式主从架构

        • 节点:Broker

        • 进程:Kafka

      • 主:Kafka ==Controller==

        • 是一种特殊的Broker,从所有Broker中选举出来的,负责普通Broker的工作

        • 负责管理所有从节点:Topic、分区和副本

        • 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现

      • 从:Kafka Broker

        • 对外提供读写请求

        • 其他的Broker监听Controller,如果Controller故障,会重新从Broker选举一个新的Controller

    • ZK的功能

      • 辅助选举Controller

      • 存储元数据

  • 小结

    • kafka是一个主从架构,整体对外提供分布式读写

    • ZK主要负责选举Controller和实现元数据存储

Kafka分布式集群部署

  • 目标实现Kafka分布式集群的搭建部署

  • 路径

    • step1:选择版本

    • step2:下载解压安装

    • step:3:修改配置文件

  • 实施

    • 版本的选型

      • 0.8.x:老的版本,很多的问题

      • 0.10.x +:消息功能上基本没有问题

      • 选择:kafka_2.12-2.4.1.tgz

        • Kafka:2.4.1

        • Scala:2.12,Kafka是由Scala语言开发

    • 下载解压安装

      • 下载:Index of /dist/kafka

      • 上传到第一台机器

        cd /export/software/
        rz
      • 解压

        tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
        cd /export/server/kafka_2.12-2.4.1/
        mkdir logs
        • bin:一般用于存放客户端操作命令脚本

        • sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中

        • conf/etc/config:配置文件目录

        • lib:jar包的存放目录

        • logs:一般用于存放服务日志

    • 修改配置

      • 切换到配置文件目录

        cd /export/server/kafka_2.12-2.4.1/config
        
      • 修改server.properties

        #21行:唯一的 服务端id
        broker.id=0
        #60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
        log.dirs=/export/servers/kafka_2.12-2.4.1/logs 
        #123行:指定zookeeper的地址
        zookeeper.connect=node01:2181,node02:2181,node03:2181
        #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
        delete.topic.enable=true
        host.name=node01
      • 分发

        cd /export/servers/
        scp -r kafka_2.12-2.4.1 node02:$PWD
        scp -r kafka_2.12-2.4.1 node03:$PWD
      • 第二台:server.properties

        #21行:唯一的 服务端id
        broker.id=1
        #最后
        host.name=node02
      • 第三台:server.properties

        #21行:唯一的 服务端id
        broker.id=2
        #最后
        host.name=node03
      • 添加环境变量

        vim /etc/profile
        
        #KAFKA_HOME
        export KAFKA_HOME=/export/servers/kafka_2.12-2.4.1
        export PATH=:$PATH:$KAFKA_HOME/bin
        source /etc/profile
        
  • 小结

    • 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可

    • 解压安装

    • 修改配置:server.properties

Kafka启动与关闭

  • 目标掌握kafka集群的启动与关闭命令及脚本封装

  • 路径

    • step1:如何启动Kafka集群?

    • step2:如何关闭Kafka集群?

    • step3:如何封装启动及关闭脚本?

  • 实施

    • 启动Zookeeper

      start-zk-all.sh 
    • 启动Kafka

      bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
      ​
       >>/dev/null 2>&1 &:在后台运行
    • 关闭Kafka

      bin/kafka-server-stop.sh 
      
    • 封装Kafka脚本

      这里我封装的也不太好,有时间可以去网上搜一下详解
  • 小结

    • 启动:kafka-server-start.sh

    • 关闭:kafka-server-stop.sh

Topic管理:创建与列举

  • 目标掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic

  • 路径

  • 实施

    • 创建Topic

    • bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
      • --create:创建

      • --topic:指定名称

      • --partitions :分区个数

      • --replication-factor:分区的副本个数

      • --bootstrap-server:指定Kafka服务端地址

      • --list:列举

    • 列举Topic

      bin/kafka-topics.sh --list -bootstrap-server node01:9092,node02:9092,node03:9092
      
  • 小结

    • 掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic

Topic管理:查看与删除

  • 实施

    bin/kafka-topics.sh --delete --topic bigdata02 --bootstrap-server node01:9092,node02:9092,node03:9092

    • 查看Topic信息

      bin/kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node01:9092,node02:9092,node03:9092
      • 结果

        Topic: bigdata01   PartitionCount: 3   ReplicationFactor: 2  Configs: segment.bytes=1073741824
        ​
        Topic: bigdata01        Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: bigdata01        Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: bigdata01        Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0
        • 每个分区都有一个唯一的标号:从0开始

        • 怎么唯一标识一个分区:Topic名称+分区编号

        • Leader:这个分区的Leader副本所在的Broker id

        • Replicas:这个分区的所有副本所在的Broker的id

        • ISR:in -sync -replicas:可用副本

    • 删除Topic

      kafka-topics.sh --delete --topic bigdata02 --partitions 3 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092 
      
  • 小结

    • 查看信息:describe

    • 删除:delete

生产者及消费者测试

  • 实施

    • Console生产者

    • bin/kafka-console-producer.sh --topic bigdata01 --broker-list node01:9092,node02:9092,node03:9092
      
    • Console消费者

      bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node01:9092,node02:9092,node03:9092  --from-beginning
      
      • --from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

      • 默认从最新位置开始消费

      • --from-beginning:从最早的位置开始消费

  • 小结

    • 只要生产者不断生产,消费就能实时的消费到Topic中的数据

可视化工具Kafka Tool

  • 实施

    • 安装Kafka Tool:不断下一步即可

    • 构建集群连接:连接Kafka集群

      查看集群信息

  • 小结

    • 可视化工具,界面或者交互性不是很友好

    • 后面会学习:Kafka Eagle

Kafka集群压力测试

  • 实施

    • 创建Topic

      bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
      
    • 生产测试

      kafka-producer-perf-test.sh --topic bigdata --num-records 1000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node01:9092,node02:9092,node03:9092 acks=1
      
      • --num-records:写入数据的条数

      • --throughput:是否做限制,-1表示不限制

      • --record-size:每条数据的字节大小

    • 消费测试

      kafka-consumer-perf-test.sh --topic bigdata --broker-list node01:9092,node02:9092,node03:9092  --fetch-size 1048576 --messages 1000000
      
  • 小结

    • 工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求

Kafka API 的应用

  • 实施

    • 命令行使用Kafka

      • 一般只用于topic的管理:创建、删除

    • 大数据架构中使用Kafka

      • Java API:构建生产者和消费者

      • 工作中一般不用自己开发生产者

      • 生产者:数据采集工具

        • Flume:Kafka sink

          • 配置kafka集群地址

          • Topic的名称

      • 消费者:实时计算程序

        • SparkStream:KafkaUtil

          KafkaUtil.createDirectStream(Kafka集群地址,消费的Topic)
      • 这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可

      • ==重点掌握:用到哪些类和方法==

    • Kafka的API的分类

      • High Level API:高级API

        • 基于了SimpleAPI做了封装,让用户开发更加方便

        • 但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全

        • Offset自动存储Zookeeper中,不用自己管理

      • ==Simple API:简单API==

        • 并不简单,最原始的API

        • 自定义控制所有消费和生产、保证数据安全

  • 小结

    • 大数据工作中一般不自己开发Java API:掌握类和方法即可

    • 只使用Simple API来实现开发

生产者API:生产数据到Kafka

  • 实施

    package bigdata.itcast.cn.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class KafkaProducerClient {
        public static void main(String[] args) {
            //todo:1 构建Kafka生产者连接对象
            //构建连接配置
            Properties props=new Properties();
            //指定服务端地址
            props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9093");
            /**
             * 问题:生产者怎么保证生产数据不丢失? ack机制 + 重试机制
             * ack:数据传输的确认码,用于定义生产者如何将数据写入Kafka
             * 0:生产者发送一条数据写入Kafka,不管Kafka有没有写入这条数据,都直接发送下一条【快,不安全,不用的】
             * 1:中和性方案,生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,就返回一个ack,生产者收到ack,发送下一条
             *  【性能和安全性之间做了权衡】
             * all/-1:生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,并且等待所有ISR:Follower同步成功,就返回一个ack,生产者收到ack,发送下一条
             *         【安全,慢】
             * 如果ack为1或者为all,生产者没有收到ack,就认为数据丢失,重新发送这条数据
             **/
            props.put("acks", "all");
            //指定写入Kafka中的KV的序列化的类
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //构建生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            //todo:2-调用连接对象的方法实现生产数据
            for (int i = 0; i < 10; i++){
                //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Key、Value
                // producer.send(new ProducerRecord<String, String>("bigdata01", i+"", "itcast"+i));
    
                //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Value
                //producer.send(new ProducerRecord<String, String>("bigdata01",  "itcast"+i));
    
                //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Partition、Key、Value
                producer.send(new ProducerRecord<String, String>("bigdata01", 0,i+"", "itcast"+i));
            }
            producer.close();
        }
    }
    

  • 小结

    • 掌握具体的类和方法?

      • Properties:配置类

      • KafkaProducer:生产者类

        • send(数据对象):负责生产数据到Kafka中

      • ProducerRecord:数据类对象

        • Topic、Key、Value:类似于Hash取余

        • Topic、Value:将所有数据写入了某一个分区

        • Topic、Partition、Key、Value:将所有数据写入指定的分区

消费者API:消费Topic数据

  • 实施

    package bigdata.itcast.cn.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerClient {
        public static void main(String[] args) {
            //todo:1 构建Kafka消费者连接对象
            Properties props =new Properties();
            //指定服务端地址
            props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
            //指定当前消费者属于哪个组
            props.setProperty("group.id", "test01");
            //开启自动提交
            props.setProperty("enable.auto.commit", "true");
            //自动提交的时间间隔
            props.setProperty("auto.commit.interval.ms", "1000");
            //读取数据对KV进行反序列化
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //构建消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            //todo:2-消费数据
            //先订阅Topic
            consumer.subscribe(Arrays.asList("bigdata01"));
            //再消费Topic
            while (true) {
                //step1:拉取订阅的Topic中的数据,100ms是一个超时时间
                //ConsumerRecords:存储的是本次拉取的所有数据
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                //处理数据
                //ConsumerRecord:消费到的每一条数据对象
                for (ConsumerRecord<String, String> record : records){
                    //Topic
                    String topic = record.topic();
                    //part
                    int part = record.partition();
                    //offset
                    long offset = record.offset();
                    //Key And Value
                    String key = record.key();
                    String value = record.value();
                    //模拟处理:输出
                    System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);
                }
            }
        }
    }
    
  • 小结

    • 掌握具体的类和方法?

      • Properties:配置对象

      • KafkaConsumer:消费者对象

        • subscribe:订阅Topic

        • poll:拉取Topic数据

      • ConsumerRecords:拉取到消费所有数据的集合

      • ConsumerRecord:每一条消费到的数据对象

        • topic、partition、offset、key、value

三丶Kafka的规则与机制

生产分区规则

  • 实施

    • 面试题:Kafka生产者怎么实现生产数据的负载均衡?

      • 生产数据的时候尽量保证相对均衡的分到Topic多个分区中

    • 问题:为什么生产数据的方式不同,分区的规则就不一样?

      - ProducerRecord(Topic,Value)//将所有数据写入某一个分区
      - ProducerRecord(Topic,Key,Value) //按照Key的Hash取余方式
      - ProducerRecord(Topic,Partition,Key,Value) //指定写入某个分区
      
    • 规则

      • 如果指定了分区:写入所指定的分区中

      • 如果没指定分区:默认调用的是DefaultPartitioner分区器中partition这个方法

        • 如果指定了Key:按照Key的Hash取余分区的个数,来写入对应的分区

        • 如果没有指定Key:按照黏性分区

          • 2.4之前:轮询分区

            • 优点:数据分配相对均衡

              Topic       part        key     value
              topic       0           1       itcast1
              topic       1           2       itcast2
              topic       2           3       itcast3
              topic       0           4       itcast4
              topic       1           5       itcast5
              topic       2           6       itcast6
              topic       0           7       itcast7
              topic       1           8       itcast8
              topic       2           9       itcast9
            • 缺点:性能非常差

              • Kafka生产者写入数据

                • step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka

                  • 批次满了【batch.size】

                  • 达到一定时间【linger.ms】

                • step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据

                  • 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第一条

                  • 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条

                  • ……

                  • 每条数据需要构建一个批次,9条数据,9个批次,每个批次一条数据

              • 批次多,每个批次数据量少,性能比较差

              • 希望:批次少,每个批次数据量多,性能比较好

          • 2.4之后:黏性分区

            • 设计:实现少批次多数据

            • 规则:判断缓存中是否有这个topic的分区连接,如果有,直接使用,如果没有随机写入一个分区,并且放入缓存

          • 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中

            bigdata01   1   37  null    itcast0
            bigdata01   1   38  null    itcast1
            bigdata01   1   39  null    itcast2
            bigdata01   1   40  null    itcast3
            bigdata01   1   41  null    itcast4
            bigdata01   1   42  null    itcast5
            bigdata01   1   43  null    itcast6
            bigdata01   1   44  null    itcast7
            bigdata01   1   45  null    itcast8
            bigdata01   1   46  null    itcast9
          • 第二次开始根据缓存中是否有上一次的编号

            • 有:直接使用上一次的编号

            • 没有:重新随机选择一个

  • 小结

    • Kafka中生产数据的分区规则是什么?

      • step1:先判断是否指定了分区

      • 如果指定了,就写入指定的分区

      • step2:再判断是否指定了Key

        • 如果指定了Key,按照Key的mur取余分区个数来决定

        • 如果没有指定Key,按照黏性分区

自定义开发生产分区器

  • 实施

    • 开发一个随机分区器

      package bigdata.itcast.cn.kafka.userpart;
      ​
      import org.apache.kafka.clients.producer.Partitioner;
      import org.apache.kafka.common.Cluster;
      ​
      import java.util.Map;
      import java.util.Random;
      ​
      /**
       * @ClassName UserPartitioner
       * @Description TODO 用户自定义的分区器,实现用户自定义随机分区
       * @Date 2021/9/22 15:36
       * @Create By     Frank
       */
      public class UserPartitioner implements Partitioner {
      ​
          /**
           * 真正计算返回分区的方法
           * @return
           */
          @Override
          public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
              //先获取分区个数
              Integer count = cluster.partitionCountForTopic(topic);
              //构建一个随机值
              Random random = new Random();
              //随机生成一个分区编号
              int i = random.nextInt(count);
              return i;
          }
      ​
          @Override
          public void close() {
              //释放资源方法
          }
      ​
          @Override
          public void configure(Map<String, ?> configs) {
              //获取配置方法
          }
      }
      ​
    • 加载分区器

      //指定分区器
      props.put("partitioner.class","bigdata.itcast.cn.kafka.userpart.UserPartition");
    • 小结

    • 如何构建一个自定义分区器?

      • step2:生产者代码指定自定义分区器即可

      • step1:自己开发分区器

        • 开发一个类:实现Partitioner接口

        • 实现方法:partition

消费分配策略

  • 基本规则
     一个分区只能被一个消费者组的一个消费者消费
    一个消费者可以消费多个分区
    最多存在和分区数相同数量的消费者
  • 分配策略
  • RangeAssignor:范围分配,默认的分配策略,如果能均分,则平均分配,如果不能均分,默认将多出来的分区分配给编号小的消费者
    优点:当Topic或分区个数比较少时,分配相对比较均衡
    缺点:如果Topic或分区个数比较多时,若不能均分,会导致负载不均衡的问题,一般不用
  • RoundRobinAssignor:轮询分配,常见于Kafka2.0之前的版本,按照Topic名称和分区编号排序,轮询分配每个消费者org.apache.kafka.clients.consumer.RoundRobinAssignor
    优点:如果有多个消费者,消费的Topic都是一样的,实现将所有Topic的所有分区轮询分配给所有消费者,尽量实现负载的均衡
    缺点:如果遇到消费者Topic订阅是不一致的,只能基于订阅的消费者进行轮询,导致整体消费者负载不均衡
     

  • StickyAssignor:黏性分配,2.0之后建议使用,类似于粘性分配,尽量将分区均衡的分配给消费者,底层是通过算法实现的,
    特点:相对的保证分配的均衡
               如果某一个消费者故障,尽量的避免网络传输

  • 尽量保证原来的消费的分区不变,将多出来分区均衡给剩余的消费者

    • 假设原来有3个消费,消费6个分区,平均每个消费者消费2个分区

  • 如果有一个消费者故障了,这个消费者负责的分区交给剩下的消费者来做:消费重平衡
    优点:分配更加均衡,如果消费者出现故障,提高性能,避免重新分配,将多余的分区均衡的分配给剩余的消费者

  • org.apache.kafka.clients.consumer.StickyAssignor
    

kafka的存储机制:存储结构

  • 分类
  1.  Broker:物理存储节点,用于存储Kafka中每个分区的数据
  2. Producer:生产者生产数据
  3. Topic:逻辑存储对象,用于区分不同数据的存储
  4. Partition:分布式存储单元,一个Topic可以划分多个分区,每个分区可以分布式存储在不同的Broker节点上
  5. Segment:分区端,每个分区的数据存储在一个或多个Segment中,每个Segment有一对文件组成
    .log
    .index
    segment命名规则:最小Offset
  • 小结
    kafka的存储结构:
    Producer->Broker->Consumer

kafka的存储机制:写入过程

  • 问:kafka是如何写入的,为什么写入这么快
  • 实施:
  1. 生产者生产每一条数据,将数据放入一个batch批次中,如果batch满了或达到一定时间,提交写入请求
  2. 生产者根据分区规则构建数据分区,获取对应的元数据,将请求提交给leader副本所在的Broker

    一个Topic分区会有多个副本,值写入leader副本
    从元数据中获取当前这个分区对应的leader副本的位置,提交写入请求
    kafka元数据存储在ZK中
  3. 先写入这台Broker的PageCache(操作系统级别内存)中,Kafka也用了内存机制来实现数据的快速读写
    ​​​
    Redis:进程级别内存,数据会随着进程的关闭而释放
    Kafka:操作系统Page Cache
                  选用了操作系统自带的缓存区域:PageCache
                  由操作系统来管理所有内存,即使Kafka Broker故障,数据依旧存在PageCache中
  4. 操作系统的后台自动将页缓存中的数据SYNC同步到磁盘文件中:最新的Segment的.log中
    kafka通过操作系统内存刷新调用机制来实现:内存存储容量+时间
    顺序写磁盘:不断将每一条数据追加到.log文件中
  5. 其他的Follower到Leader中同步数据
  • 小结
    kafka是如何写入数据的:生产者生产数据,提交给kafka集群
    生产者先计算每条数据属于哪个分区
    • 会获取元数据,从元数据中获取这个分区的leader副本所在的broker的地址

    • 将请求提交给这个Broker

    • 先写PageCache:内存区域

    • 满足条件以后:将PageCache中的数据顺序写入磁盘中的文件

  • 写入很快?

    • PageCache:写内存

    • 顺序写:写磁盘

Kafka存储机制:读取过程

  • 实施

    • step1:消费者根据Topic、Partition、Offset提交给Kafka请求读取数据

    • step2:Kafka根据元数据信息,找到对应的这个分区对应的Leader副本节点

    • step3:请求Leader副本所在的Broker,先读PageCache,通过零拷贝机制【Zero Copy】读取PageCache

      • 实现0磁盘读写

      • 直接将内存数据发送到网络端口,实现传输

    • step4:如果PageCache中没有,读取Segment文件段,先根据offset找到要读取的那个Segment

      • 先根据offset和segment文件段名称定位这个offset在哪个segment文件段中

    • step5:将.log文件对应的.index文件加载到内存中,根据.index中索引的信息找到Offset在.log文件中的最近位置

      • 最近位置:index中记录的稀疏索引【不是每一条数据都有索引】

    • step6:读取.log,根据索引读取对应Offset的数据

  • 小结

    • Kafka数据是如何被读取的?

      • step1:先读PageCache,如果有,通过Zero Copy【sendFile】机制来实现

      • step2:如果没有读这个分区的Segment

        • 先根据offset确定读取哪个Segment

        • 先index,再读.log

    • Kafka为什么写入很快?

      • PageCahce + 顺序写

    • Kafka为什么读取和快?

      • PageCache + 零拷贝

      • index索引机制 + offset

    • 终于知道Kafka为什么这么快了! - 知乎

Kafka存储机制:index索引设计

  • 实施

    • 索引类型

      • 全量索引:每一条数据,都对应一条索引

        • index:201条

          0           0
          1           101
          2           202
          
        • .log:201条数据

          0           key1            value1
          1           key2            value2
          ……
          200         key201          value201
          
      • 稀疏索引:部分数据有索引,有一部分数据是没有索引的

        • index:10条

          0           0
          2           202
          9           1010
          ……
        • log:201条

          0           key1            value1
          1           key2            value2
          ……
          200         key201          value201
        • 优点:减少了索引存储的数据量加快索引的索引的检索效率

        • 什么时候生成一条索引?

          #.log文件每增加4096字节,在.index中增加一条索引
          log.index.interval.bytes=4096
        • Kafka中选择使用了稀疏索引

    • 索引内容

      • 两列

        • 第一列:这条数据在这个文件中的位置

        • 第二列:这条数据在文件中的物理偏移量

          是这个文件中的第几条,数据在这个文件中的物理位置
          1,0             --表示这个文件中的第一条,在文件中的位置是第0个字节开始
          3,497           --表示这个文件中的第三条,在文件中的位置是第497个字节开始
          
      • 这个文件中的第1条数据是这个分区中的第368770条数据,offset = 368769

    • 检索数据流程

      • step1:先根据offset计算这条offset是这个文件中的第几条

      • step2:读取.index索引,根据二分检索,从索引中找到离这条数据最近偏小的位置

      • step3:读取.log文件从最近位置读取到要查找的数据

    • 举例

      • 需求:查找offset = 368772

      • step1:计算是文件中的第几条

        368772 - 368769 = 3 + 1 = 4,是这个文件中的第四条数据
      • step2:读取.index索引,找到最近位置

        3,497
      • step3:读取.log,从497位置向后读取一条数据,得到offset = 368772的数据

    • 问题:为什么不直接将offset作为索引的第一列?

      • 用offset会导致index文件很大,而且比较费时

  • 小结

    • .index文件中的索引的内容是什么?

    • 第一列:这条数据在文件中的是第几条数据

    • 第二列:这条数据在文件中的物理偏移量

    • 查询数据时如何根据索引找到对应offset的数据?

      • step1:先根据offset定位Segment文件

      • step2:读取.index文件,找到最近的位置

      • 先计算你要的offset是这个文件第几条

        • offset - 文件最小offset + 1

        • 从index找到最近位置返回

      • step3:从最近位置开始向后读取

Kafka数据清理规则

  • 实施

    • 属性配置

      #开启清理
      log.cleaner.enable = true
      #清理规则
      log.cleanup.policy = delete | compact
    • 清理规则:delete

      • 基于存活时间规则:最常用的方式

        log.retention.ms
        log.retention.minutes
        log.retention.hours=168/7天
      • 基于文件大小规则

        #删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不使用这种规则
        log.retention.bytes = -1
      • 基于offset消费规则

        • 功能:将明确已经消费的offset的数据删除

        • 如何判断已经消费到什么位置

          • step1:编写一个文件offset.json

            {
              "partitions":[
                 {"topic": "bigdata", "partition": 0,"offset": 2000},
                 {"topic": "bigdata", "partition": 1,"offset": 2000}
               ],
               "version":1
            }
          • step2:指定标记这个位置

            kafka-delete-records.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --offset-json-file offset.json 
    • 清理规则:compact

      • 功能:也称为压缩,将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有Key,根据Key来判断是否重复

  • 小结

    • Kafka用于实现实时消息队列的数据缓存,不需要永久性的存储数据,如何将过期数据进行清理?

      • delete方案:根据时间定期的清理过期的Segment文件

Kafka分区副本概念:AR、ISR、OSR

  • 实施

    • 分区副本机制:每个kafka中分区都可以构建多个副本,相同分区的副本存储在不同的节点上

      • 为了保证安全和写的性能:划分了副本角色

      • leader副本:对外提供读写数据

      • follower副本:与Leader同步数据,如果leader故障,选举一个新的leader

      • 选举实现:Kafka主节点Controller实现

    • AR:All - Replicas

      • 所有副本:指的是一个分区在所有节点上的副本

        Partition: 0   Replicas: 1,0 
    • ISR:In - Sync - Replicas

      • 可用副本:所有正在与Leader同步的Follower副本

        Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
      • 列表中:按照优先级排列【Controller根据副本同步状态以及Broker健康状态】,越靠前越会成为leader

      • 如果leader故障,是从ISR列表中选择新的leader

      • 如果生产者写入数据:ack=all,写入所有ISR列表中的副本,就返回ack

    • OSR:Out - Sync - Replicas

      • 不可用副本:与Leader副本的同步差距很大,成为一个OSR列表的不可用副本

      • 原因:网路故障等外部环境因素,某个副本与Leader副本的数据差异性很大

      • 判断是否是一个OSR副本?

        • 0.9之前:时间和数据差异

          replica.lag.time.max.ms = 10000   可用副本的同步超时时间
          replica.lag.max.messages = 4000   可用副本的同步记录差,该参数在0.9以后被删除
        • 0.9以后:只按照时间来判断

          replica.lag.time.max.ms = 10000   可用副本的同步超时时间
      • 写入、Leader选举:都只从ISR列表中选取

  • 小结

    • Kafka中的分区数据如何保证数据安全?

      • 分区副本机制

    • 什么是AR、ISR、OSR?

      • AR:所有副本

      • ISR:可用副本

      • OSR:不可用副本

      • AR = ISR + OSR

Kafka数据同步概念:HW丶LEO 

  • 实施

    • 什么是HW、LEO?

      • 分区:3个副本

      • Leader:0 1 2 3 4 5 

        6 7 8

        • LEO:9

      • Follower1: 0 1 2 3 4 5

        • LEO:6

      • Follower2:0 1 2 3 4 5 6

        • LEO = 7

      • HW = 6

        • LW:low_watermark:最低消费的offset

        • HW:high_watermark:最高消费的offset

      • LEO:Log End Offset

      • LSO:Log StartOffset

      • HW:当前这个分区所有副本同步的最低位置+1,消费者能消费到的最大位置

      • LEO:当前每个副本已经写入数据的最新位置 + 1

        • 副本最小的LEO = HW

    • 数据写入Leader及同步过程

      • step1:数据写入分区的Leader副本

        • leader:LEO = 5

        • follower1:LEO = 3

        • follower2:LEO = 3

      • step2:Follower到Leader副本中同步数据

        • leader:LEO = 5

        • follower1:LEO = 5

        • follower2:LEO = 4

  • 小结

    • HW:所有副本都同步的位置+1,消费者可以消费到的位置

    • LEO:leader当前最新的位置+1

消息队列的一次性语义

  • 实施
  1. at-most-once:至多一次
  2. at-least-once:至少一次
  3. exactly-once:有且仅有一次
  • 小结
    kafka从理论上可以实现Exactly Once
    大多数的消息队列一般不能满足ExactlyOnce就满足at-least-once

Kafka保证消费一次性语义

  • 实施
    规则:消费者是根据offset来持续消费,只要保证任何场景下消费者都能知道上一次offset即可
    实现:将offset存储在一种可靠外部存储中华,手动管理offset
    第一步:第一次消费根据属性进行消费
    第二步:消费分区数据,处理分区数据
    第三步:处理成功,将处理成功的分区offset进行额外的存储。kafka默认存储_consumer_offsets,外部存储有Mysql,Redis,Zookeeper
    第四步:如果消费者故障,可以从外部存储读取上一次消费者消费的offset向kafka进行请求
  • 小结
    如何实现一次性语义?

    原则:必须按照offset顺序消费
    当消费并处理成功以后,自己保存offset
    默认方案:手动将offset提交到__consumer_offsets这个Topic中
    手动管理:自己进行额外存储,以后重启提交,读取外部系统中记录的offset,向Kafka提交

Kafka集群常用配置

  • 实施

    • 集群配置:server.properties

      属性 含义
      broker.id int类型 Kafka服务端的唯一id,用于注册zookeeper,一般一台机器一个
      host.name hostname 绑定该broker对应的机器地址
      port 端口 Kafka服务端端口:9092
      log.dirs 目录 kafka存放数据的路径
      zookeeper.connect hostname:2181/kafkadata zookeeper的地址
      zookeeper.session.timeout.ms 6000 zookeeper会话超时时间
      zookeeper.connection.timeout.ms 6000 zookeeper客户端连接超时时间
      num.partitions 1 分区的个数
      default.replication.factor 1 分区的副本数
      log.segment.bytes 1073741824 单个log文件的大小,默认1G生成一个
      log.index.interval.bytes 4096 log文件每隔多大生成一条index
      log.roll.hours 168 单个log文件生成的时间规则,默认7天一个log
      log.cleaner.enable true 开启日志清理
      log.cleanup.policy delete,compact 默认为delete,删除过期数据,compact为合并数据
      log.retention.minutes 分钟值 segment生成多少分钟后删除
      log.retention.hours 小时值 segment生成多少小时后删除【168】,7天
      log.retention.ms 毫秒值 segment生成多少毫秒后删除
      log.retention.bytes -1 删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,直到小于阈值
      log.retention.check.interval.ms 毫秒值【5分钟】 多长时间检查一次是否有数据要标记删除
      log.cleaner.delete.retention.ms 毫秒值 segment标记删除后多长时间删除
      log.cleaner.backoff.ms 毫秒值 多长时间检查一次是否有数据要删除
      log.flush.interval.messages Long.MaxValue 消息的条数达到阈值,将触发flush缓存到磁盘
      log.flush.interval.ms Long.MaxValue 隔多长时间将缓存数据写入磁盘
      auto.create.topics.enable false 是否允许自动创建topic,不建议开启
      delete.topic.enable true 允许删除topic
      replica.lag.time.max.ms 10000 可用副本的同步超时时间
      replica.lag.max.messages 4000 可用副本的同步记录差,该参数在0.9以后被删除
      unclean.leader.election.enable true 允许不在ISR中的副本成为leader
      num.network.threads 3 接受客户端请求的线程数
      num.io.threads 8 处理读写硬盘的IO的线程数
      background.threads 4 后台处理的线程数,例如清理文件等
    • 生产配置:producer.properties

      属性 含义
      bootstrap.servers hostname:9092 KafkaServer端地址
      poducer.type sync | async 同步或者异步写入磁盘
      min.insync.replicas 3 最小ISR个数
      buffer.memory 33554432 配置生产者本地发送数据的缓存大小
      compression.type none 配置数据压缩,可配置snappy
      partitioner.class Partition 指定分区的类
      acks 1 指定写入数据的保障方式
      request.timeout.ms 10000 等待ack确认的时间,超时发送失败
      retries 3 发送失败的重试次数
      batch.size 16384 批量发送的大小
      long.ms 5000 发送间隔时间
      metadata.max.age.ms 300000 更新缓存的元数据【topic、分区leader等】

    • 消费配置:consumer.properties

      属性 含义
      bootstrap.servers hostname:9092 指定Kafka的server地址
      group.id id 消费者组的 名称
      consumer.id 自动分配 消费者id
      auto.offset.reset latest 新的消费者从哪里读取数据latest,earliest
      auto.commit.enable true 是否自动commit当前的offset
      auto.commit.interval.ms 1000 自动提交的时间间隔
  • 小结

    • 常用属性了解即可

可视化工具Kafka Eagle部署及使用

  • 实施

    • Kafka Eagle的功能

      • 用于集成Kafka,实现Kafka集群可视化以及监控报表平台

    • Kafka Eagle的部署启动

      • 下载解压:以第三台机器为例

        cd /export/software/
        rz
        tar -zxvf kafka-eagle-bin-1.4.6.tar.gz -C /export/server/
        cd /export/server/kafka-eagle-bin-1.4.6/
        tar -zxf kafka-eagle-web-1.4.6-bin.tar.gz 
        
      • 修改配置

        • 准备数据库:存储eagle的元数据,在Mysql中创建一个数据库

          create database eagle;
        • 修改配置文件:

          cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/
          vim  conf/system-config.properties
          #配置zookeeper集群的名称
          kafka.eagle.zk.cluster.alias=cluster1
          #配置zookeeper集群的地址
          cluster1.zk.list=node1:2181,node2:2181,node3:2181
          #31行左右配置开启统计指标
          kafka.eagle.metrics.charts=true
          #配置连接MySQL的参数,并注释自带的sqlite数据库
          kafka.eagle.driver=com.mysql.jdbc.Driver
          kafka.eagle.url=jdbc:mysql://node1:3306/eagle
          kafka.eagle.username=root
          kafka.eagle.password=hadoop
      • 配置环境变量

        vim /etc/profile
        ​
        #KE_HOME
        export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
        export PATH=$PATH:$KE_HOME/bin
        ​
        source /etc/profile
      • 添加执行权限

        cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
        chmod u+x bin/ke.sh
      • 启动服务

        ke.sh start
      • 登陆

        网页:node3:8048/ke
        用户名:admin
        密码:123456
    • 常见问题:登录时可能会报405错误,网上说很可能是版本问题,可把我坑惨了,我换了三次版本,1.4.6,1.4.8,2.1.0,但实际上并不是版本问题。重要的少一点是sql数据库里面要有数据表并且有数据,回头想一下也确实,不然咋匹配呢。下面是我在网上找到脚本文件:
       
      CREATE TABLE IF	NOT EXISTS `ke_alarm_crontab` (
      		`id` BIGINT ( 20 ) NOT NULL,
      		`type` VARCHAR ( 64 ) NOT NULL,
      		`crontab` VARCHAR ( 32 ) DEFAULT NULL,
      		`is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
      		`created` VARCHAR ( 32 ) DEFAULT NULL,
      		`modify` VARCHAR ( 32 ) DEFAULT NULL,
      		PRIMARY KEY ( `id`, `type` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_metrics` (
      		`cluster` VARCHAR ( 64 ) DEFAULT NULL,
      		`broker` TEXT DEFAULT NULL,
      		`type` VARCHAR ( 32 ) DEFAULT NULL,
      		`key` VARCHAR ( 64 ) DEFAULT NULL,
      		`value` VARCHAR ( 128 ) DEFAULT NULL,
      		`timespan` BIGINT ( 20 ) DEFAULT NULL,
      		`tm` VARCHAR ( 16 ) DEFAULT NULL 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_p_role` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`name` VARCHAR ( 64 ) CHARACTER 
      		SET utf8 NOT NULL COMMENT 'role name',
      		`seq` TINYINT ( 4 ) NOT NULL COMMENT 'rank',
      		`description` VARCHAR ( 128 ) CHARACTER 
      		SET utf8 NOT NULL COMMENT 'role describe',
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB AUTO_INCREMENT = 4 DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_alarm_clusters` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`type` VARCHAR ( 32 ) DEFAULT NULL,
      		`cluster` VARCHAR ( 64 ) DEFAULT NULL,
      		`server` TEXT DEFAULT NULL,
      		`alarm_group` VARCHAR ( 128 ) DEFAULT NULL,
      		`alarm_times` INT ( 11 ),
      		`alarm_max_times` INT ( 11 ),
      		`alarm_level` VARCHAR ( 4 ),
      		`is_normal` VARCHAR ( 2 ) DEFAULT 'Y',
      		`is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
      		`created` VARCHAR ( 32 ) DEFAULT NULL,
      		`modify` VARCHAR ( 32 ) DEFAULT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_alarm_consumer` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`cluster` VARCHAR ( 64 ) DEFAULT NULL,
      		`group` VARCHAR ( 128 ) DEFAULT NULL,
      		`topic` VARCHAR ( 128 ) DEFAULT NULL,
      		`lag` BIGINT ( 20 ) DEFAULT NULL,
      		`alarm_group` VARCHAR ( 128 ) DEFAULT NULL,
      		`alarm_times` INT ( 11 ),
      		`alarm_max_times` INT ( 11 ),
      		`alarm_level` VARCHAR ( 4 ),
      		`is_normal` VARCHAR ( 2 ) DEFAULT 'Y',
      		`is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
      		`created` VARCHAR ( 32 ) DEFAULT NULL,
      		`modify` VARCHAR ( 32 ) DEFAULT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_connect_config` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`cluster` VARCHAR ( 64 ),
      		`connect_uri` VARCHAR ( 128 ),
      		`version` VARCHAR ( 32 ),
      		`alive` VARCHAR ( 16 ),
      		`created` VARCHAR ( 32 ) DEFAULT NULL,
      		`modify` VARCHAR ( 32 ) DEFAULT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_resources` (
      		`resource_id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`name` VARCHAR ( 255 ) CHARACTER 
      		SET utf8 NOT NULL COMMENT 'resource name',
      		`url` VARCHAR ( 255 ) NOT NULL,
      		`parent_id` INT ( 11 ) NOT NULL,
      		PRIMARY KEY ( `resource_id` ) 
      	) ENGINE = INNODB AUTO_INCREMENT = 17 DEFAULT CHARSET = utf8;
       
      CREATE TABLE IF	NOT EXISTS `ke_metrics_offline` (
      		`cluster` VARCHAR ( 64 ) NOT NULL,
      		`key` VARCHAR ( 128 ) NOT NULL,
      		`one` VARCHAR ( 128 ) DEFAULT NULL,
      		`mean` VARCHAR ( 128 ) DEFAULT NULL,
      		`five` VARCHAR ( 128 ) DEFAULT NULL,
      		`fifteen` VARCHAR ( 128 ) DEFAULT NULL,
      		PRIMARY KEY ( `cluster`, `key` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
       
       
      CREATE TABLE IF	NOT EXISTS `ke_logsize` (
      		`cluster` VARCHAR ( 64 ) DEFAULT NULL,
      		`topic` VARCHAR ( 64 ) DEFAULT NULL,
      		`logsize` BIGINT ( 20 ) DEFAULT NULL,
      		`diffval` BIGINT ( 20 ) DEFAULT NULL,
      		`timespan` BIGINT ( 20 ) DEFAULT NULL,
      		`tm` VARCHAR ( 16 ) DEFAULT NULL 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_consumer_group_summary` (
      		`cluster` VARCHAR ( 64 ) NOT NULL,
      		`group` VARCHAR ( 128 ) NOT NULL,
      		`topic_number` VARCHAR ( 128 ) NOT NULL,
      		`coordinator` VARCHAR ( 128 ) DEFAULT NULL,
      		`active_topic` INT ( 11 ) DEFAULT NULL,
      		`active_thread_total` INT ( 11 ) DEFAULT NULL,
      		PRIMARY KEY ( `cluster`, `group` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_consumer_bscreen` (
      		`cluster` VARCHAR ( 64 ) DEFAULT NULL,
      		`group` VARCHAR ( 128 ) DEFAULT NULL,
      		`topic` VARCHAR ( 64 ) DEFAULT NULL,
      		`logsize` BIGINT ( 20 ) DEFAULT NULL,
      		`difflogsize` BIGINT ( 20 ) DEFAULT NULL,
      		`offsets` BIGINT ( 20 ) DEFAULT NULL,
      		`diffoffsets` BIGINT ( 20 ) DEFAULT NULL,
      		`lag` BIGINT ( 20 ) DEFAULT NULL,
      		`timespan` BIGINT ( 20 ) DEFAULT NULL,
      		`tm` VARCHAR ( 16 ) DEFAULT NULL 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_users` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`rtxno` INT ( 11 ) NOT NULL,
      		`username` VARCHAR ( 64 ) NOT NULL,
      		`password` VARCHAR ( 128 ) NOT NULL,
      		`email` VARCHAR ( 64 ) NOT NULL,
      		`realname` VARCHAR ( 128 ) NOT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8;
      
      CREATE TABLE IF	NOT EXISTS `ke_sql_history` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`cluster` VARCHAR ( 64 ) DEFAULT NULL,
      		`username` VARCHAR ( 64 ) DEFAULT NULL,
      		`host` VARCHAR ( 128 ) DEFAULT NULL,
      		`ksql` TEXT DEFAULT NULL,
      		`status` VARCHAR ( 16 ) DEFAULT NULL,
      		`spend_time` BIGINT ( 20 ) DEFAULT NULL,
      		`created` VARCHAR ( 32 ) DEFAULT NULL,
      		`tm` VARCHAR ( 16 ) DEFAULT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_consumer_group` (
      		`cluster` VARCHAR ( 64 ) NOT NULL,
      		`group` VARCHAR ( 128 ) NOT NULL,
      		`topic` VARCHAR ( 128 ) NOT NULL,
      		`status` INT ( 11 ) DEFAULT NULL,
      		PRIMARY KEY ( `cluster`, `group`, `topic` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF 	NOT EXISTS `ke_alarm_config` (
      		`cluster` VARCHAR ( 64 ) NOT NULL,
      		`alarm_group` VARCHAR ( 128 ) NOT NULL,
      		`alarm_type` VARCHAR ( 16 ) DEFAULT NULL,
      		`alarm_url` TEXT DEFAULT NULL,
      		`http_method` VARCHAR ( 16 ) DEFAULT NULL,
      		`alarm_address` TEXT DEFAULT NULL,
      		`created` VARCHAR ( 32 ) DEFAULT NULL,
      		`modify` VARCHAR ( 32 ) DEFAULT NULL,
      		PRIMARY KEY ( `cluster`, `alarm_group` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
       
      CREATE TABLE IF	NOT EXISTS `ke_user_role` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`user_id` INT ( 11 ) NOT NULL,
      		`role_id` TINYINT ( 4 ) NOT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8;
      	
      CREATE TABLE IF	NOT EXISTS `ke_role_resource` (
      		`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
      		`role_id` INT ( 11 ) NOT NULL,
      		`resource_id` INT ( 11 ) NOT NULL,
      		PRIMARY KEY ( `id` ) 
      	) ENGINE = INNODB AUTO_INCREMENT = 19 DEFAULT CHARSET = utf8;
      	
      ALTER TABLE `ke_consumer_bscreen` ADD INDEX `idx_timespan` ( `timespan` );
      ALTER TABLE `ke_logsize` ADD INDEX `idx_timespan` ( `timespan` );
      ALTER TABLE `ke_logsize` ADD INDEX `idx_tm_topic` ( `tm`, `topic` );
      ALTER TABLE `ke_logsize` ADD INDEX `idx_tm_cluster_diffval` ( `tm`, `cluster`, `diffval` );
      ALTER TABLE `ke_consumer_bscreen` ADD INDEX`eagle` `idx_tm_cluster_diffoffsets` ( `tm`, `cluster`, `diffoffsets` );
      INSERT INTO `ke_users`
      VALUES
      	( '1', '1000', 'admin', '123456', 'admin@email.com', 'Administrator' );;
      CREATE TABLE IF	NOT EXISTS `ke_topic_rank` (
      		`cluster` VARCHAR ( 64 ) NOT NULL,
      		`topic` VARCHAR ( 64 ) NOT NULL,
      		`tkey` VARCHAR ( 64 ) NOT NULL,
      		`tvalue` BIGINT ( 20 ) DEFAULT NULL,
      		PRIMARY KEY ( `cluster`, `topic`, `tkey` ) 
      	) ENGINE = INNODB DEFAULT CHARSET = utf8;
      INSERT INTO `ke_user_role`
      VALUES
      	( '1', '1', '1' );;
       
      INSERT INTO `ke_p_role`
      VALUES
      	( '1', 'Administrator', '1', 'Have all permissions' ),
      	( '2', 'Devs', '2', 'Own add or delete' ),
      	( '3', 'Tourist', '3', 'Only viewer' );
      	
      INSERT INTO `ke_role_resource`
      VALUES
      	( '1', '1', '1' ),
      	( '2', '1', '2' ),
      	( '3', '1', '3' ),
      	( '4', '1', '4' ),
      	( '5', '1', '5' ),
      	( '6', '1', '7' ),
      	( '7', '1', '8' ),
      	( '8', '1', '10' ),
      	( '9', '1', '11' ),
      	( '10', '1', '13' ),
      	( '11', '2', '7' ),
      	( '12', '2', '8' ),
      	( '13', '2', '13' ),
      	( '14', '2', '10' ),
      	( '15', '2', '11' ),
      	( '16', '1', '14' ),
      	( '17', '1', '15' ),
      	( '18', '1', '16' ),
      	( '19', '1', '18' ),
      	( '20', '1', '19' ),
      	( '21', '1', '20' ),
      	( '22', '1', '21' ),
      	( '23', '1', '22' ),
      	( '24', '1', '23' ),
      	( '25', '1', '24' );
      INSERT INTO `ke_resources`VALUES
      	( '1', 'System', '/system', '-1' ),
      	( '2', 'User', '/system/user', '1' ),
      	( '3', 'Role', '/system/role', '1' ),
      	( '4', 'Resource', '/system/resource', '1' ),
      	( '5', 'Notice', '/system/notice', '1' ),
      	( '6', 'Topic', '/topic', '-1' ),
      	( '7', 'Message', '/topic/message', '6' ),
      	( '8', 'Create', '/topic/create', '6' ),
      	( '9', 'Alarm', '/alarm', '-1' ),
      	( '10', 'Add', '/alarm/add', '9' ),
      	( '11', 'Modify', '/alarm/modify', '9' ),
      	( '12', 'Cluster', '/cluster', '-1' ),
      	( '13', 'ZkCli', '/cluster/zkcli', '12' ),
      	( '14', 'UserDelete', '/system/user/delete', '1' ),
      	( '15', 'UserModify', '/system/user/modify', '1' ),
      	( '16', 'Mock', '/topic/mock', '6' ),
      	( '18', 'Create', '/alarm/create', '9' ),
      	( '19', 'History', '/alarm/history', '9' ),
      	( '20', 'Manager', '/topic/manager', '6' ),
      	( '21', 'PasswdReset', '/system/user/reset', '1' ),
      	( '22', 'Config', '/alarm/config', '9' ),
      	( '23', 'List', '/alarm/list', '9' ),
      	( '24', 'Hub', '/topic/hub', '6' );
      

    • Kafka Eagle使用

      • 监控Kafka集群

      • 监控Zookeeper集群

      • 监控Topic

      • 查看数据积压

        • 现象:消费跟不上生产速度,导致处理的延迟

        • 原因

          • 消费者组的并发能力不够

          • 消费者处理失败

          • 网络故障,导致数据传输较慢

        • 解决

          • 提高消费者组中消费者的并行度

          • 分析处理失败的原因

          • 找到网络故障的原因

          • 查看监控

      • 报表

  • 小结

    • Kafka中最常用的监控工具

    • 用于查看集群信息、管理集群、监控集群

相关推荐

  1. 数据kafka应用

    2024-03-15 17:44:03       37 阅读
  2. [AIGC 数据基础] 数据流处理 Kafka

    2024-03-15 17:44:03       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-15 17:44:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-15 17:44:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-15 17:44:03       82 阅读
  4. Python语言-面向对象

    2024-03-15 17:44:03       91 阅读

热门阅读

  1. 服务器通常会遭到哪些攻击手段?

    2024-03-15 17:44:03       42 阅读
  2. 无人机的航向角

    2024-03-15 17:44:03       41 阅读
  3. 计算机网络规划与设计 -- 设计基础

    2024-03-15 17:44:03       47 阅读
  4. 研究生预答辩全解析

    2024-03-15 17:44:03       46 阅读
  5. JVM 面试题——CMS和G1的区别

    2024-03-15 17:44:03       41 阅读
  6. 数字电子技术实验(二)

    2024-03-15 17:44:03       35 阅读
  7. C++ STL入门:解锁现代C++编程的强大工具箱

    2024-03-15 17:44:03       43 阅读
  8. Oracle Rac集群日常维护管理命令

    2024-03-15 17:44:03       42 阅读
  9. Rabbitmq---topics模型之动态路由

    2024-03-15 17:44:03       41 阅读
  10. IUV-5G全网仿真软件实训手册-手工整理

    2024-03-15 17:44:03       47 阅读