<!-- Spring for Apache Kafka. No <version> is declared here, so it has to be
     supplied by dependency management elsewhere (e.g. a parent POM or BOM). -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
# Kafka client settings, nested so that they bind under the spring.kafka prefix.
spring:
  kafka:
    # Placeholder value: replace with the real broker address(es) in host:port form.
    bootstrap-servers:
      - ip:端口
    producer:
      # 0 = a failed send is not retried by the producer.
      retries: 0
      # 1 = wait only for the partition leader to acknowledge the write.
      acks: 1
      # Upper bound, in bytes, for a batch of records (16 KiB).
      batch-size: 16384
      # Extra native producer properties; the nested keys below flatten to linger.ms.
      properties:
        linger:
          ms: 100
      # Total bytes the producer may use to buffer unsent records (32 MiB).
      buffer-memory: 33554432
      # Keys and values are both sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
/**
 * Template used to publish records with {@code String} keys and {@code String} values.
 * Annotated with {@code @Resource} so that it is injected rather than constructed here.
 */
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * Publishes the given JSON payload to the {@code topic1} topic and logs the outcome.
 *
 * <p>The payload is converted to its string form before sending, because the
 * template is typed {@code KafkaTemplate<String, String>}. The result of the send
 * is reported through a callback registered on the returned future.
 *
 * <p>NOTE(review): the method name is misspelled ("Kfaka"); it is kept unchanged
 * so that existing callers continue to compile.
 *
 * @param body JSON payload to publish; must not be {@code null}
 * @throws IllegalArgumentException if {@code body} is {@code null}
 */
public void sendKfaka(JSONObject body){
    if (body == null) {
        throw new IllegalArgumentException("body must not be null");
    }

    final String topic = "topic1";
    // The template's value type is String, so the JSON object is sent in its string form.
    final String payload = body.toString();

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, payload);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            // Handling for a successfully sent message.
            log.info("Message sent successfully to the topic: " + result.getRecordMetadata().topic() +
                    " with offset: " + result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            // Handling for a failed send; the topic is included so the log line is actionable.
            log.error("Unable to send message to the topic: " + topic, ex);
        }
    });
}
}