Nacos 入门篇---内存注册表:高并发下如何保证注册表读写并发冲突 ?(五)

一、引言

  本章来讲解Nacos注册表是如何进行写入数据的~

二、目录 

目录

一、引言

二、目录 

三、服务注册源码内容回顾

客户端源码回顾:

服务端源码回顾:

四、Nacos 注册表结构详解

五、写时复制概念

六、Nacos服务注册写入注册表源码解析

总结


三、服务注册源码内容回顾
客户端源码回顾:

  在 Spring Boot 启动时,会扫描spring-cloud-starter-alibaba-nacos-discovery依赖下的 spring.factories 文件,从而创建里面的相关配置类。 在 spring.factories 文件中,有一个 NacosServiceRegistryAutoConfiguration类。这个配置类定义了三个bean对象。

  • NacosServiceRegistry
  • NacosRegistration
  • NacosAutoServiceRegistration

  在NacosAutoServiceRegistration的父类中,自定义了Spring监听器。当Spring 容器启动时,就会发布监听 WebServerInitializedEvent 事件,从而执行 NacosServiceRegistry 类中的 register 注册方法。在注册方法中,会通过HTTP方式调用Nacos服务端的实例注册接口,完成服务注册。

   在发起服务实例注册之前,客户端会先去通过 BeatTask 任务,每5s向Nacos服务端发送一次健康心跳检查。

 
服务端源码回顾:

放任务:

    从客户端发起的请求地址我们可以得知,服务端服务注册接口: /nacos/v1/ns/instance。

首先我们看出Nacos服务端也是一个Springboot项目,通过架构图,我们能找到 注册中心模块的代码在 naming 当中,最后确定了 /nacos/v1/ns/instance 路径是在InstanceController 类中的 register 方法。

   在 register 方法当中,Instance 对象包装成 Datum 对象,放入到 DataStore 类中的dataMap里,最后包装成Pair对象,放入到阻塞队列当中。

取任务:

    在Nacos后台会有一个 Notifier 异步任务,在 DistroConsistencyServiceImpl 类中会有一个被 @PostConstruct 修饰的init方法,该方法会把 Notifier 提交到单线程的线程池当中。在 Notifier类中的run方法,会不断从 tasks 队列中获取任务,紧接着调用 handle 方法。在 handle方法当中,会通过key把 Instance 从 DataStore类中的dataMap 获取出来,然后调用 listener.onChange 方法把数据写入到注册表当中。

最后讲到 listener.onChange 方法,后面就没再讲了,在讲之前先要说明两个重点:

  • Nacos 内存注册表结构是什么样子的?
  • 写时复制概念是什么?

讲完这两个知识点,我们再来分析本章源码就会很容易很多。

四、Nacos 注册表结构详解

Nacos 注册表是什么?注册表就是用来存放我们微服务实例注册信息的地方

在ServiceManager类中有一个serviceMap属性,它就是对应我们 Nacos 的内存注册表。

private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

我怎么知道这个就是注册表的呢?

这个是我在看查询实例列表源码的时候,最后返回的实例列表就是从这个 serviceMap 里面取的。客户端在调用其他微服务的时候,会先调用 Nacos 查询实例列表接口,查询当前可用服务,从而发起微服务调用,这一块后面章节再详细讲解。

本小节我们就先来重点分析注册表结构

    从源码来看,注册表结构是由两层Map结构组合的,那我们先来讲解下最外层的Map结构。

    最外层的 key 对应的就是命名空间,如果不创建默认就是 public。命名空间下还包含了不同的分组,分别是 DEFAULT_GROUP、DEFAULT_GROUP_2。

然后再下一层的key对应的就是 分组名 +服务名,对应的value就是Service对象。

大概的对应关系如下图:

最外面两层分析完了,我们再来看看 Service 里面有什么。在Service当中有个 clusterMap属性(如下图),这个属性 key 是集群名称,value 是用来存放实例对象的。这个就是来支持不同环境集群实例存放的地方。

private Map<String, Cluster> clusterMap = new HashMap<>();

那么什么情况下需要使用到区分集群实例?

当项目访问量足够大的时候,为了响应速度更快,我们就要把服务部署在不同的地区服务器上。比如:北京有北京的集群实例、上海有上海的集群实例

我们可以在客户端配置集群的名字:

spring:
  cloud:
    nacos:
      discovery:
        # 注册地址
        server-addr: http://127.0.0.1:8848
        # 配置分组名字
        group: DEFAULT_GROUP_3
        # 配置集群名字
        cluster-name: BJ

我们可以在Nacos后台上看到该服务在不同集群下对应的实例。

那么这个实例列表信息是怎么存储的呢 ?

在 Cluster 类当中有两个Set 类型的属性进行存储。

在这两个集合当中,储存的就是 Instance 实例,Instance 实例包含了ip、port等信息。

/**
 * 持久化实例列表
 */
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>();

/**
 * 临时实例列表
 */
@JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>();

