Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

//创建属性

        Properties properties = new Properties();

       //连接集群

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

        //反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //指定消费者组id

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
    //创建属性
        Properties properties = new Properties();
       //连接集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //指定消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

    //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    /*//订阅主题
        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);*/

    //订阅分区
        //创建一个数组列表变量接收主题分区值
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        //指定要订阅的分区
        topicPartitions.add(new TopicPartition("customers",2));
        //订阅分区
        kafkaConsumer.assign(topicPartitions);

    //消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }


    }
}

相关推荐

  1. kafka消费者

    2024-06-06 03:44:04       53 阅读
  2. Flink入门之DataStream APIkafka消费者

    2024-06-06 03:44:04       64 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-06 03:44:04       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-06 03:44:04       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-06 03:44:04       87 阅读
  4. Python语言-面向对象

    2024-06-06 03:44:04       96 阅读

热门阅读

  1. deque

    deque

    2024-06-06 03:44:04      30 阅读
  2. 关于焊点检测SJ-BIST)模块实现

    2024-06-06 03:44:04       30 阅读
  3. 日常实习-小米计算机视觉算法岗面经

    2024-06-06 03:44:04       35 阅读
  4. 【Golang】go语言写入数据并保存Excel表格

    2024-06-06 03:44:04       27 阅读
  5. 054、Python 函数的概念以及定义

    2024-06-06 03:44:04       33 阅读
  6. 【安卓配置WebView以允许非HTTPS页面访问摄像头】

    2024-06-06 03:44:04       31 阅读
  7. MySQL并发事务是怎么处理的?

    2024-06-06 03:44:04       32 阅读
  8. 欲除烦恼须无我,各有前因莫羡人

    2024-06-06 03:44:04       21 阅读
  9. Python连接数据库进行数据查询

    2024-06-06 03:44:04       28 阅读
  10. flink Transformation算子(更新中)

    2024-06-06 03:44:04       23 阅读
  11. 探索 Linux 命令 `yum`:软件包管理器的奥秘

    2024-06-06 03:44:04       32 阅读