Cloudera Manager 安装 Kafka 并简单使用

Kafka 简介

kafka 是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

这里写图片描述

核心概念:

  • Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker

  • Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • Partition
    Partition是物理上的概念,每个Topic包含一个或多个Partition.

  • Producer
    负责发布消息到Kafka broker

  • Consumer
    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

  • 消息的发布(publish)称作producer,消息的订阅(subscribe)称作consumer,中间的存储阵列称作broker。

  • 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发。

  • producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。

  • broker端不维护数据的消费状态,提升了性能。

  • 直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

  • Kafka使用scala编写,可以运行在JVM上。

Kafaka 安装

可以按照 CHD5.11 离线安装或者升级 Spark2.x 详细步骤 教程进行安装。

角色分配:

这里写图片描述

安装出现如下问题及解决方案:

WARN Log Found a corrupted index file, /var/local/kafka/data/__consumer_offsets-33/00000000000000000000.index, deleting and rebuilding index...


上午10点10:04.842分	FATAL	KafkaServer	
Fatal error during KafkaServer startup. Prepare to shutdown
kafka.common.InconsistentBrokerIdException: Configured broker.id 118 doesn't match stored broker.id 145 in meta.properties. If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).
	at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:648)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:187)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
	at kafka.Kafka$.main(Kafka.scala:67)
	at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
	at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
	
	
// 解决方案	
vi /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/server.properties
看到:	
log.dirs=/tmp/kafka-logs

删除 log.dirs 目录

结果发现 /tmp/kafka-logs没有,全部在/tmp目录下,故执行 rm -rf /tmp/kafka*

在删除log 文件, rm -Rf /var/log/kafka*


// 然后重新安装kafka就可以了

Kafka 简单使用

Kafka command-line tools are located in /usr/bin:

kafka-topics

// 创建 topic test
# kafka-topics --create --zookeeper master:2181 --replication-factor 2 --partitions 4 --topic test
Created topic "test".

// 查看 topics
# kafka-topics --zookeeper master:2181 --list
test

// 删除 topic test
kafka-topics --delete --zookeeper master:2181 --topic test

kafka-console-consumer

Read data from a Kafka topic and write it to standard output. For example:

// 在slave1 上启动 consumer
kafka-console-consumer --zookeeper slave1:2181 --topic test

kafka-console-producer

Read data from standard output and write it to a Kafka topic. For example:

# kafka-console-producer --broker-list slave1:9092,slave2:9092 --topic test

然后在屏幕上输入数据,在consumer端可以接收到数据。

相关推荐

  1. kafka安装测试

    2024-02-01 10:20:02       38 阅读
  2. kafka简单使用

    2024-02-01 10:20:02       28 阅读
  3. Springboot整合kafka简单使用

    2024-02-01 10:20:02       32 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-01 10:20:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-01 10:20:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-02-01 10:20:02       82 阅读
  4. Python语言-面向对象

    2024-02-01 10:20:02       91 阅读

热门阅读

  1. cx_oracle连接oracle的us7ascii数据集乱码问题

    2024-02-01 10:20:02       53 阅读
  2. datax oracle->pg库 迁移表

    2024-02-01 10:20:02       60 阅读
  3. npm install -g pnpm 安装出现错误

    2024-02-01 10:20:02       53 阅读
  4. npm发布Vue组件

    2024-02-01 10:20:02       44 阅读
  5. Kubernetes实战(二十)-集群节点磁盘清理

    2024-02-01 10:20:02       51 阅读
  6. STM32—系统定时器

    2024-02-01 10:20:02       57 阅读
  7. Python实现设计模式-策略模式

    2024-02-01 10:20:02       50 阅读