ok,看到这里Nacos结构我们也大致了解了,我们看下图的总结:

其实我们能看到最核心的 Instance 是放在 Cluster 里面的。

五、写时复制概念

为什么要讲写时复制 ?

这是因为Nacos在注册实例写入注册表的时候用的就是写时复制,可以很好地避免并发冲突

那什么是写时复制 ?

写时复制:Copy On Write 在数据写入到某个存储位置时,首先将原有内容拷贝出来,写到另一处地方,再将原来的引用地址修改成新对象地址。

那为什么会用到写时复制 ?

这里因为serviceMap 注册表,在 Cluster 对象中,最后使用 HashSet 来存储 Instance 对象的,它其实是一个共享的数据。所以在高并发下的场景,就可能会发生读写冲突。

为了让不懂读写冲突的小伙能明白,下面我们写个代码演示一下:

/**
 * @Author WangYan
 * @Date 2024/5/8 17:13
 * @Version 1.0
 */
public class Test02 {
    public static void main(String[] args) {
        // 假设这个是存放实例信息
        Set<Object> objectSet = new HashSet<>();

        // 模拟异步任务,写入数据
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 先睡眠一下,不然还没开始读,就已经写完了
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 写入10w条数据
                for (int i = 0; i < 100000; i++) {
                    objectSet.add(i);
                }
            }
        }).start();


        // 死循环一直读取数据,模拟高并发场景
        for (; ; ) {
            for (Object o : objectSet) {
                System.out.println(o);
            }
        }
    }
}

控制台输出:

在多条线程对一个属性进行同时读写操作的时候,就会抛出 java.util.ConcurrentModificationException 异常。

这个时候我们就可以用到 写时复制,把原先的数据先备份一份,然后对备份的数据进行修改。这个时候是不会对原来有影响的,等数据操作完成后,再把原来对象的引用地址指向复制对象的引用地址,就完成了替换效果。

六、Nacos服务注册写入注册表源码解析

主线任务:在注册异步任务中,Nacos 是怎么把新的实例信息,写入到注册表中的 ?

还是回到我们上个章节最后的代码,这个 dataStore.get(datumKey).value 就是从 dataStore 里的Map中,把 Instances 实例列表获取出来。

listener.onChange(datumKey, dataStore.get(datumKey).value);

我们接着往下看 listener.onChange() 方法,可以看到这里又是多个实现类。前面章节说过,要么看对象注入调用方式确定实现类,或者直接 Debug,看一下步走到

哪个实现了类了。这里我们直接Debug,直接看 Service 当中的 onChange方法。

我们来看下 onChange 的代码实现,先看入参

  • Key:这个 Key 的创建我们之前有分析过,KeyBuilder.buildInstanceListKey 代码创建出来的。
  • Instances:它里面有个 InstanceList 属性,会存放多个 Instance 对象。
@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    // 对每一个Instance当中的权重进行判断赋值,大于 10000.0D 的给 10000.0D,大于 0.0D 并且小于 0.01D 的给 0.01D
    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    // 主线任务:通过写时复制把新的实例信息写入注册表当中
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}

这里我们来回顾下分支代码, Instances 是怎么被创建出来的。这个是在服务端接受注册的时候,在 addInstance 方法当中创建的。

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    
    // 创建Key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);

    // 锁住一个service
    synchronized (service) {
        // 这里提前说一下,ips 上层方法传过来的,是本次实例注册对应的Instance,也就是已开始从Request里面获取的参数信息。
        // 最后会放在instanList里面,为什么这里是List,说明它不仅仅只有一个,还会包含之前已经注册的Instance,放在了一个List里面
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        // 创建一个 Instances 对象,并且把 instanceList 属性set进去
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }
}

  我们接着往 addIpAddresses 方法里面看,通过 updateIpAddresses 方法进行重载调用。这个 updateIpAddresses 方法我就不讲那么细了,只要知道,第一次创建最终 instanceMap 只会返回新增的 instance 实例,后面进来不仅返回新增的 instance 实例还返回之前的 Instance 实例信息

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    // 调用 updateIpAddresses 方法,这里 action 传的是 add
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {

    // 这里 datum 是从 dataStore 当中通过key获取的
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();

    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }

    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }

    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }

        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }

            // 重点
            // instanceMap 会有两种情况
            // 第一种情况:第一次创建 instanceMap 对应一个空 Map,然后把新增加的 isntance 实例放进去
            // 第二种情况:不是第一次创建,instanceMap 会包含之前所创建的 Instance 对象
            instanceMap.put(instance.getDatumKey(), instance);
        }

    }

    // 最后 instanceMap 里面肯定会包含 新注册的 Instance 实例
    // 并且如果不是第一次注册,里面会包含了 之前 Instance 实例信息
    return new ArrayList<>(instanceMap.values());
}

接着我们重点分析 onChange 方法中最后调用了 updateIPs方法,在这个方法中会把新注册 Instance 写入到注册表当中

