初试Kafka

Kafka 是一个分布式流处理平台,通常用作消息中间件,它可以处理大规模的实时数据流。以下是从零开始使用 Kafka 作为消息中间件的基本教程:

步骤 1: 下载和安装 Kafka

  1. 访问 Apache Kafka 官方网站:Apache Kafka
  2. 下载最新的 Kafka 发行版,并解压缩到本地文件夹。

步骤 2: 启动 ZooKeeper

Kafka 使用 ZooKeeper 来协调分布式节点。在 Kafka 解压缩后的文件夹中,进入 bin 目录,执行以下命令启动 ZooKeeper:

./zookeeper-server-start.sh ../config/zookeeper.properties

步骤 3: 启动 Kafka 服务

继续在 bin 目录中执行以下命令启动 Kafka 服务:

./kafka-server-start.sh ../config/server.properties

步骤 4: 创建一个主题(Topic)

Kafka 使用主题来组织和分类消息。执行以下命令创建一个主题:

./kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

这将创建一个名为 my_topic 的主题,具有一个分区和一个副本。

步骤 5: 发送消息到主题

使用 Kafka 提供的生产者工具向主题发送消息:

./kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092

然后,您可以在控制台中输入消息并按 Enter 发送。

步骤 6: 消费消息

使用 Kafka 提供的消费者工具从主题中消费消息:

./kafka-console-consumer.sh --topic my_topic --bootstrap-server localhost:9092 --from-beginning

这将显示从主题中接收到的消息。

步骤 7: 使用编程语言连接 Kafka

除了命令行工具外,您还可以使用编程语言连接 Kafka。根据您选择的语言,可以使用 Kafka 提供的客户端库。

使用 Java 示例
// 生产者示例
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
   
    public static void main(String[] args) {
   
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "Hello, Kafka!");

        producer.send(record);

        producer.close();
    }
}

// 消费者示例
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
   
    public static void main(String[] args) {
   
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "my_group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("my_topic"));

        while (true) {
   
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
   
                System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
            }
        }
    }
}

这是一个简单的 Java 示例,演示了如何使用 Kafka 的生产者和消费者 API。

希望这个简单的教程能帮助您入门 Kafka。请注意,这只是一个基础,Kafka 还有许多高级功能和配置,具体取决于您的使用场景和需求。

相关推荐

  1. 初试Kafka

    2023-12-27 08:00:02       59 阅读
  2. Kafka初步学习

    2023-12-27 08:00:02       29 阅读
  3. kafka

    2023-12-27 08:00:02       59 阅读
  4. <span style='color:red;'>kafka</span>

    kafka

    2023-12-27 08:00:02      51 阅读
  5. Kafka

    2023-12-27 08:00:02       67 阅读
  6. Kafka

    2023-12-27 08:00:02       50 阅读
  7. <span style='color:red;'>KAFKA</span>

    KAFKA

    2023-12-27 08:00:02      71 阅读
  8. <span style='color:red;'>Kafka</span>

    Kafka

    2023-12-27 08:00:02      53 阅读
  9. <span style='color:red;'>Kafka</span>

    Kafka

    2023-12-27 08:00:02      56 阅读
  10. Kafka

    2023-12-27 08:00:02       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-27 08:00:02       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-27 08:00:02       106 阅读
  3. 在Django里面运行非项目文件

    2023-12-27 08:00:02       87 阅读
  4. Python语言-面向对象

    2023-12-27 08:00:02       96 阅读

热门阅读

  1. python大作业 写作思路

    2023-12-27 08:00:02       47 阅读
  2. gRPC-Go基础(1)基础知识

    2023-12-27 08:00:02       59 阅读
  3. 深入理解 golang 中的反射机制

    2023-12-27 08:00:02       56 阅读
  4. Go配置镜像源

    2023-12-27 08:00:02       70 阅读
  5. 云原生Kubernetes系列 | Kubernetes Secret及ConfigMap

    2023-12-27 08:00:02       52 阅读
  6. 三、C++版本OpenCV的API使用

    2023-12-27 08:00:02       61 阅读
  7. 第六章2 总结+若干实战题

    2023-12-27 08:00:02       64 阅读
  8. FreeSWITCH的sip_gethostbyname=true

    2023-12-27 08:00:02       64 阅读
  9. python之glob的用法

    2023-12-27 08:00:02       69 阅读
  10. DshanMCU-R128s2硬件设计参考

    2023-12-27 08:00:02       54 阅读
  11. SpringBoot3 整合Redis

    2023-12-27 08:00:02       49 阅读
  12. vue3中安装并使用CSS预处理器Sass的方法介绍

    2023-12-27 08:00:02       62 阅读
  13. Redis Stream消息队列之基本语法与使用方式

    2023-12-27 08:00:02       39 阅读
  14. [oracle数据库]dblink的使用

    2023-12-27 08:00:02       63 阅读
  15. 如何将自建的ElasticSearch注册成一个服务

    2023-12-27 08:00:02       59 阅读