以下是一个使用Java编写的简单Apache Kafka生产者示例,使用的是Apache Kafka官方提供的Java客户端库。这个示例展示了如何创建一个Kafka生产者实例,配置必要参数,以及发送消息到指定Topic。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleKafkaProducer {
public static void main(String[] args) {
// 1. 定义Kafka生产者配置
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键序列化器
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值序列化器
// 2. 创建Kafka生产者实例
Producer<String, String> producer = new KafkaProducer<>(producerProps);
// 3. 定义要发送的消息
String topicName = "my-topic";
String key = "message-key";
String value = "Hello, Kafka!";
// 4. 发送消息到指定Topic
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value + " (" + i + ")");
producer.send(record);
}
// 5. 关闭生产者
producer.close();
}
}
在这个示例中:
首先,创建一个
Properties
对象来配置生产者。这里指定了Kafka集群的地址(bootstrap.servers
),以及键(key)和值(value)的序列化器。本例使用了字符串类型的序列化器。使用配置好的属性创建一个
KafkaProducer<String, String>
实例。这里的泛型参数对应消息的键和值类型。定义要发送的消息,包括目标Topic的名称、消息键和消息值。在这个示例中,我们发送的消息键是固定的字符串,消息值是一个包含序号的问候语。
使用生产者对象的
send()
方法发送消息到指定的Topic。循环发送了10条消息。发送完毕后,调用
close()
方法关闭生产者,释放资源。
为了运行此示例,确保您已经安装并启动了本地的Kafka服务,并创建了名为my-topic
的Topic。此外,需要将示例代码编译并依赖相应的Kafka客户端库(通常为kafka-clients
)。如果您使用Maven,可以在pom.xml
文件中添加如下依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version> <!-- 替换为您使用的Kafka版本 -->
</dependency>
</dependencies>
替换上述代码中的Kafka版本号为您的实际版本。编译并运行SimpleKafkaProducer
类,消息将被发送到指定的Kafka Topic中。