// 这里 instances 里面就包含了新实例对象
// ephemeral 为 ture,临时实例
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// clusterMap 对应集群的Map
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 把集群名字都放入到ipMap里面,value是一个空的ArrayList
for (String clusterName : clusterMap.keySet()) {
    ipMap.put(clusterName, new ArrayList<>());
}

// 遍历全部的Instance,这里之前讲过,这个List<Instance> 包含了之前已经注册过的实例,和新注册的实例对象
// 这里的主要作用就是把相同集群下的 instance 进行分类
for (Instance instance : instances) {
    try {
        if (instance == null) {
            Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
            continue;
        }

        // 判断客户端传过来的是 Instance 中,是否有设置 ClusterName
        if (StringUtils.isEmpty(instance.getClusterName())) {
            // 如果没有,就给ClusterName赋值为 DEFAULT
            instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        }

        // 判断之前是否存在对应的 ClusterName,如果没有则需要创建新的 Cluster 对象
        if (!clusterMap.containsKey(instance.getClusterName())) {
            Loggers.SRV_LOG
                .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJson());
            // 创建新的集群对象
            Cluster cluster = new Cluster(instance.getClusterName(), this);
            cluster.init();
            // 放入到集群 clusterMap 当中
            getClusterMap().put(instance.getClusterName(), cluster);
        }

        // 通过集群名字,从 ipMap 里面取
        List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
        // 只有是新创建集群名字,这里才会为空,之前老的集群名字,在方法一开始里面都 value 赋值了 new ArrayList对象

        if (clusterIPs == null) {
            clusterIPs = new LinkedList<>();
            ipMap.put(instance.getClusterName(), clusterIPs);
        }

        // 把对应集群下的instance,添加进去
        clusterIPs.add(instance);
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
    }
}

// 分好类之后,针对每一个 ClusterName ,写入到注册表中
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
    // entryIPs 已经是根据ClusterName分好组的实例列表
    List<Instance> entryIPs = entry.getValue();
    
    // 根据写时复制,对每一个 Cluster 对象修改注册表 *** 重点
    // updateIps 则是 写时复制 的体现
    clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}

setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();

for (Instance instance : allIPs()) {
    stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}

Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
    stringBuilder.toString());
}

上面这一部分代码,主要就是对传入进来的 Instance 进行归类,最后把分好类的 Instance 对象,根据 Cluster 分类,对每一个 Cluster 中的实例列表进行修改。在这句代码中clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);就会体现写时复制,我们一起来看一下。

/**
 * Update instance list.
 *
 * @param ips       instance list
 * @param ephemeral whether these instances are ephemeral
 */
public void updateIps(List<Instance> ips, boolean ephemeral) {

    // 先判断是否是临时实例
    // ephemeralInstances 临时实例
    // persistentInstances 持久化实例
    // 把对应数据先拿出来,放入到 新创建的 toUpdateInstances 集合中
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

    // 先把老的实例列表复制一份 , 先复制一份新的
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }


    // 中间不重要的代码略,主要是对 oldIpMap 一些操作,其中也包括了集群节点同步,这里小册第二部分会详细讲解


    // 最后把传入进来的实例列表,重新初始化一个 HaseSet,赋值给toUpdateInstances
    toUpdateInstances = new HashSet<>(ips);
    
    // 判断是否是临时实例
    if (ephemeral) {
        // 直接把之前的实例列表替换成新的
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

  从这一部分源码中就能看出,全程没有对之前注册表的中的数据进行操作,而是先拿出来,备份一份,进行操作,最后进行替换。这样就完成了注册表的修改

总结

1、本章主要讲了 Nacos注册表的结构,可以看出Nacos注册表的设计方式还是很灵活的,可以通过命名空间、分组、集群来进行实例的区分,具体可以根据公司的业务场景来定。

2、我们还讲了在高并发下读写冲突的问题,同时也讲了 解决方案“读写复制” 的理论。最后分析Nacos 实例异步注册任务中,是如何利用 “写时复制”来完成注册表修改的。

那么到本章为止,我们 Nacos 源码分析完了一条路线:

从客户端发起服务注册,到服务端响应实例注册请求。异步任务 + 内存队列怎么来处理的整个流程,就讲完了。

最后,别忘了把源码分析图补充完整:

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-05-13 08:02:07       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-05-13 08:02:07       100 阅读
  3. 在Django里面运行非项目文件

    2024-05-13 08:02:07       82 阅读
  4. Python语言-面向对象

    2024-05-13 08:02:07       91 阅读

热门阅读

  1. PHP笔记

    PHP笔记

    2024-05-13 08:02:07      33 阅读
  2. 什么事防抖和节流,有什么区别,如何实现

    2024-05-13 08:02:07       25 阅读
  3. 02.01移除重复点

    2024-05-13 08:02:07       31 阅读
  4. 富格林:正规经验加持交易安全

    2024-05-13 08:02:07       29 阅读
  5. Lua 基础 01 入门

    2024-05-13 08:02:07       35 阅读