七、Nacos源码系列:Nacos服务发现

目录

一、服务发现

二、getServices():获取服务列表

2.1、获取服务列表

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

3.1、从缓存中获取服务信息

3.2、缓存为空,执行订阅服务

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

3.2.2、订阅服务 

 3.2.3、处理服务信息

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

3.4、筛选满足条件的实例 

3.5、总结图


一、服务发现

在discovery-provider项目的pom.xml中,我们引入了如下依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2.2.9.RELEASE</version>
</dependency>

SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:

如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。

相关源码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {

    // 创建DiscoveryClient bean对象
	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
		return new NacosDiscoveryClient(nacosServiceDiscovery);
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
			matchIfMissing = true)
	public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
	}

}


@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public NacosDiscoveryProperties nacosProperties() {
        // 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,
        // 如namespace、username、password、serverAddr等属性
		return new NacosDiscoveryProperties();
	}

    // 创建NacosServiceDiscovery bean对象
	@Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
	}

}

在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。

重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:

public class NacosDiscoveryClient implements DiscoveryClient {

	private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);

	/**
	 * Nacos Discovery Client Description.
	 */
	public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";

	private NacosServiceDiscovery serviceDiscovery;

	public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
		this.serviceDiscovery = nacosServiceDiscovery;
	}

	@Override
	public String description() {
		return DESCRIPTION;
	}

	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
		try {
			return serviceDiscovery.getInstances(serviceId);
		}
		catch (Exception e) {
			throw new RuntimeException(
					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
		}
	}

	@Override
	public List<String> getServices() {
		try {
			return serviceDiscovery.getServices();
		}
		catch (Exception e) {
			log.error("get service name from nacos server fail,", e);
			return Collections.emptyList();
		}
	}

}

可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。

public class NacosServiceDiscovery {

    // 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上
	private NacosDiscoveryProperties discoveryProperties;

    // nacos服务管理器对象
	private NacosServiceManager nacosServiceManager;

	public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		this.discoveryProperties = discoveryProperties;
		this.nacosServiceManager = nacosServiceManager;
	}

	/**
	 * 返回指定group和servic的所有实例
	 */
	public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
		// 配置文件中配置的group组名
        String group = discoveryProperties.getGroup();
		// namingService(): 通过反射创建一个NacosNamingService对象
        // 最终会调用NacosNamingService#selectInstances()方法
        List<Instance> instances = namingService().selectInstances(serviceId, group,
				true);
        // 将Instance包装成NacosServiceInstance对象返回
		return hostToServiceInstanceList(instances, serviceId);
	}

	/**
	 * 返回指定group的所有服务名称
	 */
	public List<String> getServices() throws NacosException {
        // 配置文件中配置的group组名
		String group = discoveryProperties.getGroup();
        // namingService(): 通过反射创建一个NacosNamingService对象
        // 最终会调用NamingGrpcClientProxy#getServiceList()方法
		ListView<String> services = namingService().getServicesOfServer(1,
				Integer.MAX_VALUE, group);
    	// 返回所有服务名称
		return services.getData();
	}

	public static List<ServiceInstance> hostToServiceInstanceList(
			List<Instance> instances, String serviceId) {
		List<ServiceInstance> result = new ArrayList<>(instances.size());
		for (Instance instance : instances) {
			ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);
			if (serviceInstance != null) {
				result.add(serviceInstance);
			}
		}
		return result;
	}

	public static ServiceInstance hostToServiceInstance(Instance instance,
			String serviceId) {
		if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {
			return null;
		}
		NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();
		nacosServiceInstance.setHost(instance.getIp());
		nacosServiceInstance.setPort(instance.getPort());
		nacosServiceInstance.setServiceId(serviceId);

		Map<String, String> metadata = new HashMap<>();
		metadata.put("nacos.instanceId", instance.getInstanceId());
		metadata.put("nacos.weight", instance.getWeight() + "");
		metadata.put("nacos.healthy", instance.isHealthy() + "");
		metadata.put("nacos.cluster", instance.getClusterName() + "");
		if (instance.getMetadata() != null) {
			metadata.putAll(instance.getMetadata());
		}
		metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));
		nacosServiceInstance.setMetadata(metadata);

		if (metadata.containsKey("secure")) {
			boolean secure = Boolean.parseBoolean(metadata.get("secure"));
			nacosServiceInstance.setSecure(secure);
		}
		return nacosServiceInstance;
	}

	private NamingService namingService() {
		return nacosServiceManager.getNamingService();
	}

}

接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。

二、getServices():获取服务列表

2.1、获取服务列表

// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
//getServices()调用栈大体如下:
 namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);
    NacosNamingService#getServicesOfServer
        clientProxy.getServiceList(pageNo, pageSize, groupName, selector)
            NamingClientProxyDelegate#getServiceList
                grpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);

// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
    // 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名
    ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);
    if (selector != null) {
        if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {
            request.setSelector(JacksonUtils.toJson(selector));
        }
    }
    // 发送服务列表请求给Nacos服务端,接下来由服务端处理
    ServiceListResponse response = requestToServer(request, ServiceListResponse.class);
    // 组装返回值出去
    ListView<String> result = new ListView<>();
    result.setCount(response.getCount());
    result.setData(response.getServiceNames());
    return result;
}

接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。

// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {
    // ServiceManager.getInstance()通过单例返回一个ServiceManager对象
   
    /**
     * 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。
     * ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)
     * key: namespaceId
     * value: Set<Service>
     * 注册实例的时候,就往这个map写入了数据
     *
     * ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:
     * namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))
     */
    Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());

    // 构建响应结果
    ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());
    if (!serviceSet.isEmpty()) {
        // 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceA
        Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());
        // 按分页裁剪serviceNameSet
        List<String> serviceNameList = ServiceUtil
                .pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);
        result.setCount(serviceNameSet.size());
        result.setServiceNames(serviceNameList);
    }
    return result;
}

从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);
	// NamingService#selectInstances(serviceName, groupName, healthy, true)
		// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    
    ServiceInfo serviceInfo;
    // 集群名称,使用逗号分隔
    String clusterString = StringUtils.join(clusters, ",");

    // 是否订阅,默认是订阅的
    if (subscribe) {
        /**
         * 1.从缓存中获取ServiceInfo
         * ConcurrentMap<String, ServiceInfo> serviceInfoMap
         * key:  groupName@@serviceName  或者  groupName@@serviceName@@clusterString
         * value: ServiceInfo
         */

        // 示例:serviceName=discovery-provider   groupName=DEFAULT_GROUP
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        // 2.缓存为空,执行订阅服务
        if (null == serviceInfo) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    // 4.筛选满足条件的实例
    return selectInstances(serviceInfo, healthy);
}

上述流程的基本逻辑为:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
  • 如果是非订阅模式,那就直接请求服务器端,获得服务信息。

3.1、从缓存中获取服务信息

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    // 组装服务名(带组名):groupName@@serviceName
    // 例如:DEFAULT_GROUP@@discovery-provider
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果指定了集群,那么key还会加上"@@clusters"
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // ConcurrentMap<String, ServiceInfo> serviceInfoMap
    // 从缓存中获取服务信息
    return serviceInfoMap.get(key);
}

3.2、缓存为空,执行订阅服务

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);

实际上调用的是NamingClientProxyDelegate#subscribe()方法:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    // 服务名称(带组名)  格式:groupName@@serviceName
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果集群名称非空,key还需要拼接上集群名称
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 调度更新,往线程池中提交一个UpdateTask任务
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 获取缓存中的服务信息
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        // 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

主要逻辑有下面三个,分析如下。

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    if (!asyncQuerySubscribeService) {
        return;
    }
    // 组装key   格式:groupName@@serviceName@@clusters
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    // Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()
    // futureMap用于保存UpdateTask线程池任务的执行结果
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        // double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务
        if (futureMap.get(serviceKey) != null) {
            return;
        }

        // 往线程池中添加一个更新任务
        // UpdateTask实现了Runnable接口,需要关注其run()方法
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    // 延迟1s执行UpdateTask
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:

public class UpdateTask implements Runnable {
    
    long lastRefTime = Long.MAX_VALUE;
    
    private boolean isCancel;
    
    private final String serviceName;
    
    private final String groupName;
    
    private final String clusters;
    
    private final String groupedServiceName;
    
    private final String serviceKey;
    
    /**
     * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
     */
    private int failCount = 0;
    
    public UpdateTask(String serviceName, String groupName, String clusters) {
        this.serviceName = serviceName;
        this.groupName = groupName;
        this.clusters = clusters;
        this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
    }
    
    @Override
    public void run() {
        long delayTime = DEFAULT_DELAY;
        
        try {
            if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                    serviceKey)) {
                NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
                isCancel = true;
                return;
            }
            
            ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
            if (serviceObj == null) {
                // 使用grpc向服务端发送ServiceQueryRequest服务查询请求
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
                serviceInfoHolder.processServiceInfo(serviceObj);
                lastRefTime = serviceObj.getLastRefTime();
                return;
            }
            
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // 使用grpc向服务端发送ServiceQueryRequest服务查询请求
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
                serviceInfoHolder.processServiceInfo(serviceObj);
            }
            lastRefTime = serviceObj.getLastRefTime();
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                // 记录失败次数
                incFailCount();
                return;
            }
            // TODO multiple time can be configured.
            delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
            // 重置失败次数
            resetFailCount();
        } catch (NacosException e) {
            handleNacosException(e);
        } catch (Throwable e) {
            handleUnknownException(e);
        } finally {
            if (!isCancel) {
                // 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                        TimeUnit.MILLISECONDS);
            }
        }
    }
    
    private void handleNacosException(NacosException e) {
        incFailCount();
        int errorCode = e.getErrCode();
        if (NacosException.SERVER_ERROR == errorCode) {
            handleUnknownException(e);
        }
        NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());
    }
    
    private void handleUnknownException(Throwable throwable) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);
    }
    
    private void incFailCount() {
        int limit = 6;
        if (failCount == limit) {
            return;
        }
        failCount++;
    }
    
    private void resetFailCount() {
        failCount = 0;
    }
}

