Kafka批量消费

在Spring Kafka中,使用@KafkaListener注解处理批量信息时,首先需要开启批量监听模式,并配置相应的consumer参数来控制批量消费行为。以下是配置和处理批量消息的基本步骤:

  1. 配置Kafka消费者工厂
    设置batchListener属性为true,使@KafkaListener支持批量消费。

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 开启批量监听模式
        factory.setBatchListener(true);
        // 其他相关配置,比如并发度、错误处理等
        return factory;
    }
    
  2. 配置消费者参数
    设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG,指定每次poll请求从Kafka服务器获取的最大记录数。并且关闭offset自动提交enable-auto-commit: false

    # application.properties 或 application.yml
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: my-group
          max-poll-records: 100
          # 其他配置项,如enable-auto-commit, auto-offset-reset等
    
  3. 编写批量处理方法
    定义一个方法,其参数是一个包含多条消息的列表,@KafkaListener注解下的方法将会接收到批量的消息。

    @KafkaListener(topics = "my-topic")
    public void processMessages(List<ConsumerRecord<String, String>> records,
                                Acknowledgment acknowledgment) {
        try {
            // 处理批量消息
            for (ConsumerRecord<String, String> record : records) {
                // 对每条消息进行处理
            }
    
            // 成功处理后手动提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 错误处理,记录错误,考虑是否重试或者有其他补偿措施
            log.error("Error processing message batch", e);
        }
    }
    
  4. 处理异常和偏移量提交
    当批量处理消息时,需要注意的是,一旦消息处理完成且没有错误,应当手动提交偏移量,以确认这些消息已经被成功消费。如果有消息处理失败,则可能需要根据业务需求选择不同的策略,比如重新尝试处理整个批次、跳过错误消息或者记录错误信息稍后处理。

通过以上步骤,@KafkaListener就能按照批处理的方式接收并处理Kafka主题中的消息了。

批量消费Kafka中的消息,然后将这些消息放入队列中,最后利用线程池异步处理这些队列中的消息。这种方式有助于优化资源利用率,尤其是当消息处理逻辑较为耗时或者IO密集型时,可以有效提升系统的并行处理能力和吞吐量。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class BatchMessageProcessor {

    private final ThreadPoolTaskExecutor taskExecutor;
    private final BlockingQueue<ConsumerRecord<String, String>> messageQueue = new LinkedBlockingQueue<>();

    public BatchMessageProcessor(ThreadPoolTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @KafkaListener(topics = "my-topic", batch = true)
    public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> record : records) {
            // 将消费到的消息放入队列
            messageQueue.offer(record);
        }

        // 异步处理消息队列
        processMessageQueue(acknowledgment);
    }

    private void processMessageQueue(Acknowledgment acknowledgment) {
        List<ConsumerRecord<String, String>> messagesToProcess;
        synchronized (messageQueue) {
            // 从队列中批量取出消息
            messagesToProcess = new ArrayList<>(messageQueue.size());
            messageQueue.drainTo(messagesToProcess, 100); // 假设批量处理100条
        }

        if (!messagesToProcess.isEmpty()) {
            ListenableFuture<?> future = taskExecutor.submit(() -> {
                for (ConsumerRecord<String, String> record : messagesToProcess) {
                    // 实际处理消息的逻辑
                    processSingleMessage(record);
                }
                // 所有消息处理完毕后提交偏移量
                acknowledgment.acknowledge();
            });

            // 可以添加回调函数,用于处理线程池任务执行后的结果
            future.addCallback(new ListenableFutureCallback<Object>() {
                @Override
                public void onSuccess(Object result) {
                    // 处理成功逻辑
                }

                @Override
                public void onFailure(Throwable ex) {
                    // 处理失败逻辑,如日志记录、重试等
                }
            });
        }
    }

    private void processSingleMessage(ConsumerRecord<String, String> record) {
        // 这里实现单个消息的具体处理逻辑
    }
}

相关推荐

  1. Kafka批量消费

    2024-03-23 06:02:06       40 阅读
  2. springboot 配置kafka批量消费,并发消费

    2024-03-23 06:02:06       37 阅读
  3. Kafka之【消费消息

    2024-03-23 06:02:06       28 阅读
  4. Rust消费kafka

    2024-03-23 06:02:06       48 阅读
  5. kafka无法消费数据

    2024-03-23 06:02:06       49 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-23 06:02:06       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-23 06:02:06       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-23 06:02:06       87 阅读
  4. Python语言-面向对象

    2024-03-23 06:02:06       96 阅读

热门阅读

  1. Node.js Express

    2024-03-23 06:02:06       40 阅读
  2. python loguru 日志数据代码模块+飞书消息通知

    2024-03-23 06:02:06       44 阅读
  3. 设计模式(行为型设计模式——策略模式)

    2024-03-23 06:02:06       44 阅读
  4. vue 导出页面内容为word文件

    2024-03-23 06:02:06       38 阅读
  5. Android 生成Excel文件保存到本地

    2024-03-23 06:02:06       43 阅读
  6. python实现生成多种文件格式:excel、csv、pdf

    2024-03-23 06:02:06       38 阅读
  7. Linux 系统中 NumPy (Python 2) 编程环境

    2024-03-23 06:02:06       37 阅读