Spring Boot学习(三十三):集成kafka

前言

下面是zookeeper和kafka的官网下载地址,大家可以学习下载

zookeeper下载地址:http://zookeeper.apache.org/releases.html

kafka下载地址:http://kafka.apache.org/downloads.html

1、添加依赖

在 pom.xml 文件中添加kafka依赖,依赖如下

	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>

2、配置Kafka信息

在 application.properties(或 application.yml)文件中配置 Kafka 的相关信息,下面是一个简单的示例:

#kafka地址,多个地址使用,分隔
spring.kafka.bootstrap-servers=127.0.0.1:9092
#消费者组ID
spring.kafka.consumer.group-id=myGroup
#序列化和反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3、发送消息

因为我们是springboot项目,已经集成了KafkaTemplate,我们可以直接使用KafkaTemplate来发送消息

下面,我编写一个发送消息的生产者

/**
 * 消息生产者
 */
@Component
@Slf4j
public class KafkaProducer {
   


    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息
     * @param topic 主题
     * @param msg   消息
     */
    public void send(String topic,String msg){
   
        kafkaTemplate.send(topic,msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
   
            @Override
            public void onFailure(Throwable ex) {
   
                log.error("发送消息失败:{}", ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
   
                log.info("发送消息成功:{}");
            }
        });
    }

    /**
     * 发送消息
     * @param topic
     * @param msg
     */
    public void send(String topic, Object msg) {
   
        send(topic, JSONObject.toJSONString(msg));
    }


}

编写好生产者之后,我们就可以使用生产者发送消息,如下

	@Autowired
    private KafkaProducer kafkaProducer;

	@GetMapping("send")
    public void sendMsg(){
   
        kafkaProducer.send("my-topic","hello world");
    }

如果想定制KafkaTemplate,那么可以在配置类进行配置,如下所示

@Configuration
public class KafakaConfig {
   

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * 配置属性
     * @return
     */
    @Bean
    public Map<String, Object> producerConfigs() {
   
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
   
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 定制KafkaTemplate
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
   
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic("myGroup");
        return kafkaTemplate;
    }


}

4、消费消息

使用 @KafkaListener 注解创建 Kafka 消费者,并监听指定的主题。接收到消息后,可以通过方法参数来接收消息:

@Slf4j
@Component
public class KafkaConsumer {
   

    /**
     * 消费my-topic主题的消息
     * @param message
     */
    @KafkaListener(topics = "my-topic",groupId = "myGroup1")
    public void  receiveMessage(String message){
   
        log.info("消费消息:"+message);
    }
}

同一消费者组,只会有一个消费者进行消费,如果想配置多个消费者同时处理,可以使用 @KafkaListener 注解来配置多个消费者。每个消费者需要配置不同的 group-id,监听主题一致,如下所示,就会有两个消费者同时消费

@Slf4j
@Component
public class KafkaConsumer {
   


    @KafkaListener(topics = "my-topic",groupId = "myGroup1")
    public void  receiveMessage(String message){
   
        log.info("消费消息:"+message);
    }


    @KafkaListener(topics = "my-topic",groupId = "myGroup2")
    public void  receiveMessage2(String message){
   
        log.info("消费消息:"+message);
    }
    
}

相关推荐

  1. Spring Boot学习):集成kafka

    2023-12-07 04:56:01       47 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-07 04:56:01       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-07 04:56:01       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-07 04:56:01       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-07 04:56:01       18 阅读

热门阅读

  1. RK3288升级WebView版本,替换webview app

    2023-12-07 04:56:01       35 阅读
  2. android 13.0 Camera2去掉前置摄像头闪光灯功能

    2023-12-07 04:56:01       36 阅读
  3. ThreadLocal+TaskDecorator实现父子线程 参数传递

    2023-12-07 04:56:01       37 阅读
  4. 【无标题】

    2023-12-07 04:56:01       47 阅读
  5. a href自定义下载文件名

    2023-12-07 04:56:01       42 阅读
  6. 设计模式&委派模式(Delegate Pattern)

    2023-12-07 04:56:01       33 阅读
  7. 【LeetCode】258. 各位相加

    2023-12-07 04:56:01       36 阅读
  8. Vue中的组件通信:从子到父的数据传递

    2023-12-07 04:56:01       40 阅读
  9. C++设计模式——建造者模式(Builder)

    2023-12-07 04:56:01       44 阅读
  10. ES6拓展API

    2023-12-07 04:56:01       32 阅读
  11. Socket.D 网络应用协议,首版发布!

    2023-12-07 04:56:01       38 阅读
  12. 字符指针变量

    2023-12-07 04:56:01       38 阅读
  13. 数据结构-基数排序

    2023-12-07 04:56:01       42 阅读