SpringCloud Stream配置详解

SpringCloud Stream配置

SpringCloud Stream的配置以spring.cloud.stream为前缀,主要分为三部分

1、基础配置

  1. spring.cloud.stream.default-binder:指定默认的消息绑定器。可以设置为消息中间件(如 RabbitMQ、Kafka)的名称,用于在应用程序中使用该默认绑定器,默认值为kafka。例如:
spring.cloud.stream.default-binder=rabbit
  1. spring.cloud.stream.instance-count:配置消息中间件消费者实例数量的属性,它定义了要启动的消费者实例数量,默认为1,例如,设置该属性为2将会启动两个消费者实例来处理消息,从而提高系统的吞吐量和可靠性。当使用Kafka等多分区的消息中间件时,建议将实例数设置为分区数的倍数,以便更好地利用资源并获得更好的性能。
  2. spring.cloud.stream.instance-index:用于配置消息中间件消费者实例索引的属性,它定义了当前消费者实例在一组实例中的位置,范围从0到n-1,其中n表示实例数。该属性通常与spring.cloud.stream.instance-count属性一起使用,以确保每个消费者实例都有唯一的索引值。

2、绑定通道配置

通用配置

适用于输入和输出通道,通过spring.cloud.stream.bindings..前缀配置,源码类BindingProperties中存有所有可配置属性。

下面是BindingProperties的属性介绍

destination:设置输入或输出通道的目标,如rabbitmq中为exchange名,kafka为topic名

例如,对于名为 myInput 的输入通道:

spring.cloud.stream.bindings.myInput.destination=myTopic

contentType:设置输入或输出通道的消息内存类型名,默认为application/json

group:设置消费者分组

binder:存在多个绑定器时,指定当前通道使用的绑定器

消费者配置

spring.cloud.stream.bindings..consumer为前缀,用于定义消费者的属性,只对输入通道有效,源码类为
ConsumerProperties

concurrency:设置消费者的并发数,默认为1,可以根据自己的需求进行修改,例如:

spring.cloud.stream.bindings.myInput.consumer.concurrency=5

partitioned:消费者是否从分区的生产者接收数据,默认为false

instanceCountinstanceIndex:用于在多个实例之间进行分区和负载均衡。instanceCount 指定实例的总数,instanceIndex 指定当前实例的索引,两个属性默认都为-1

spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0

maxAttempts:设置消息消费失败时的最大重试次数,默认为3,可以根据自己的需求进行修改,例如:

spring.cloud.stream.bindings.myInput.consumer.max-attempts=5

backOffInitialInterval:重试消息处理的初始间隔时间,默认值为1000
backOffMaxInterval:重试消息处理的最大间隔时间,默认值为10000
backOffMultiplier:重试消息处理时间间隔的递增乘数,默认值为2.0

生产者属性

spring.cloud.stream.bindings..producer为前缀,用于定义生产者的属性,只对输出通道有效,配置源码类为ProducerProperties

  1. partitionKeyExpression:设置用于分区的表达式。可以使用 SpEL 表达式来指定分区键。例如:
   spring.cloud.stream.bindings.myOutput.producer.partition-key-expression="'key'"

这将使用 ‘key’ 作为分区键。

  1. partitionCount:设置要发送到的分区数。例如:
   spring.cloud.stream.bindings.myOutput.producer.partition-count=3

这将设置要发送到 3 个分区。

  1. requiredGroups:设置生产者所需的消费者组。只有具有相同消费者组的消费者才能接收到来自该生产者的消息。例如:
   spring.cloud.stream.bindings.myOutput.producer.required-groups=myGroup1,myGroup2