run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。

这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。

3.2.2、订阅服务 

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
    }
    // 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,
    // 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe

    // 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    // 使用grpc发送服务订阅请求
    return doSubscribe(serviceName, groupName, clusters);
}

public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
    SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);
    // private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
    synchronized (subscribes) {
        subscribes.put(key, redoData);
    }
}

订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。

我们来看下如何订阅服务的。

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // 构建一个SubscribeServiceRequest客户端订阅请求
    // 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handle
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
            true);
    // grpc请求Nacos服务端进行订阅
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    // 标记SubscriberRedoData重做数据为已订阅
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    return response.getServiceInfo();
}

通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。

public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String serviceName = request.getServiceName();
    String groupName = request.getGroupName();
    String app = request.getHeader("app", "unknown");
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 构建一个Service服务,指定为临时实例
    Service service = Service.newService(namespaceId, groupName, serviceName, true);
    // 构建Subscriber订阅者对象
    Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
            namespaceId, groupedServiceName, 0, request.getClusters());
    // serviceStorage.getData(service): 从缓存中获取serviceInfo
    // metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata
    // ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例
    ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
            metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
            true, subscriber.getIp());
    if (request.isSubscribe()) {
        // 订阅服务
        clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    } else {
        // 取消订阅服务
        clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    }
    return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们重点关注订阅服务的方法:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 添加到订阅者列表中,实际上就是保存在map中
    // 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    // 发布客户端订阅事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,

核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

 3.2.3、处理服务信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    // 获取老的服务
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    // 重新存入客户端缓存中
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // 对比下服务信息是否发生变更
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                JacksonUtils.toJson(serviceInfo.getHosts()));
        // 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifier
        NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        // 同步serviceInfo数据到本地文件
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    // 构建服务查询请求
    // Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handle
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    // 通过grpc请求Nacos服务端处理
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}

queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。

public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String groupName = request.getGroupName();
    String serviceName = request.getServiceName();
    Service service = Service.newService(namespaceId, groupName, serviceName);
    String cluster = null == request.getCluster() ? "" : request.getCluster();
    boolean healthyOnly = request.isHealthyOnly();
    // 从缓存中获取serviceInfo
    ServiceInfo result = serviceStorage.getData(service);
    // 从内存(map)获取ServiceMetadata
    ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
    // 获取有保护机制的健康实例
    result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,
            meta.getClientIp());
    return QueryServiceResponse.buildSuccessResponse(result);
}

3.4、筛选满足条件的实例 

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<>();
    }

    // 遍历所有实例,直接移除掉不满足条件的实例
    Iterator<Instance> iterator = list.iterator();
    while (iterator.hasNext()) {
        Instance instance = iterator.next();
        // 筛选出健康、启用、权重大于0的实例
        if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
            iterator.remove();
        }
    }
    
    return list;
}

3.5、总结图

 

相关推荐

  1. 使用Nacos实现服务发现

    2024-02-08 15:34:01       35 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-08 15:34:01       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-08 15:34:01       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-08 15:34:01       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-08 15:34:01       18 阅读

热门阅读

  1. 线程之间如何通信?

    2024-02-08 15:34:01       33 阅读
  2. NMEA GPS

    2024-02-08 15:34:01       24 阅读
  3. 系统架构22 - 软件架构设计(1)

    2024-02-08 15:34:01       32 阅读
  4. 【C/C++ 16】C++11线程库

    2024-02-08 15:34:01       31 阅读
  5. 前端bug手册

    2024-02-08 15:34:01       32 阅读
  6. react中的diff算法

    2024-02-08 15:34:01       32 阅读
  7. 线程和进程的区别及基础线程创建

    2024-02-08 15:34:01       32 阅读
  8. pandas dataframe写入excel的多个sheet页面

    2024-02-08 15:34:01       34 阅读
  9. Debian系统中挂载一个数据盘

    2024-02-08 15:34:01       32 阅读
  10. 有道论文翻译接口,python版和lua版

    2024-02-08 15:34:01       37 阅读
  11. [Android] Client->NuPlayer

    2024-02-08 15:34:01       31 阅读
  12. work day7

    2024-02-08 15:34:01       31 阅读
  13. 【mybatis自动治愈插件】

    2024-02-08 15:34:01       31 阅读