Spring-Kafka笔记整理

  1. 引入依赖
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. 配置application.properties
    spring.kafka.bootstrap-servers=192.168.99.51:9092
    
  3. 编写Kafka的配置类
    /**
     * Spring configuration that declares the Kafka producer and consumer beans:
     * a producer factory, a {@code KafkaTemplate}, a consumer factory and the
     * listener container factory referenced by {@code @KafkaListener} methods.
     * All keys and values are handled as plain strings.
     */
    @Configuration
    public class KafkaConfig {

        /** Broker address list, injected from {@code spring.kafka.bootstrap-servers}. */
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        /**
         * Builds the producer factory using String serializers for both key and value.
         *
         * @return a producer factory pointed at the configured bootstrap servers
         */
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> producerProps = new HashMap<>();
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return new DefaultKafkaProducerFactory<>(producerProps);
        }

        /**
         * Exposes a {@code KafkaTemplate} backed by {@link #producerFactory()}.
         *
         * @return the template used to send String messages
         */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            ProducerFactory<String, String> factory = producerFactory();
            return new KafkaTemplate<>(factory);
        }

        /**
         * Builds the consumer factory using String deserializers for both key and value.
         *
         * @return a consumer factory pointed at the configured bootstrap servers
         */
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // max.poll.records is set to 50 for every consumer created by this factory.
            consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
            return new DefaultKafkaConsumerFactory<>(consumerProps);
        }

        /**
         * Builds the listener container factory that {@code @KafkaListener} methods
         * refer to by the bean name {@code kafkaListenerContainerFactory}.
         *
         * @return a concurrent container factory wired to {@link #consumerFactory()}
         */
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            containerFactory.setConsumerFactory(consumerFactory());
            // Concurrency is the number of threads a single consumer instance starts.
            containerFactory.setConcurrency(3);
            return containerFactory;
        }
    }
    
  4. Kafka消息监听
    /**
     * Kafka listener component: consumes records from {@code hello-kafka-topic}
     * as part of the {@code hello-kafka-group} consumer group.
     *
     * NOTE(review): {@code log} is not declared in this snippet; presumably it is
     * supplied by Lombok's {@code @Slf4j} or a logger field outside the excerpt.
     * Confirm before copying.
     */
    @Component
    public class KafkaConsumer {

        /** Jackson mapper used to parse and re-serialize the message payload. */
        @Autowired
        private ObjectMapper mapper;

        /**
         * Parses the record value as a {@code HelloMessage} and logs the record key
         * together with the payload serialized back to JSON.
         *
         * @param record the consumed Kafka record (String key and value)
         * @throws Exception if the value cannot be parsed or re-serialized by the mapper
         */
        @KafkaListener(
            containerFactory = "kafkaListenerContainerFactory",
            groupId = "hello-kafka-group",
            topics = {"hello-kafka-topic"}
        )
        public void listener01(ConsumerRecord<String, String> record) throws Exception {
            HelloMessage payload = mapper.readValue(record.value(), HelloMessage.class);
            String payloadJson = mapper.writeValueAsString(payload);
            log.info("in listener consume kafka message: [{}], [{}]", record.key(), payloadJson);
        }
    }
    
  5. Kafka消息发送
    @Component
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
        public void sendMessage(String key, String value, String topic) {
            if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
                throw new IllegalArgumentException("value or topic is null or empty");
            }
            ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
                    kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
            // 异步回调的方式获取通知
            future.addCallback(success -> {
                    assert null != success && null != success.getRecordMetadata();
                    // 发送到 kafka 的 topic
                    String _topic = success.getRecordMetadata().topic();
                    // 消息发送到的分区
                    int partition = success.getRecordMetadata().partition();
                    // 消息在分区内的 offset
                    long offset = success.getRecordMetadata().offset();
                    log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);
                }, failure -> {
                    log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
                }
            );
        }
    }
    

相关推荐

  1. Spring-Kafka笔记整理

    2024-03-17 05:28:04       44 阅读
  2. Kafka-SSL笔记整理

    2024-03-17 05:28:04       40 阅读
  3. spring整合kafka

    2024-03-17 05:28:04       23 阅读
  4. Spring-Mybatis读写分离笔记整理

    2024-03-17 05:28:04       42 阅读
  5. Kafka整理-Kafka Streams

    2024-03-17 05:28:04       46 阅读
  6. spring-boot2.x整合Kafka步骤

    2024-03-17 05:28:04       21 阅读
  7. Kafka整理-Consumer(消费者)

    2024-03-17 05:28:04       36 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-17 05:28:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-17 05:28:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-17 05:28:04       82 阅读
  4. Python语言-面向对象

    2024-03-17 05:28:04       91 阅读

热门阅读

  1. ZooKeeper

    ZooKeeper

    2024-03-17 05:28:04      42 阅读
  2. Spring Boot集成mapstruct快速入门指南

    2024-03-17 05:28:04       44 阅读
  3. 封装promise请求方式

    2024-03-17 05:28:04       48 阅读
  4. OLLAMA:如何像云端一样运行本地大语言模型

    2024-03-17 05:28:04       44 阅读
  5. alibaba cloud linux 3 安装 psql 16

    2024-03-17 05:28:04       46 阅读
  6. Python强大的库和框架——TensorFlow

    2024-03-17 05:28:04       44 阅读
  7. springBoot整合Redis(四、整合redis 实现分布式锁)

    2024-03-17 05:28:04       45 阅读
  8. lammps从NVT或者NPT切换到NVE时温度持续上升

    2024-03-17 05:28:04       42 阅读