kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止
直接上代码
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
* kafka整体配置类
*
* @author Dean
*/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.admin.client-id}")
private String adminClientId;
@Bean
public AdminClient adminClient() {
Map<String, Object> configs = new HashMap<>(5);
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(AdminClientConfig.CLIENT_ID_CONFIG, adminClientId);
return AdminClient.create(configs);
}
/**
* 如果有多个消费组,需要定义多个不同的ConcurrentKafkaListenerContainerFactory
*
* @return ConcurrentKafkaListenerContainerFactory
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
String groupId = "dmGroup";
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//是否自动提交ack
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//一次拉取最大数据量,默认值为500,如果拉取时不足配置的条数则有多少拉取多少
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
//是否批量这个设置好像只对配置了@KafkaListener的方法有用
factory.setBatchListener(false);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
//手动提交ack
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
/**
* kafka中间件的逻辑封装类
*/
@Slf4j
@Service
public class KafkaListenerManagement {
private final Map<String, MessageListenerContainer> containers = new ConcurrentHashMap<>();
/**
* 如果有多个消费组,需要注入多个不同的ConcurrentKafkaListenerContainerFactory
*/
private final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory;
@Autowired
public KafkaListenerManagement(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
this.containerFactory = containerFactory;
}
/**
* 开启Topic的监听
*
* @param topic topic
* @param bizLogicConsumer 消息的业务逻辑处理
*/
public void startListening(String topic, BiConsumer<String, Acknowledgment> bizLogicConsumer) {
//必须手动提交ACK,否则停止监听后重新监听可能导致拉取到重复的记录
AcknowledgingMessageListener<String, String> messageListener =
(message, acknowledgment) -> bizLogicConsumer.accept(message.value(), acknowledgment);
MessageListenerContainer container = containerFactory.createContainer(topic);
container.setupMessageListener(messageListener);
container.start();
containers.put(topic, container);
}
/**
* 暂停监听
*
* @param topic topic
*/
public void pauseListening(String topic) {
MessageListenerContainer container = containers.get(topic);
container.pause();
}
/**
* 暂停后继续监听
*
* @param topic topic
*/
public void resumeListening(String topic) {
MessageListenerContainer container = containers.get(topic);
container.resume();
}
/**
* 停止监听
*
* @param topic topic
*/
public void stopListening(String topic) {
MessageListenerContainer container = containers.remove(topic);
if (container != null) {
container.stop();
}
}
}
/**
* Kafka生产者
*
* @author LiuChang
*/
@Service
public class KafkaProducerManagement {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 异步发送
*
* @param topic topic
* @param message 消息
* @return ListenableFuture
*/
public ListenableFuture<SendResult<String, String>> send(String topic, String message) {
return kafkaTemplate.send(topic, message);
}
}
import com.feiynn.kafka.management.KafkaListenerManagement;
import com.feiynn.kafka.management.KafkaProducerManagement;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* 业务逻辑类
* 注意:业务逻辑类不建议直接调用kafka的API,都调用封装后的Kafka相关的Management类
*
* @author Dean
*/
@Slf4j
@Service
public class BizService {
@Resource
private KafkaListenerManagement kafkaListenerManagement;
@Resource
private KafkaProducerManagement kafkaProducerManagement;
/**
* 开启topic监听后的业务逻辑
*
* @param topic topic
*/
public void startListening(String topic) {
kafkaListenerManagement.startListening(topic, (data, acknowledgment) -> {
//消息处理业务逻辑
log.info("Received message value: [{}]", data);
try {
//降低消费速率,方便观察日志
TimeUnit.MILLISECONDS.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
acknowledgment.acknowledge();
});
}
/**
* 停止topic监听
*
* @param topic topic
*/
public void stopListening(String topic) {
kafkaListenerManagement.stopListening(topic);
}
/**
* 暂停监听
*
* @param topic topic
*/
public void pauseListening(String topic) {
kafkaListenerManagement.pauseListening(topic);
}
/**
* 暂停后继续监听
*
* @param topic topic
*/
public void resumeListening(String topic) {
kafkaListenerManagement.resumeListening(topic);
}
/**
* 发送消息
*
* @param topic topic
* @param message 消息
*/
public void sendMsg(String topic, String message) {
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaProducerManagement.send(topic, message);
//添加回调逻辑,异步获取发送结果
listenableFuture.addCallback((sendResult) -> {
//发送成功
log.trace("Send [{}] success", message);
}, (e) -> {
//发送失败,可以执行降级策略,或者把消息写入日志后续进行统一处理
log.error("Send [{}] failed", message, e);
});
}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = KafkaAdvancedApplication.class)
public class KafkaAdvancedTest {
@Resource
private BizService bizService;
/**
* 测试topic监听的开启、暂停、暂停后重新开始、停止
*/
@Test
public void startStopListening() throws InterruptedException {
String topicDm = "dm0";
//开启topic监听
bizService.startListening(topicDm);
TimeUnit.SECONDS.sleep(2);
//消息前缀,用来区分是上一次发送的未消费完的消息还是本次发送的消息
String msgPre = LocalTime.now().toString();
log.info("msgPre=[{}]", msgPre);
for (int i = 0; i < 2000; i++) {
bizService.sendMsg(topicDm, "Msg_" + msgPre + "_" + i);
}
TimeUnit.SECONDS.sleep(5);
log.info("pause listening begin");
bizService.pauseListening(topicDm);
log.info("pause listening success");
//暂停监听成功后,消费者会把配置max.poll.records条数的消息消费完才会真正停止,因此停顿足够长的时间后观察消息消费的日志是否会暂停输出
TimeUnit.SECONDS.sleep(20);
log.info("resume listening");
//暂停后重新开启消息监听
bizService.resumeListening(topicDm);
TimeUnit.SECONDS.sleep(20);
//新一轮暂停与重启
log.info("pause listening again");
bizService.pauseListening(topicDm);
TimeUnit.SECONDS.sleep(10);
log.info("resume listening again");
bizService.resumeListening(topicDm);
//继续消费一段时间
TimeUnit.SECONDS.sleep(10);
//消费一段时间后停止监听
log.info("stop listening");
bizService.stopListening(topicDm);
TimeUnit.SECONDS.sleep(20);
//重新开启topic监听
log.info("start listening again");
bizService.startListening(topicDm);
TimeUnit.SECONDS.sleep(120);
}
}
直接运行测试用例,通过观察日志,即可看出各种操作效果
遇到的问题
遇到停止监听topic后,从看到消费消息的日志观察,有时会一直打印,有时会打印一段时间就停止打印的问题,最终发现暂停监听方法调用成功后,消费者会把配置max.poll.records条数的消息消费完才会真正暂停或者停止。
另外如果不是手动提交ack,停止stop(不是暂停pause)订阅topic然后后重新开始订阅(start),可能会出现重复消费消息的问题,改成手动提交ack后问题不再出现。
还考虑到max.poll.interval.ms 最大拉取时间间隔是5分钟,尝试了暂停5分30秒看是否消费者会被因为rebalance,导致在resume重新监听无法成功,测试结果是没有问题,可以成功继续监听并消费消息。
代码已使用到生产环境。