Kafka 生产者缓存

不建议使用:

public void produce(String message) {
   
    DmsProducer<String, String> producer = new DmsProducer<String, String>();
    try {
   
        producer.produce("test1",0, "key", message);
    } finally {
   
        producer.close();
    }
}

原因:
每次调用produce函数时,都会新建一个producer,每次都需要进行新建与broker连接->获取分区元数据->生产消息的过程,存在性能瓶颈
在这里插入图片描述

正确用法:

private DmsProducer producer;

public void init() {
   
    this.producer = new DmsProducer();
}

public void produce(String message) {
   
    if (producer == null) {
   
        this.init();
    }
    produce(message);
}

public void tearDown() {
   
    if (this.producer != null) {
   
        producer.close();
    }
}

进阶优化:

按照上面的使用方式,如果在多线程的环境下,可能会出现一个producer单个sender线程会有性能瓶颈(单发送线程),可能导致,导致入大于出,最终缓存堆积

示例代码:

private ProducerLru cache = new ProducerLru(5);

class ProducerLru<k, v> extends LinkedHashMap<k, v> {
   
    
    private int capacity;
    
    public ProducerLru(int capacity) {
   
        super(capacity, 0.75f, true);
        this.capacity = capacity;
    }
    
    @Override
    protected boolean removeEldestEntry(Map.Entry<k, v> eldest) {
   
        if (size() > capacity) {
   
            if (eldest.getValue() instanceof  DmsProducer) {
   
                DmsProducer producer = (DmsProducer) eldest.getValue();
                producer.close();
            }
        }
        return super.size() > capacity;
    }
    
}

public static void main(String[] args) throws Exception {
   
    Main main = new Main();
    String topicA = "topicA";
    DmsProducer producer;
    if (main.cache.get(topicA) != null) {
   
        producer = (DmsProducer) main.cache.get(topicA);
        main.cache.put(topicA, producer);
    } else {
   
        producer = new DmsProducer();
    }
    main.cache.put(topicA, producer);
}

上述示例中,producerLru的key可以是topic名称,如果是多个用户场景下,也可以以用户名作为key,可以根据实际业务场景进行调整

相关推荐

  1. Kafka生产者

    2024-02-01 08:18:01       30 阅读
  2. kafka生产者与消费者

    2024-02-01 08:18:01       40 阅读
  3. Kafka建立生产者消费者

    2024-02-01 08:18:01       28 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-01 08:18:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-01 08:18:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-01 08:18:01       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-01 08:18:01       20 阅读

热门阅读

  1. 物流无人机在哪些场景最适合应用?

    2024-02-01 08:18:01       28 阅读
  2. Flink 集成和使用 Hive Metastore

    2024-02-01 08:18:01       37 阅读
  3. C++ 结构体的构造函数

    2024-02-01 08:18:01       33 阅读
  4. 面阵相机拍摄运动的物体怎样保证图像清晰

    2024-02-01 08:18:01       40 阅读
  5. TensorFlow2实战-系列教程14:Resnet实战2

    2024-02-01 08:18:01       44 阅读
  6. 3D Gaussian Splatting-实时辐射场渲染技术

    2024-02-01 08:18:01       33 阅读
  7. TensorFlow2实战-系列教程15:Resnet实战3

    2024-02-01 08:18:01       39 阅读
  8. CSS 中的 :is(), :where(), 和 :has() 选择器简介

    2024-02-01 08:18:01       33 阅读
  9. 使用certbot申请https通配符证书【阿里云篇】

    2024-02-01 08:18:01       38 阅读
  10. K8S网络

    K8S网络

    2024-02-01 08:18:01      33 阅读
  11. k8s学习-数据管理

    2024-02-01 08:18:01       29 阅读