SpringBoot3下Kafka分组均衡消费实现

首先添加maven依赖:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.11</version>
            <exclusions>
                <!--此处一定要排除kafka-clients,然后引入低版本client 不然后面分组消费功能会失效!-->
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.2</version>
        </dependency>
KafkaMessageListener.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaMessageListener {
    @KafkaListener(topics = "test-sfk",groupId = "test")
    public void listen5(ConsumerRecord<String, String> message) {
        log.info("5------>Received message: {}", message.value());
    }
    @KafkaListener(topics = "test-sfk",groupId = "test")
    public void listen4(ConsumerRecord<String, String> message) {
        log.info("4------>Received message: {}", message.value());
    }
}
TestController.java 用于发送测试消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("test")
public class TestController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;
    @GetMapping("kafkaMessage")
    public String kafkaMessage(){
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("test-sfk","test:"+i);
        }

        return "ok";
    }
}

启动后调用接口发送消息;此时看到控制台消息输出已经是均衡消费了

2024-03-15T13:49:01.034+08:00  INFO 19548 --- [ntainer#1-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 4------>Received message: test:0
2024-03-15T13:49:01.035+08:00  INFO 19548 --- [ntainer#1-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 4------>Received message: test:1
2024-03-15T13:49:01.035+08:00  INFO 19548 --- [ntainer#1-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 4------>Received message: test:2
2024-03-15T13:49:01.295+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:3
2024-03-15T13:49:01.297+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:4
2024-03-15T13:49:01.300+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:5
2024-03-15T13:49:01.301+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:6
2024-03-15T13:49:01.303+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:7
2024-03-15T13:49:01.304+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:8
2024-03-15T13:49:01.306+08:00  INFO 19548 --- [ntainer#0-0-C-1] c.w.e.k.listener.KafkaMessageListener    : 5------>Received message: test:9

一开始不知道,直接引用的spring-kafka依赖,然后就分组消费失败,始终只能被其中一个消费端消费,这里不知道是新版本配置有差异,还是就是新版kafka-clients跟SpringBoot3兼容有问题,特此记录!

相关推荐

  1. SpringBoot3Kafka分组均衡消费实现

    2024-03-16 06:10:02       44 阅读
  2. kafka rebalance(再均衡)导致的消息积压分析

    2024-03-16 06:10:02       53 阅读
  3. springboot集成kafka消费数据

    2024-03-16 06:10:02       55 阅读
  4. springboot 配置kafka批量消费,并发消费

    2024-03-16 06:10:02       37 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-16 06:10:02       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-16 06:10:02       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-16 06:10:02       87 阅读
  4. Python语言-面向对象

    2024-03-16 06:10:02       96 阅读

热门阅读

  1. mysql 查询字段大于某长度的SQL

    2024-03-16 06:10:02       36 阅读
  2. Lua 学习

    2024-03-16 06:10:02       44 阅读
  3. SpringBoot打造企业级进销存储系统 第四讲

    2024-03-16 06:10:02       41 阅读
  4. 关于I2C(Inter-Integrated Circuit)死锁及解决措施

    2024-03-16 06:10:02       37 阅读
  5. 金钱感知-

    2024-03-16 06:10:02       39 阅读
  6. 在Flutter中创建自定义的左对齐TabBar组件

    2024-03-16 06:10:02       44 阅读
  7. 【C#动态加载数据】“防界面卡死”

    2024-03-16 06:10:02       46 阅读