Kafka接收消息

// 采用监听得方式接收 @Payload标记消息体内容.
@KafkaListener(topics = {"test"},groupId = "hello")
public void onEvent(@Payload String event,
                   @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition){
   System.out.println("读取到了时间消息: " + event);
}

Acknowledgment

开启手动确认模式;

listener:
	ack-mode: manual
// 采用监听得方式接收 @Payload标记消息体内容.
@KafkaListener(topics = {"test"},groupId = "hello")
public void onEvent(@Payload String event,
                   @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                   ConsumerRecord<String,String> record,
                   Acknowledgment ack){
   ack.acknowledge(); // 手动确认,告诉kafka服务器该消息我已经收到了. 
   System.out.println("读取到了时间消息: " + event);
}

读消息指定分区

@KafkaListener(groupId = "hello",
           topicPartitions = {
               @TopicPartition(
                       topic = "${kafka.topic.test}",
                       partitions = {"0","1","2"}, // 0 1 2分区不限制偏移量
                       partitionOffsets = { // 3 分区只读 3偏移量之后的; 4分区只读 4偏移量之后的
                               @PartitionOffset(partition = "3",initialOffset = "3"),
                               @PartitionOffset(partition = "4",initialOffset = "3")
                       })
           }
)

批量消费

修改配置

kafka:
    bootstrap-servers: 192.168.225.128:9092
    listener:
      type: batch
    # 每次读取20条
    consumer:
      max-poll-records: 20

消费者端接收一个List即可

@KafkaListener(topics = {"hi"},groupId = "batchGroup2")
public void onEvent3(List<ConsumerRecord<String,String>> records){
    System.out.println(records.size());
}

消息拦截

在这里插入图片描述

相关推荐

  1. Kafka之【消费消息

    2024-07-14 07:26:02       25 阅读
  2. 接收Kafka数据并消费至Hive表

    2024-07-14 07:26:02       52 阅读
  3. kafka发送大消息

    2024-07-14 07:26:02       60 阅读
  4. 消息队列和Kafka

    2024-07-14 07:26:02       44 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-14 07:26:02       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-14 07:26:02       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-14 07:26:02       58 阅读
  4. Python语言-面向对象

    2024-07-14 07:26:02       69 阅读

热门阅读

  1. 代码随想录第五十五天打卡

    2024-07-14 07:26:02       24 阅读
  2. 《HarmonyOS应用开发者基础认证》考试题目

    2024-07-14 07:26:02       27 阅读
  3. 每天一个数据分析题(四百二十六)- 总体方差

    2024-07-14 07:26:02       23 阅读
  4. [C++]类与对象

    2024-07-14 07:26:02       20 阅读
  5. 大模型日报 2024-07-13

    2024-07-14 07:26:02       20 阅读
  6. 家校管理系统

    2024-07-14 07:26:02       18 阅读
  7. 使用vllIm部署大语言模型

    2024-07-14 07:26:02       23 阅读
  8. 在Debian 7上安装和保护phpMyAdmin的方法

    2024-07-14 07:26:02       30 阅读
  9. Nginx 负载均衡详解

    2024-07-14 07:26:02       21 阅读
  10. Git常用命令

    2024-07-14 07:26:02       27 阅读
  11. 软设之访问者模式

    2024-07-14 07:26:02       19 阅读