微服务实现全链路灰度发布

一、实现步骤

  1. 再请求 Header 中打上标签,例如再 Header 中添加 "gray-tag: true" ,其表示要进行灰度测试(访问灰度服务),而其他则访问正式服务。
  2. 在负载均衡器 Spring Cloud LoadBalancer 中,拿到 Header 仲的 "gray-tag" 进行判断,如果此标签不为空,并等于"true" 的话,表示要访问灰度发布的服务,否则只访问正式的服务。
  3. 在网关 Spring Cloud Gateway 中,将 Header 标签 "gray-tag:true" 继续往下一个调用服务中传递。
  4. 在后续的调用服务中,需要实现两个关键功能:
    ● 在负载均衡器 Spring Cloud LoadBalancer 中,判断回复发布标签,将请求分发到对应服务
    ● 将灰度发布标签继续传递给下一个调用的服务.如此反复传递

二、服务模块

2.1 注册为灰色服务实例

spring:
  application:
    name: user-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        username: nacos
        password: nacos
        metadata:
          {"gray-tag":true} #标识当前为灰度节点
server:
  port: 0

2.2 设置负载均衡器

在服务启动类设置父子均衡器和Openfeign 服务 

@SpringBootApplication
@LoadBalancerClients(defaultConfiguration = GllobalLoadbanlancerConfig.class)
@EnableFeignClients
public class UserServiceGrayApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceGrayApplication.class, args);
    }

}

2.3 传递灰度标签

package com.example.userservicegray.config;

import com.example.globalconfigdemo.global.GlobalVarible;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.Enumeration;

@Component
public class FeignRequestInterceptor implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        // 从 RequestContextHolder 中获取 HttpServletRequest
        ServletRequestAttributes attributes = (ServletRequestAttributes)
                RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        if(request.getHeader(GlobalVarible.GRAY_TAG)!=null&&request.getHeader(GlobalVarible.GRAY_TAG)=="true"){
            requestTemplate.header(GlobalVarible.GRAY_TAG,"true");
        }
    }
//    @Override
//    public void apply(RequestTemplate requestTemplate) {
//        // 从 RequestContextHolder 中获取 HttpServletRequest
//        ServletRequestAttributes attributes = (ServletRequestAttributes)
//                RequestContextHolder.getRequestAttributes();
//        HttpServletRequest request = attributes.getRequest();
//        Enumeration<String> headerNames = request.getHeaderNames();
//        while (headerNames.hasMoreElements()){
//            String key = headerNames.nextElement();
//            String value = request.getHeader(key);
//            requestTemplate.header(key,value);
//        }
//    }
}

三、网关模块

网关传递灰度发布标识

package com.example.gatewayservice;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class GatewayFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request= exchange.getRequest();
        ServerHttpResponse response= exchange.getResponse();
        if(request.getQueryParams().getFirst(GlobalVarible.GRAY_TAG)!=null&&request.getQueryParams().getFirst(GlobalVarible.GRAY_TAG)=="true") {
            response.getHeaders().set(GlobalVarible.GRAY_TAG, "true");
        }
        System.out.println("======================================");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

四、自定义负载均衡模块

4.1 自定义负载均衡器

package com.example.globalconfigdemo.global;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.loadbalancer.core.*;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;


import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class GlobalLoadbancer implements ReactorServiceInstanceLoadBalancer {
    private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);
    private AtomicInteger position;
    private String serviceId;
    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    public GlobalLoadbancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this(serviceInstanceListSupplierProvider, serviceId, (new Random()).nextInt(1000));
    }

    public GlobalLoadbancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.position = new AtomicInteger(seedPosition);
    }

    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next().map((serviceInstances) -> {
            //此时调用时,将request作为参数传给调用方法,在得到服务实例时通过判断请求头中的标识来返回实例
            return this.processInstanceResponse(supplier, serviceInstances,request);
        });
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances,Request request) {
        //将request 传给调用方法
        Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances,request);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback)supplier).selectedServiceInstance((ServiceInstance)serviceInstanceResponse.getServer());
        }

        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,Request request) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + this.serviceId);
            }

            return new EmptyResponse();
        } else if (instances.size() == 1) {
            return new DefaultResponse((ServiceInstance)instances.get(0));
        } else {

            //得到 Request 对象,[通过方法传递参数得到此对象]
            //从 Request 对象的 Header 中得到灰度标签
            RequestDataContext requestDataContext= (RequestDataContext) request.getContext();
            HttpHeaders headers=requestDataContext.getClientRequest().getHeaders();
            if(headers.get(GlobalVarible.GRAY_TAG)!=null&&headers.get(GlobalVarible.GRAY_TAG).get(0).equals("true")){
                List<ServiceInstance> grayInstance=instances.stream().filter(s->s.getMetadata().get(GlobalVarible.GRAY_TAG)!=null&&s.getMetadata().get(GlobalVarible.GRAY_TAG).equals("true")).toList();
                //判断灰度列表不为空
                if(grayInstance.size()>0){
                    instances=grayInstance;
                }
            }else {
                instances=instances.stream().filter(s->s.getMetadata().get(GlobalVarible.GRAY_TAG)==null || !s.getMetadata().get(GlobalVarible.GRAY_TAG).equals("true")).toList();
            }
            int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
            ServiceInstance instance = instances.get(pos % instances.size());
            return new DefaultResponse(instance);
        }
    }
}

4.2 封装自定义负载均衡器

package com.example.globalconfigdemo.global.config;


import com.example.globalconfigdemo.global.GlobalLoadbancer;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;

public class GllobalLoadbanlancerConfig {
    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty("loadbalancer.client.name");
        return new GlobalLoadbancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }

}

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-22 05:42:03       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-22 05:42:03       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-22 05:42:03       45 阅读
  4. Python语言-面向对象

    2024-07-22 05:42:03       55 阅读

热门阅读

  1. android audio 相机按键音:(一)资源加载与替换

    2024-07-22 05:42:03       16 阅读
  2. 使用 Jenkins 实现持续集成和持续部署(CI/CD)

    2024-07-22 05:42:03       11 阅读
  3. TiDB热点问题

    2024-07-22 05:42:03       17 阅读
  4. setup中如何获取组件实例

    2024-07-22 05:42:03       17 阅读
  5. 编程中的智慧五:工厂设计模式

    2024-07-22 05:42:03       19 阅读