Kafka

Kafka Concepts

  • producer: the object that publishes messages is called a topic producer (Kafka topic producer)

  • topic: Kafka organizes messages into categories; each category of messages is called a topic (Topic)

  • consumer: the object that subscribes to topics and processes the published messages is called a consumer

  • broker: published messages are stored on a group of servers called a Kafka cluster. Each server in the cluster is a broker. Consumers can subscribe to one or more topics and pull data from the brokers to consume the published messages (see the AdminClient sketch after this list).
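
To make these roles concrete, the following minimal sketch uses the AdminClient from kafka-clients to list the brokers in a cluster and the topics they hold. The class name ClusterInfo is illustrative, and it assumes a broker is reachable at islunatic.icu:9092, the address used throughout this post.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Properties;
import java.util.Set;

/**
 * Minimal sketch: inspect brokers and topics with the AdminClient.
 * Assumes a broker is reachable at islunatic.icu:9092.
 */
public class ClusterInfo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "islunatic.icu:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Each server in the cluster appears here as a broker node
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println("broker: " + node.host() + ":" + node.port());
            }
            // Topics are the categories that producers write to and consumers subscribe to
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("topics: " + topics);
        }
    }
}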

Kafka Installation and Configuration

Kafka has a hard dependency on ZooKeeper, which stores Kafka's node metadata, so ZooKeeper must be installed before installing Kafka.

# Install ZooKeeper with Docker

# Pull the image

docker pull zookeeper

# Create the container

docker run -d --name zookeeper -p 2181:2181 zookeeper

# Install Kafka with Docker

# Pull the image

docker pull wurstmeister/kafka

# Create the container

docker run -d --name kafka \
-p 9092:9092 \
--env KAFKA_ADVERTISED_HOST_NAME=islunatic.icu \
--env KAFKA_ZOOKEEPER_CONNECT=islunatic.icu:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://islunatic.icu:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
wurstmeister/kafka
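
With the broker running, the topic used in the examples below can be created up front. This is a hedged sketch using the AdminClient from kafka-clients; the class name CreateTopic is illustrative, and it assumes the broker started above is reachable at islunatic.icu:9092.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

/**
 * Minimal sketch: create the topic used later in this post.
 * Fails with a TopicExistsException (wrapped in an ExecutionException) if it already exists.
 */
public class CreateTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "islunatic.icu:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1 -- enough for a single-broker test setup
            NewTopic topic = new NewTopic("topic-phb", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("created: " + topic.name());
        }
    }
}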

Getting Started with Kafka

// Add the kafka-clients dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
/**
 * Producer
 */
public class Producer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //1. Kafka connection configuration
        Properties prop = new Properties();
        // Kafka broker address
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"islunatic.icu:9092");
        // Key and value serializers
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        // acks: message acknowledgement setting ("all" waits for all in-sync replicas)
        prop.put(ProducerConfig.ACKS_CONFIG,"all");

        // Number of retries
        prop.put(ProducerConfig.RETRIES_CONFIG,10);

        // Message compression
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");

        //2. Create the Kafka producer
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);

        //3. Send a message
        /**
         * 1st argument: topic
         * 2nd argument: partition (here partition 0)
         * 3rd argument: message key
         * 4th argument: message value
         */
        ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-phb",0,"kafka","hello kafka");
        // Synchronous send
        /*RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
        System.out.println(recordMetadata.offset());*/

        // Asynchronous send
        producer.send(kvProducerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if(e != null){
                    System.out.println("记录异常信息到日志表中");
                }
                System.out.println(recordMetadata.offset());
            }
        });

        //4. Close the producer. It must be closed, otherwise the message may not be sent (close() flushes pending records)
        producer.close();
    }
}
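
The record above pins the message to partition 0 explicitly. In the more common case the partition is omitted and the producer hashes the key to pick one. The snippet below is a sketch of that variant and can stand in for step 3 of the Producer class above; it reuses the producer variable from that class.

// Variation of step 3 above: no explicit partition, the partitioner hashes the key
ProducerRecord<String, String> keyedRecord =
        new ProducerRecord<>("topic-phb", "kafka", "hello kafka");
producer.send(keyedRecord, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
        return;
    }
    System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
});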
/**
 * Consumer
 */
public class Consumer {

    public static void main(String[] args) {

        //1. Kafka configuration
        Properties prop = new Properties();
        // Broker address
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "islunatic.icu:9092");
        // Key and value deserializers
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer group
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "pgroup");

        // Commit offsets manually (disable auto commit)
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


        //2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        //3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("topic-phb"));

        //4. Poll for messages

        // Commit offsets asynchronously inside the loop, synchronously on the way out
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key());
                    System.out.println(consumerRecord.value());
                    System.out.println(consumerRecord.offset());
                    System.out.println(consumerRecord.partition());
                }
                // Commit offsets asynchronously
                consumer.commitAsync();
            }
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("记录错误的信息:"+e);
        }finally {
            // Final synchronous commit
            consumer.commitSync();
        }
    }
}
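
The poll loop above never exits on its own. A common pattern, not part of the original code, is to call consumer.wakeup() from a JVM shutdown hook so that poll() throws WakeupException, the loop ends, and the consumer commits and closes cleanly. A sketch of that shutdown path, assuming the same broker, topic and group as above (the class name GracefulConsumer is illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GracefulConsumer {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "islunatic.icu:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "pgroup");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        Thread mainThread = Thread.currentThread();

        // On Ctrl+C, interrupt the blocking poll() so the loop can exit cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("topic-phb"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() was called from the hook
        } finally {
            consumer.commitSync();  // one final synchronous commit
            consumer.close();
        }
    }
}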

Note

  • If the producer sends a message and the consumers subscribed to that topic belong to the same consumer group, only one of them receives the message (one-to-one, queue semantics)

  • If the producer sends a message and the consumers subscribed to that topic belong to different consumer groups, every consumer receives the message (one-to-many, publish-subscribe semantics); see the sketch after this note

The Kafka server side consists of service processes called brokers; a Kafka cluster is made up of multiple brokers.
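
Whether the one-to-one or the one-to-many behaviour applies is therefore decided by the consumer group. The sketch below (class name GroupDemo is illustrative, topic and broker as above) starts a consumer whose group id is passed on the command line: run two instances with the same group id and each message reaches only one of them (with a single-partition topic the second instance simply stays idle); run them with different group ids and both receive every message.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GroupDemo {

    public static void main(String[] args) {
        // Pass the group id on the command line, e.g. "group-a" or "group-b"
        String groupId = args.length > 0 ? args[0] : "group-a";

        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "islunatic.icu:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            consumer.subscribe(Collections.singletonList("topic-phb"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1000))) {
                    System.out.println(groupId + " received: " + record.value());
                }
            }
        }
    }
}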

Spring Boot Integration with Kafka

// Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
// Kafka configuration (application.yml)
server:
  port: 10000
spring:
  application:
    name: kafka-test
  kafka:
    bootstrap-servers: islunatic.icu:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
// Producer
@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/kafka")
    public String helloKafka(){
        User user = new User();
        user.setUsername("islunatic");
        user.setAge(20);

        kafkaTemplate.send("phb-topic", JSON.toJSONString(user));
        return "ok";
    }
}
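
kafkaTemplate.send() is asynchronous. In Spring Kafka 2.x it returns a ListenableFuture<SendResult>, so success and failure can be handled with a callback (in Spring Kafka 3.x send() returns a CompletableFuture instead). The endpoint path and method name below are illustrative; the method is a sketch that could be added to the controller above.

    // Variation of the endpoint above: handle the asynchronous send result.
    // Assumes Spring Kafka 2.x, where send() returns a ListenableFuture<SendResult>.
    @GetMapping("/kafka/callback")
    public String helloKafkaWithCallback() {
        User user = new User();
        user.setUsername("islunatic");
        user.setAge(20);

        kafkaTemplate.send("phb-topic", JSON.toJSONString(user))
                .addCallback(
                        result -> System.out.println("sent, offset=" + result.getRecordMetadata().offset()),
                        ex -> System.out.println("send failed: " + ex.getMessage()));
        return "ok";
    }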
// Consumer
@Component
public class HelloKafkaListener { // a class named "KafkaListener" would clash with the @KafkaListener annotation
   @KafkaListener(topics = "phb-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }
    }
}
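
If the key, partition or offset is needed on the consumer side, a @KafkaListener method can receive the whole ConsumerRecord instead of just the value string. A minimal variation of the listener above (the method name and groupId are illustrative; ConsumerRecord comes from org.apache.kafka.clients.consumer):

    // Variation of the listener above: receive the whole record, not just the value
    @KafkaListener(topics = "phb-topic", groupId = "kafka-test-record")
    public void onRecord(ConsumerRecord<String, String> record) {
        System.out.println("key=" + record.key()
                + ", partition=" + record.partition()
                + ", offset=" + record.offset()
                + ", value=" + record.value());
    }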