这设置只有 myGroup1`和 myGroup2的消费者组接收来自该生产者的消息。

  1. headerMode:设置要使用的消息头模式。可以是 noneembeddedHeadersheaders。例如:
spring.cloud.stream.bindings.myOutput.producer.header-mode=embeddedHeaders

headerMode 有以下几种取值:

  1. none:不处理消息头部。此模式下,消息的头部将被忽略,只处理消息体。

  2. embeddedHeaders:将消息头部嵌入到消息体中。在发送消息时,消息头部的键值对会被序列化并放置在消息体中的特定位置。在接收消息时,框架会从消息体中解析出消息头部的内容。

  3. headers:在这种模式下,所有的原生消息头都会被直接映射为Spring Messaging MessageHeaders的一部分。

3、绑定设置

前缀:spring.cloud.stream.binders,源码类为BinderProperties,用于定义和配置消息中间件绑定器(Binder),这样应用可以同时与多个消息代理系统(如Kafka、RabbitMQ等)进行交互。每个binder都有一个名称,通过这个名称可以在其他地方引用它,并且需要指定其类型以及相关的环境属性。

1.type:指定 Binder 的类型,是一个字符串,例如 kafka或者rabbit
2.environment:指定 Binder 的环境属性,例如 kafka的主机地址和端口号,我们可以在environment中定义各个binder的属性,如kafka,我们可以定义spring.kafka的属性,

示例:

spring:
  cloud:
    stream:
      binders:
        myKafkaBinder:  #Binder的名称,由我们定义
          type: kafka   # Binder的类型是 kafka
          environment:
            spring:
              kafka:
                bootstrap-servers: localhost:9092
                producer: 
                  retries: 3 
                  batch-size: 16384
                  buffer-memory: 33554432
                  acks: 1
                consumer:
                  enable-auto-commit: false
                  auto-offset-reset: earliest
                listener:
                  ack-mode: MANUAL_IMMEDIATE

该配置指定了一个名为 kafkaBinder1 的 Binder,其类型为 kafka,并使用了一些 Kafka 客户端属性来自定义 Kafka 生产者和消费者的行为。这样的方式适合我们有多个binder时进行配置,如果只有一个,那么直接使用spring.cloud.stream.kafka.binder这种方式进行配置即可,也可以直接使用spring.kafka进行配置,因为spring.cloud.stream.kafka.binder的配置会和spring.kafka的配置进行合并

public Map<String, Object> mergedConsumerConfiguration() {
   
		Map<String, Object> consumerConfiguration = new HashMap<>();
		consumerConfiguration.putAll(this.kafkaProperties.buildConsumerProperties());
		// Copy configured binder properties that apply to consumers
		for (Map.Entry<String, String> configurationEntry : this.configuration
				.entrySet()) {
   
			if (ConsumerConfig.configNames().contains(configurationEntry.getKey())) {
   
				consumerConfiguration.put(configurationEntry.getKey(),
						configurationEntry.getValue());
			}
		}
		consumerConfiguration.putAll(this.consumerProperties);
		filterStreamManagedConfiguration(consumerConfiguration);
		// Override Spring Boot bootstrap server setting if left to default with the value
		// configured in the binder
		return getConfigurationWithBootstrapServer(consumerConfiguration,
				ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
	}

以下是这些属性的含义:

spring.kafka.bootstrap-servers:指定 Kafka 的 Bootstrap 服务器地址。

spring.kafka.producer.retries:生产者发送失败后的重试次数。

spring.kafka.producer.batch-size:批量发送消息时的默认批次大小。

spring.kafka.producer.buffer-memory:生产者可用于缓冲等待发送到服务器的记录的总内存大小。

spring.kafka.producer.acks:指定生产者要求 leader 接收的确认数。
确认数表示在将消息视为成功发送之前,生产者需要等待的确认数量。这个属性可以设置为以下几个值:
0:生产者不会等待来自 broker 的任何确认,将立即认为消息发送成功。
1:生产者在收到来自 leader 分区的确认后,即可认为消息发送成功。
-1 或 all:生产者在收到来自 leader 和所有副本分区的确认后,才认为消息发送成功。
默认情况下,acks 的值是 1,即只要 leader 分区确认接收到消息,生产者就认为发送成功。这种配置提供了一定的容错性,因为只需要一个副本确认即可,但也可能导致消息丢失的风险。

spring.kafka.consumer.enable-auto-commit:是否启用自动提交偏移量。

spring.kafka.consumer.auto-offset-reset:当消费者没有初始偏移量或者当前偏移量不存在时,从何处开始消费。
该属性可以设置为以下几个值:
latest:当发现当前分区不存在或者当前偏移量无效时,将从最新的偏移量开始消费。也就是说,消费者会跳过已经提交的偏移量,从最新的消息开始消费。
earliest:当发现当前分区不存在或者当前偏移量无效时,将从最早的偏移量开始消费。也就是说,消费者会从最早可用的消息开始消费。
none:当发现当前分区不存在或者当前偏移量无效时,抛出异常。

spring.kafka.listener.ack-mode:当消费者从主题中读取消息时,应该何时确认消息的接收。MANUAL_IMMEDIATE为手动确认模式,消费者需要在消息处理逻辑完成后,调用 Acknowledgment.acknowledge() 方法来确认消息的接收

绑定器配置

因为我使用的是kafka,所以下面介绍Spring Cloud Stream 框架中与 Kafka 相关的配置属性,前缀为:spring.cloud.stream.kafka,这些配置会和spring.kafka配置进行合并

spring.cloud.stream.kafka.binder.brokers:指定 Kafka 服务的地址列表,多个地址使用逗号分隔。

spring.cloud.stream.kafka.binder.defaultBrokerPort:指定 Kafka 服务的默认端口号。

spring.cloud.stream.kafka.binder.autoCreateTopics:指定是否自动创建主题。如果设置为 true,则在使用不存在的主题时会自动创建主题;如果设置为 false,则必须手动创建主题后才能使用。

spring.cloud.stream.kafka.binder.zkNodes:指定 ZooKeeper 服务的地址列表,多个地址使用逗号分隔。在使用基于 ZooKeeper 的 Kafka 集群时需要配置该属性。

spring.cloud.stream.kafka.binder.consumerProperties:用于设置 Kafka 消费者属性的配置项。该属性可以被用来设置一些消费者相关的属性,例如消费者的 offset 起始值、心跳时间、session 超时时间等。
例如:

spring.cloud.stream.kafka.binder.consumerProperties.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.consumerProperties.session.timeout.ms=6000

auto.offset.reset=earliest 表示当消费者第一次订阅一个主题时,从最早的消息开始消费;而 session.timeout.ms=6000 表示消费者和 Kafka broker 之间的 session 超时时间为 6 秒。

spring.cloud.stream.kafka.binder.producerProperties:用于设置 Kafka 生产者属性的配置项。通过这个属性,您可以设置一些与生产者相关的属性,如acks、retries、compression.type 等。
例如:

spring.cloud.stream.kafka.binder.producerProperties.acks=all
spring.cloud.stream.kafka.binder.producerProperties.retries=3

spring.cloud.stream.kafka.binder.configuration.*:指定 Kafka 客户端的其他配置属性,可以根据实际业务需求进行配置。

相关推荐

  1. SpringCloudStream整合MQ(待完善)

    2024-01-21 04:42:02       56 阅读
  2. SpringCloud Stream配置详解

    2024-01-21 04:42:02       48 阅读
  3. tsconfig.json配置详解

    2024-01-21 04:42:02       53 阅读
  4. MyBatis配置文件详解

    2024-01-21 04:42:02       39 阅读
  5. gateway基本配置详解

    2024-01-21 04:42:02       29 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-21 04:42:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-21 04:42:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-21 04:42:02       82 阅读
  4. Python语言-面向对象

    2024-01-21 04:42:02       91 阅读

热门阅读

  1. Spring中@Async的使用技巧

    2024-01-21 04:42:02       66 阅读
  2. 洛谷 P8218 【深进1.例1】求区间和 c语言

    2024-01-21 04:42:02       41 阅读
  3. 2024 前端高频面试题之 浏览器原理 篇

    2024-01-21 04:42:02       59 阅读
  4. c++ STL

    2024-01-21 04:42:02       52 阅读
  5. C++从零开始的打怪升级之路(day16)

    2024-01-21 04:42:02       56 阅读
  6. SpringBoot-03

    2024-01-21 04:42:02       59 阅读
  7. C++中的new/delete

    2024-01-21 04:42:02       63 阅读
  8. Spring DI

    Spring DI

    2024-01-21 04:42:02      57 阅读
  9. 有了指令集架构, 到完成CPU成品还有多远距离

    2024-01-21 04:42:02       58 阅读