kafka入门(四):kafka生产者发送消息

创建生产者实例和构建消息之后,就可以开始发送消息了。

发送消息主要有三种模式:发后即忘、同步、异步。

发后即忘:

就是直接调用 生产者的 send方法发送。

发后即完,只管往 kafka中发送消息,而不关心消息是否正确到达。

这种发送方式的性能最高,可靠性也最差。

producer.send(record);

具体代码如下:

public class KafkaDemoProducer {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";

    public static void main(String[] args) {
        //属性配置
        Properties properties = getProperties(BROKER_LIST);
        //生产者初始化
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");
        //发送消息
        try {
            producer.send(record);
            System.out.println("========>producer.send(record).");
        } catch (Exception e) {
            System.out.println("send error." + e);
        }
        producer.close();
    }


    private static Properties getProperties(String brokerList) {
        Properties properties = new Properties();
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        return properties;
    }

}

同步发送:

try {
	producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
	log.error("send record get error", e);
}

同步发送的方式可靠性最高,要么消息发送成功,要么发生异常。如果发生异常,会catch并处理异常。

同步发送的性能会差一些,需要阻塞等待一条消息发送完,才能发送下一条。

异步发送:

异步发送,就是在 send 方法里指定一下 Callback 的回调函数。

消息发送成功后,会收到成功的回调。参数 metadata ,为发送成功的消息,相关的信息

如果发送失败,也会收到回调,包含失败的异常信息 exception。

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    	if (exception != null) {
    		log.error("send onCompletion error." , exception);
   		} else {
  			log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
  	     }
    }
});

参考资料:

《深入理解Kafka 核心设计与实践原理》

相关推荐

  1. kafka入门():kafka生产者发送消息

    2023-12-18 07:30:06       56 阅读
  2. python连接kafka生产者发送消息

    2023-12-18 07:30:06       23 阅读
  3. kafka发送消息

    2023-12-18 07:30:06       63 阅读
  4. Kafka发送对象消息

    2023-12-18 07:30:06       23 阅读
  5. kafka判断生产者是否向kafka集群成功发送消息

    2023-12-18 07:30:06       27 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-18 07:30:06       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-18 07:30:06       101 阅读
  3. 在Django里面运行非项目文件

    2023-12-18 07:30:06       82 阅读
  4. Python语言-面向对象

    2023-12-18 07:30:06       91 阅读

热门阅读

  1. 基于Spring、SpringMVC、MyBatis的校园订餐系统

    2023-12-18 07:30:06       70 阅读
  2. 开源语音识别faster-whisper部署教程

    2023-12-18 07:30:06       67 阅读
  3. C# 内存的分配管理

    2023-12-18 07:30:06       56 阅读
  4. React 表单与事件

    2023-12-18 07:30:06       54 阅读
  5. 第二十章 : Spring Boot 集成RabbitMQ(四)

    2023-12-18 07:30:06       61 阅读
  6. 解决spa页面首屏加载慢的方式笔记

    2023-12-18 07:30:06       63 阅读
  7. 解决阿里云ECS磁盘在线扩容不生效

    2023-12-18 07:30:06       60 阅读
  8. 微服务Redis-Session共享登录状态

    2023-12-18 07:30:06       41 阅读
  9. centos-静态ip及修改主机名

    2023-12-18 07:30:06       57 阅读
  10. 【React基础三】组件传值、高阶组件、Hook

    2023-12-18 07:30:06       57 阅读
  11. 如何使用ffmpeg高效的压缩视频

    2023-12-18 07:30:06       62 阅读
  12. C语言学习day09:运算符(下)

    2023-12-18 07:30:06       61 阅读