【Spring连载】使用Spring访问 Apache Kafka(十八)----非阻塞重试


版本2.9将机制更改为bootstrap基础设施bean;请参阅 配置,了解引导此功能所需的两种机制。
使用Kafka实现无阻塞重试/dlt功能通常需要设置额外的主题并创建和配置相应的监听器。由于2.7 Spring for Apache Kafka通过@RetryableTopic注解和RetryTopicConfiguration类提供了对它的支持,以简化bootstrap。
batch监听器不支持非阻塞重试。

一、这种模式是如何运作的How The Pattern Works

如果消息处理失败,则会将消息转发到具有back off时间戳的重试主题。重试主题consumer检查时间戳,如果不到期,则暂停该主题分区的消费。到期时,将恢复分区消费,消息再次被消费。如果消息处理再次失败,则消息将转发到下一个重试主题,并重复该模式,直到成功处理或尝试次数用完,并将消息发送到死信主题Dead Letter Topic(如果已配置)。
例如,如果你有一个"main-topic"主题,并且希望设置非阻塞重试,其backoff为1000ms,乘数为2,最大尝试次数4次,则它将创建 main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 和 main-topic-dlt 主题,并配置相应的consumers。框架还负责创建主题以及设置和配置监听器。
通过使用这种策略,你将失去Kafka对该主题的排序保证。
你可以设置自己喜欢的AckMode模式,但建议使用RECORD。
目前,此功能不支持类级别的@KafkaListener注解。
当使用asyncAcks设置为true的手动AckMode时,DefaultErrorHandler必须配置为seekAfterError设置为false。从2.9.10和3.0.8版本开始,对于此类配置,这将无条件设置为true。对于早期版本,有必要重写RetryConfigurationSupport.configureCustomizers()方法以将属性设置为true。

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
   
    customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
}

此外,在这些版本之前,无论asyncAcks属性如何,使用默认(日志记录)DLT处理程序都与任何类型的手动AckMode不兼容。

二、回退延迟精度Back Off Delay Precision

概述和保证Overview and Guarantees

所有消息处理和回退都由consumer线程处理,因此,在尽最大努力的基础上保证了延迟精度。如果一条消息的处理时间比该消费者的下一条消息back off时间长,则下一条信息的延迟将高于预期。此外,对于短延迟(约1秒或更短),线程必须做的维护工作,如提交偏移量,可能会延迟消息处理的执行。如果重试主题的consumer正在处理多个分区,则精度也会受到影响,因为我们依赖于从轮询中唤醒consumer,并使用完整的pollTimeouts来进行时间调整。
也就是说,对于处理单个分区的consumers来说,在大多数情况下,消息的处理应该大致在其确切的到期时间进行。
消息在到期时间之前永远不会被处理是保证的。

三、配置Configuration

从2.9版本开始,对于默认配置,@EnableKafkaRetryTopic注释应该在@Configuration注解类中使用。这使该功能能够正确地引导,并允许在运行时插入要查找的一些功能组件。

如果添加此注释,则不必同时添加@EnableKafka,因为@EnableKafcaRetryTopic是用@EnableKavka进行元注释的。

此外,从该版本开始,对于功能组件和全局功能的更高级配置,RetryTopicConfigurationSupport类应在@configuration类中进行扩展,并覆盖适当的方法。有关更多详细信息,请参阅配置全局设置和功能。

默认情况下,重试主题的容器将与主容器具有相同的并发性。从3.0版本开始,您可以为重试容器设置不同的并发性(可以在注释上,也可以在RetryConfigurationBuilder中)。

只能使用上述技术中的一种,并且只有一个@Configuration类可以扩展RetryTopicConfigurationSupport。

四、Programmatic Construction

五、Features

5.1 BackOff Configuration

5.2 Global timeout

5.3 Exception Classifier

5.4 Include and Exclude Topics

5.5 Topics AutoCreation

5.6 失败报头管理Failure Header Management

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-28 08:14:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-28 08:14:03       101 阅读
  3. 在Django里面运行非项目文件

    2024-01-28 08:14:03       82 阅读
  4. Python语言-面向对象

    2024-01-28 08:14:03       91 阅读

热门阅读

  1. 【leetcode100-069到073】【栈】五题合集

    2024-01-28 08:14:03       51 阅读
  2. 每日OJ题_算法_二分查找⑧_力扣LCR 173. 点名

    2024-01-28 08:14:03       56 阅读
  3. 【代码分享】

    2024-01-28 08:14:03       44 阅读
  4. 六、MySQL之视图与索引

    2024-01-28 08:14:03       48 阅读
  5. 强化学习 - Trust Region Policy Optimization (TRPO)

    2024-01-28 08:14:03       46 阅读
  6. Kong Upstream

    2024-01-28 08:14:03       46 阅读
  7. 单例模式(五种创建方式)

    2024-01-28 08:14:03       62 阅读
  8. PyTorch 之 nn.Parameter

    2024-01-28 08:14:03       50 阅读
  9. 编程语言比较—ruby,python,php比较

    2024-01-28 08:14:03       47 阅读