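Customizing ReactorLoadBalancer in Spring Boot 3.0

Background

Many of our projects use the Spring Cloud family of frameworks as their base microservice architecture. Within Spring Cloud we often use Feign for service-to-service calls, and to meet specific requirements we sometimes need to customize it.

Feign.Client

The fully qualified name is feign.Client. It is an interface with a single method, execute, which takes two parameters and returns one value: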

  /**
   * Executes a request against its {@link Request#url() url} and returns a response.
   *
   * @param request safe to replay.
   * @param options options to apply to this request.
   * @return connected response, {@link Response.Body} is absent or unread.
   * @throws IOException on a network error connecting to {@link Request#url()}.
   */
  Response execute(Request request, Options options) throws IOException;

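We would implement our own Client if, for example, we needed a protocol other than HTTP or wanted to add retries. In most cases, though, we do not write this class ourselves; we use the ready-made implementations from Spring or from other frameworks.

Commonly used Feign.Client implementations

  • org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient: the most common one. This is the class used when retry is not configured for the load balancer.
  • org.springframework.cloud.openfeign.loadbalancer.RetryableFeignBlockingLoadBalancerClient: the class used when retry is configured. Its preconditions are that retry is enabled (spring.cloud.loadbalancer.retry.enabled, which defaults to true), that a LoadBalancedRetryFactory bean exists, and that the RetryTemplate class is on the classpath: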

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
	@ConditionalOnBean(LoadBalancedRetryFactory.class)
	@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true",
			matchIfMissing = true)
	public Client feignRetryClient(LoadBalancerClient loadBalancerClient,
			LoadBalancedRetryFactory loadBalancedRetryFactory, LoadBalancerClientFactory loadBalancerClientFactory,
			List<LoadBalancerFeignRequestTransformer> transformers) {
		return new RetryableFeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient,
				loadBalancedRetryFactory, loadBalancerClientFactory, transformers);
	}

The RetryTemplate class is brought in by the spring-retry dependency:

 		<!-- https://mvnrepository.com/artifact/org.springframework.retry/spring-retry -->
		<dependency>
		    <groupId>org.springframework.retry</groupId>
		    <artifactId>spring-retry</artifactId>
		</dependency>

Customizing which ServiceInstance a request selects

The class that selects the ServiceInstance: BlockingLoadBalancerClient

The execute methods of both FeignBlockingLoadBalancerClient and RetryableFeignBlockingLoadBalancerClient call retrievedServiceInstance = loadBalancerClient.choose(serviceId, lbRequest);. When only spring-cloud-starter-loadbalancer is used, LoadBalancerClient currently has a single implementation, BlockingLoadBalancerClient.
The code of BlockingLoadBalancerClient:

/**
 * The default {@link LoadBalancerClient} implementation.
 *
 * @author Olga Maciaszek-Sharma
 * @since 2.2.0
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {

	private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

	public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
		this.loadBalancerClientFactory = loadBalancerClientFactory;
	}

	/* other code omitted */
	@Override
	public ServiceInstance choose(String serviceId) {
		return choose(serviceId, REQUEST);
	}

	@Override
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
			return null;
		}
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
			return null;
		}
		return loadBalancerResponse.getServer();
	}

	/* other code omitted */

}

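The BlockingLoadBalancerClient constructor

The constructor takes a ReactiveLoadBalancer.Factory. This matters: look at the implementation of that interface, LoadBalancerClientFactory, which extends NamedContextFactory. NamedContextFactory shows up in many frameworks and is generally used to manage child containers.
The code of NamedContextFactory: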

package org.springframework.cloud.context.named;

/* other code omitted */
/**
 * Creates a set of child contexts that allows a set of Specifications to define the beans
 * in each child context.
 *
 * Ported from spring-cloud-netflix FeignClientFactory and SpringClientFactory
 *
 * @param <C> specification
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Tommy Karlsson
 * @author Olga Maciaszek-Sharma
 */
public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>
		implements DisposableBean, ApplicationContextAware {

	private final Map<String, ApplicationContextInitializer<GenericApplicationContext>> applicationContextInitializers;

	private final String propertySourceName;

	private final String propertyName;

	private final Map<String, GenericApplicationContext> contexts = new ConcurrentHashMap<>();

	private Map<String, C> configurations = new ConcurrentHashMap<>();

	private ApplicationContext parent;

	private Class<?> defaultConfigType;

	public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName, String propertyName) {
		this(defaultConfigType, propertySourceName, propertyName, new HashMap<>());
	}

	public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName, String propertyName,
			Map<String, ApplicationContextInitializer<GenericApplicationContext>> applicationContextInitializers) {
		this.defaultConfigType = defaultConfigType;
		this.propertySourceName = propertySourceName;
		this.propertyName = propertyName;
		this.applicationContextInitializers = applicationContextInitializers;
	}

	/* other code omitted */

	protected GenericApplicationContext getContext(String name) {
		if (!this.contexts.containsKey(name)) {
			synchronized (this.contexts) {
				if (!this.contexts.containsKey(name)) {
					this.contexts.put(name, createContext(name));
				}
			}
		}
		return this.contexts.get(name);
	}

	public GenericApplicationContext createContext(String name) {
		GenericApplicationContext context = buildContext(name);
		// there's an AOT initializer for this context
		if (applicationContextInitializers.get(name) != null) {
			applicationContextInitializers.get(name).initialize(context);
			context.refresh();
			return context;
		}
		registerBeans(name, context);
		context.refresh();
		return context;
	}

	public <T> T getInstance(String name, Class<T> type) {
		GenericApplicationContext context = getContext(name);
		try {
			return context.getBean(type);
		}
		catch (NoSuchBeanDefinitionException e) {
			// ignore
		}
		return null;
	}

	public <T> ObjectProvider<T> getLazyProvider(String name, Class<T> type) {
		return new ClientFactoryObjectProvider<>(this, name, type);
	}

	public <T> ObjectProvider<T> getProvider(String name, Class<T> type) {
		GenericApplicationContext context = getContext(name);
		return context.getBeanProvider(type);
	}

	public <T> T getInstance(String name, Class<?> clazz, Class<?>... generics) {
		ResolvableType type = ResolvableType.forClassWithGenerics(clazz, generics);
		return getInstance(name, type);
	}

	@SuppressWarnings("unchecked")
	public <T> T getInstance(String name, ResolvableType type) {
		GenericApplicationContext context = getContext(name);
		String[] beanNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type);
		if (beanNames.length > 0) {
			for (String beanName : beanNames) {
				if (context.isTypeMatch(beanName, type)) {
					return (T) context.getBean(beanName);
				}
			}
		}
		return null;
	}

	public <T> Map<String, T> getInstances(String name, Class<T> type) {
		GenericApplicationContext context = getContext(name);

		return BeanFactoryUtils.beansOfTypeIncludingAncestors(context, type);
	}

	/* other code omitted */

}

Among the fields of this class there is a map named contexts. Its key is a String and its value is a GenericApplicationContext, which is what we usually call the Spring container.

private final Map<String, GenericApplicationContext> contexts = new ConcurrentHashMap<>();

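In other words, NamedContextFactory creates a separate child container for each named scenario. For Feign, each Feign interface (an interface annotated with @FeignClient) is such a scenario; for LoadBalancerClientFactory the name is the service id, as the call loadBalancerClientFactory.getInstance(serviceId) shows.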
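
The per-name caching can be illustrated without Spring. The sketch below is a minimal stand-in, not the real NamedContextFactory: ChildRegistry, its factory function and createdCount are hypothetical names, and an arbitrary object takes the place of GenericApplicationContext. It follows the same check, lock, check-again pattern as getContext, so each name gets exactly one lazily created child.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for NamedContextFactory: one lazily created child per name.
class ChildRegistry<C> {

    private final Map<String, C> contexts = new ConcurrentHashMap<>();
    private final Function<String, C> factory;
    private final AtomicInteger created = new AtomicInteger();

    ChildRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    // Mirrors getContext(name): check, lock, check again, then create.
    C getContext(String name) {
        if (!contexts.containsKey(name)) {
            synchronized (contexts) {
                if (!contexts.containsKey(name)) {
                    contexts.put(name, factory.apply(name));
                    created.incrementAndGet();
                }
            }
        }
        return contexts.get(name);
    }

    // Number of children created so far (only here to make the behavior observable).
    int createdCount() {
        return created.get();
    }
}
```

Asking twice for the same name returns the same child, while a different name triggers the creation of a second one.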

The choose method of BlockingLoadBalancerClient

First, the code:

	@Override
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
			return null;
		}
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
			return null;
		}
		return loadBalancerResponse.getServer();
	}

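Inside choose, the ReactiveLoadBalancer for the service is looked up first, and then its own choose method is called. The commonly used ReactiveLoadBalancer implementations are RandomLoadBalancer and RoundRobinLoadBalancer, plus NacosLoadBalancer if Nacos is in use. RandomLoadBalancer picks a ServiceInstance at random, and RoundRobinLoadBalancer cycles through them in turn. Of these two, Spring Cloud LoadBalancer uses only RoundRobinLoadBalancer by default.

The RoundRobinLoadBalancer implementation

The code of RoundRobinLoadBalancer: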

public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

	private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);

	final AtomicInteger position;

	final String serviceId;

	ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

	/**
	 * @param serviceInstanceListSupplierProvider a provider of
	 * {@link ServiceInstanceListSupplier} that will be used to get available instances
	 * @param serviceId id of the service for which to choose an instance
	 */
	public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
			String serviceId) {
		this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
	}

	/**
	 * @param serviceInstanceListSupplierProvider a provider of
	 * {@link ServiceInstanceListSupplier} that will be used to get available instances
	 * @param serviceId id of the service for which to choose an instance
	 * @param seedPosition Round Robin element position marker
	 */
	public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
			String serviceId, int seedPosition) {
		this.serviceId = serviceId;
		this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
		this.position = new AtomicInteger(seedPosition);
	}

	@SuppressWarnings("rawtypes")
	@Override
	// see original
	// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
	// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
	public Mono<Response<ServiceInstance>> choose(Request request) {
		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
				.getIfAvailable(NoopServiceInstanceListSupplier::new);
		return supplier.get(request).next()
				.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
	}

	private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
			List<ServiceInstance> serviceInstances) {
		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
		}
		return serviceInstanceResponse;
	}

	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
		if (instances.isEmpty()) {
			if (log.isWarnEnabled()) {
				log.warn("No servers available for service: " + serviceId);
			}
			return new EmptyResponse();
		}

		// Do not move position when there is only 1 instance, especially some suppliers
		// have already filtered instances
		if (instances.size() == 1) {
			return new DefaultResponse(instances.get(0));
		}

		// Ignore the sign bit, this allows pos to loop sequentially from 0 to
		// Integer.MAX_VALUE
		int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

		ServiceInstance instance = instances.get(pos % instances.size());

		return new DefaultResponse(instance);
	}

}

The choose method is only a couple of lines long. It first obtains one ServiceInstanceListSupplier from those available, then asks that supplier for the list of ServiceInstance objects matching the serviceId, and finally picks one instance from the list by round robin.
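
The index arithmetic in getInstanceResponse is easy to miss, so here it is in isolation. This is a stand-alone sketch under simplifying assumptions: RoundRobinPicker is a hypothetical class, it works on any List instead of List<ServiceInstance>, and it returns null where the real class returns an EmptyResponse.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone sketch of the round-robin index arithmetic (hypothetical class).
class RoundRobinPicker {

    private final AtomicInteger position;

    RoundRobinPicker(int seedPosition) {
        this.position = new AtomicInteger(seedPosition);
    }

    <T> T pick(List<T> instances) {
        if (instances.isEmpty()) {
            return null; // the real class returns an EmptyResponse here
        }
        if (instances.size() == 1) {
            return instances.get(0); // the position is not advanced
        }
        // Clearing the sign bit keeps the index non-negative after int overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }
}
```

Masking with Integer.MAX_VALUE matters once the counter overflows: incrementing Integer.MAX_VALUE yields Integer.MIN_VALUE, and without the mask the modulo would produce a negative index.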
Let's first look at which ServiceInstanceListSupplier implementations exist.
(Figure: ServiceInstanceListSupplier implementations)
The one we end up with is RetryAwareServiceInstanceListSupplier, but it works by delegation: underneath it is a CachingServiceInstanceListSupplier, which also delegates, to a DiscoveryClientServiceInstanceListSupplier. That one delegates as well, so the ServiceInstance objects ultimately come from the DiscoveryClient.
CachingServiceInstanceListSupplier mainly provides a cache, so the list is not fetched again on every call.
In the end, the ServiceInstanceListSupplier returns a list of ServiceInstance objects.
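
The delegation chain can be sketched with plain Java. Everything here is hypothetical and heavily simplified: InstanceSupplier stands in for ServiceInstanceListSupplier, instances are plain strings, the call is synchronous instead of reactive, and the cache never expires, which is an assumption of this sketch only.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for ServiceInstanceListSupplier.
interface InstanceSupplier {
    List<String> get(String serviceId);
}

// Innermost link: pretends to query a discovery client and counts the lookups.
class DiscoverySupplier implements InstanceSupplier {
    final AtomicInteger lookups = new AtomicInteger();

    @Override
    public List<String> get(String serviceId) {
        lookups.incrementAndGet();
        return List.of(serviceId + "-1:8080", serviceId + "-2:8080");
    }
}

// Outer link: answers from its cache and delegates only on a miss (no expiry here).
class CachingSupplier implements InstanceSupplier {
    private final InstanceSupplier delegate;
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    CachingSupplier(InstanceSupplier delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<String> get(String serviceId) {
        return cache.computeIfAbsent(serviceId, delegate::get);
    }
}
```

Each wrapper adds one concern and passes the rest of the work down, which is why the load balancer only ever needs to talk to the outermost supplier.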

Customizing the LoadBalancer

First, define the class

We can follow RoundRobinLoadBalancer: first obtain the list of ServiceInstance objects, then pick one from that list.

Registering it in the right ApplicationContext

Once the class is defined, we need to register our LoadBalancer in an ApplicationContext. Look at how RoundRobinLoadBalancer is registered: its constructor takes an ObjectProvider<ServiceInstanceListSupplier>, which is obtained from LoadBalancerClientFactory, the factory described above that creates a separate child container for each service id.
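
As an illustration of the "pick one from the list" step, here is a sketch of one possible custom rule: prefer instances whose version matches a wanted version and fall back to the full list otherwise. It is plain Java under stated assumptions: Instance is a hypothetical stand-in for ServiceInstance, VersionPreferringPicker and the version field are invented for this example, and in a real ReactorServiceInstanceLoadBalancer this logic would sit where getInstanceResponse sits in RoundRobinLoadBalancer.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical stand-in for ServiceInstance: only the fields this sketch needs.
class Instance {
    final String host;
    final String version;

    Instance(String host, String version) {
        this.host = host;
        this.version = version;
    }
}

// Prefers instances with the wanted version and round-robins within the chosen group.
class VersionPreferringPicker {
    private final String wantedVersion;
    private final AtomicInteger position = new AtomicInteger();

    VersionPreferringPicker(String wantedVersion) {
        this.wantedVersion = wantedVersion;
    }

    Instance pick(List<Instance> instances) {
        if (instances.isEmpty()) {
            return null; // a real load balancer would return an EmptyResponse
        }
        List<Instance> matching = instances.stream()
                .filter(i -> wantedVersion.equals(i.version))
                .collect(Collectors.toList());
        // Fall back to the full list when no instance has the wanted version.
        List<Instance> candidates = matching.isEmpty() ? instances : matching;
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return candidates.get(pos % candidates.size());
    }
}
```

Keeping the selection rule separate from the reactive plumbing also makes it straightforward to unit test.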

	@Bean
	@ConditionalOnMissingBean
	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
			LoadBalancerClientFactory loadBalancerClientFactory) {
		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
		return new RoundRobinLoadBalancer(
				loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
	}

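In other words, our custom LoadBalancer has to be added to a child container of LoadBalancerClientFactory. Declaring it with @Bean in an ordinary configuration class is not enough; it has to live in a configuration that is loaded into the child container. We can borrow from the configuration where RoundRobinLoadBalancer is defined (org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration): rewrite LoadBalancerClientConfiguration and add a bean method that creates our new LoadBalancer class.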
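
Spring Cloud LoadBalancer also lets us hand a configuration class to the child containers through @LoadBalancerClients(defaultConfiguration = ...), which may be a lighter alternative to copying LoadBalancerClientConfiguration. The following is a hedged sketch, not a verified setup for every version: CustomLoadBalancerConfiguration and LoadBalancerSetup are hypothetical names, and RandomLoadBalancer only stands in for our own class. Because the default bean is declared with @ConditionalOnMissingBean, a ReactorLoadBalancer bean defined this way should take its place.

```java
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.RandomLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

// Deliberately NOT annotated with @Configuration, so component scanning does not
// register it in the main context; it is loaded into each child container instead.
class CustomLoadBalancerConfiguration {

    @Bean
    ReactorLoadBalancer<ServiceInstance> customLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        // The child container exposes its own service id under this property.
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        // RandomLoadBalancer stands in for our own implementation here.
        return new RandomLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }
}

// Applies the configuration above to every load-balanced service id.
@Configuration
@LoadBalancerClients(defaultConfiguration = CustomLoadBalancerConfiguration.class)
class LoadBalancerSetup {
}
```

To target a single service instead of all of them, @LoadBalancerClient(name = ..., configuration = ...) can be used in the same way.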
