Kafka-02 @KafkaListener学习

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;

  • ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息;
    • 收到新的消息就执行,执行完了就继续自旋;
    • 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ----------------------------
protected void finishRefresh() {
   clearResourceCaches();

   initLifecycleProcessor();

   // 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessor
   getLifecycleProcessor().onRefresh();

   publishEvent(new ContextRefreshedEvent(this));

   if (!NativeDetector.inNativeImage()) {
      LiveBeansView.registerApplicationContext(this);
   }
}



// ------------------------------ DefaultLifecycleProcessor ----------------------------
public void onRefresh() {
    startBeans(true);
    this.running = true;
}



// ------------------------------ DefaultLifecycleProcessor ----------------------------
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
	Lifecycle bean = lifecycleBeans.remove(beanName);
	if (bean != null && bean != this) {
		String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
		for (String dependency : dependenciesForBean) {
			doStart(lifecycleBeans, dependency, autoStartupOnly);
		}
		if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) || 
             ((SmartLifecycle) bean).isAutoStartup())) {
			try {
                // 获取容器中的 LifeCycle bean 对象,调用它的 start()
                // SpringKafka 中对应的是 KafkaListenerEndpointRegistry
                // 我们重点看一下 KafkaListenerEndpointRegistry.start()
				bean.start();
			}
			catch (Throwable ex) {
				throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
			}
		}
	}
}

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
public void start() {
    // 轮询所有的 ConcurrentMessageListenerContainer 对象
    // 执行 ConcurrentMessageListenerContainer.start()
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
    this.running = true;
}



// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if ((this.contextRefreshed && this.alwaysStartAfterRefresh) 
        || listenerContainer.isAutoStartup()) {
        // 执行 ConcurrentMessageListenerContainer.start()
        listenerContainer.start();
    }
}



// ---------------------------- AbstractMessageListenerContainer ---------------------------
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            // 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()
            doStart();
        }
    }
}

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer ---------------------------
protected void doStart() {
    if (!isRunning()) {
        checkTopics();
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.concurrency = topicPartitions.length;
        }
        setRunning(true);

        // 1. 根据 @KafkaListener 中配置的 concurrency 轮询
        for (int i = 0; i < this.concurrency; i++) {
            // 2. 创建 KafkaMessageListenerContainer
            KafkaMessageListenerContainer<K, V> container =
                constructContainer(containerProperties, topicPartitions, i);
            
            // 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            
            // 4. 启动 KafkaMessageListenerContainer
            container.start();
            
            // 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中
            this.containers.add(container);
        }
    }
}

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
    String beanName = getBeanName();
    beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
    container.setBeanName(beanName);
    ApplicationContext applicationContext = getApplicationContext();
    if (applicationContext != null) {
        container.setApplicationContext(applicationContext);
    }
    ApplicationEventPublisher publisher = getApplicationEventPublisher();
    if (publisher != null) {
        container.setApplicationEventPublisher(publisher);
    }

    // 设置 clinetIdSuffix,clientId 前缀
    container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
    container.setGenericErrorHandler(getGenericErrorHandler());
    container.setCommonErrorHandler(getCommonErrorHandler());
    container.setAfterRollbackProcessor(getAfterRollbackProcessor());
    container.setRecordInterceptor(getRecordInterceptor());
    container.setBatchInterceptor(getBatchInterceptor());
    container.setInterceptBeforeTx(isInterceptBeforeTx());
    container.setListenerInfo(getListenerInfo());
    AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
    if (exec == null) {
        // 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors
        exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
        this.executors.add(exec);
        
        // 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer
        container.getContainerProperties().setConsumerTaskExecutor(exec);
    }
}

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protected void doStart() {
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);

    Object messageListener = containerProperties.getMessageListener();
    AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
    if (consumerExecutor == null) {
        consumerExecutor = new SimpleAsyncTaskExecutor(
            (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    ListenerType listenerType = determineListenerType(listener);
    
    // 1. 创建 ListenerConsumer
    // ListenerConsumer 是一个 Runnable 对象
    // new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员
    // 它的 run() 比较重要
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    
    // 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用
    this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

public void run() {
    ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
    publishConsumerStartingEvent();
    this.consumerThread = Thread.currentThread();
    setupSeeks();
    KafkaUtils.setConsumerGroupId(this.consumerGroupId);
    this.count = 0;
    this.last = System.currentTimeMillis();
    initAssignedPartitions();
    publishConsumerStartedEvent();
    Throwable exitThrowable = null;
    
    // 开启自旋
    while (isRunning()) {
        // 通过 KafkaConsumer 向 kafka-server 发起 poll 请求
        pollAndInvoke();
    }
    wrapUp(exitThrowable);
}

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
   if (this.invokerHandlerMethod != null) {
       // 最终的执行入口
       // 最后会通过反射调用我们的 @KafkaListener 声明的方法
      return this.invokerHandlerMethod.invoke(message, providedArgs);
   } else if (this.delegatingHandler.hasDefaultHandler()) {
      Object[] args = new Object[providedArgs.length + 1];
      args[0] = message.getPayload();
      System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
      return this.delegatingHandler.invoke(message, args);
   } else {
      return this.delegatingHandler.invoke(message, providedArgs);
   }
}

至此,SpringKafka 分析完毕;

相关推荐

  1. @KafkaListener指定kafka集群

    2024-07-11 11:22:02       48 阅读
  2. Kafka的@KafkaListener注解参数详解

    2024-07-11 11:22:02       47 阅读
  3. Spring Kafka 之 @KafkaListener 注解详解

    2024-07-11 11:22:02       28 阅读
  4. springboot2.2.9整合kafkaKafkaListener实现原理

    2024-07-11 11:22:02       40 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-11 11:22:02       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-11 11:22:02       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-11 11:22:02       58 阅读
  4. Python语言-面向对象

    2024-07-11 11:22:02       69 阅读

热门阅读

  1. 华为机试HJ84统计大写字母个数

    2024-07-11 11:22:02       20 阅读
  2. MySQL中in和exists的区别

    2024-07-11 11:22:02       20 阅读
  3. Spring Boot 常用 Starter

    2024-07-11 11:22:02       22 阅读
  4. dify/api/models/tool.py文件中的数据表

    2024-07-11 11:22:02       22 阅读
  5. 【SQL】InnoDB的意向锁

    2024-07-11 11:22:02       24 阅读
  6. SpringSecurity中文文档(Servlet OAuth 2.0 Client)

    2024-07-11 11:22:02       19 阅读
  7. Linux串口设备的使用<ubuntu>

    2024-07-11 11:22:02       21 阅读
  8. 计算机如何学习

    2024-07-11 11:22:02       19 阅读