如何查看Kafka的偏移量offset

本文介绍三种方法查看Kafka的偏移量offset。

1. API:ConsumerRecord的offset()方法查看offset。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

前提条件

Kafka安装及基本操作,可参考:Kafka安装及基本操作

Kafka API操作,可参考:Kafka API操作

三种方法查看Kafka的偏移量offset

1. API:ConsumerRecord的offset()方法查看offset。

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class MyProducer {
    public static void main(String[] args) {
        // 1.创建kafka生产者对象
        Properties prop = new Properties();
        prop.put("bootstrap.servers","node1:9092");
        prop.put("acks","all");
        prop.put("retries","0");
        // 16k一个批量
        prop.put("batch.size", 16384);
        prop.put("linger.ms",5);
        prop.put("buffer.memory", 33554432);

        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<Object, Object> producer = new KafkaProducer<>(prop);


        // 2.使用send方法生产数据
        for (int i = 0; i < 10; i++) {
//            producer.send(new ProducerRecord<>("Hello-Kafka", Integer.toString(i), Integer.toString(i)));
            producer.send(new ProducerRecord<>("bigdata12", Integer.toString(i), Integer.toString(i)));
        }

        // 3.关闭生产者
        producer.close();

    }
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        //1.创建消费者对象
        Properties prop = new Properties();
        prop.put("bootstrap.servers","node1:9092");
        prop.put("group.id","test");
        prop.put("enable.auto.commit","true");
        prop.put("auto.commit.interval.ms","1000");
        prop.put("session.timeout.ms","30000");
        prop.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");//注意不是StringSerializer
        prop.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(prop);
        //2.消费者订阅主题
        consumer.subscribe(Arrays.asList("bigdata12"));// 将数组转为List集合

        //3.使用poll方法消费数据
        while (true){
//            ConsumerRecords<Object,Object> records = consumer.poll(Duration.ofSeconds(5));
            ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

            for (ConsumerRecord<Object, Object> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s\n",
                        record.offset(),record.key(),record.value());
            }
        }

    }
}

测试:

IDEA中,运行消费者,再运行生产者。提示:没有topic,将自动创建。

返回IDEA的消费者控制台,输出类似如下数据

...

offset=30, key=8, value=8
offset=31, key=9, value=9

这里显示的是最后一条数据的offset=31。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class KafkaOffsetViewer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "bigdata12";
        TopicPartition partition = new TopicPartition(topic, 0);

        try {
            consumer.assign(Arrays.asList(partition));
            consumer.seekToEnd(Arrays.asList(partition));
            long offset = consumer.position(partition);
            System.out.println("Offset of partition 0 is: " + offset);
        } finally {
            consumer.close();
        }
    }
}

IDEA运行结果:

Offset of partition 0 is: 32

看到offset为32,是最新的offset值,也就是下一条数据从32开始。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

在命令行中运行以下命令:

kafka-consumer-groups --bootstrap-server <kafka-broker-list> --describe --group <consumer-group-id>

例如:

[hadoop@node1 ~]$ kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test
​
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
test            bigdata12       0          32              32              0               consumer-test-1-64d17e50-69e9-47e3-9380-f2441a09cae2 /117.189.125.24 consumer-test-1
​

看到offset为32,是最新的offset值。

感兴趣可以再使用生产者发送数据测试,看到三种查看offset方法,offset值的变化情况。

总结

1. API:ConsumerRecord的offset()方法查看offset,查看到最后一条数据的offset,最新offset=最后一条数据offset+1。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset,查到最新offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset,查到最新offset。

完成! enjoy it!

相关推荐

  1. Kafka消费者提交偏移

    2024-07-23 09:30:02       50 阅读
  2. kafka 偏移类型与提交方式

    2024-07-23 09:30:02       59 阅读
  3. 获取kafka中topic偏移和消费偏移

    2024-07-23 09:30:02       33 阅读
  4. 每期一个小窍门: 重置kafka 消费者偏移

    2024-07-23 09:30:02       53 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-23 09:30:02       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-23 09:30:02       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-23 09:30:02       45 阅读
  4. Python语言-面向对象

    2024-07-23 09:30:02       55 阅读

热门阅读

  1. 银行卡二三四要素核验的多种应用场景

    2024-07-23 09:30:02       17 阅读
  2. 怎么在 Ubuntu 中卸载已经安装的软件

    2024-07-23 09:30:02       15 阅读
  3. ubuntu 源码安装postgis插件

    2024-07-23 09:30:02       16 阅读
  4. SpringCloud-Zuul-03

    2024-07-23 09:30:02       18 阅读
  5. uniapp picker-view 搜索选择框

    2024-07-23 09:30:02       17 阅读
  6. 前端面试题

    2024-07-23 09:30:02       15 阅读
  7. c 语言 中 是否有 unsigned 安;这种写法?

    2024-07-23 09:30:02       16 阅读
  8. Mojo模型与特征选择:数据科学中的智能筛选艺术

    2024-07-23 09:30:02       16 阅读
  9. PHP 数组排序算法对并行处理的影响

    2024-07-23 09:30:02       17 阅读
  10. Symbol

    2024-07-23 09:30:02       15 阅读