Dubbo源码解读-Consumer消费端服务列表刷新

上篇我们介绍了Consumer消费端@Reference服务端引用流程,地址如下

Dubbo源码解读-Consumer消费端@Reference服务端引用流程-CSDN博客

        本文主要针Dubbo消费端服务列表刷新原理,从dubbo源码角度进行解析。

        大家可以好好仔细读一下本文。有疑问欢迎留言。

        接着说明,读Dubbo源码最好是先对Spring源码有一定的了解。如果大家需要,我也可以针对Spring框架做一系列源码的解读专栏。

         不过不用担心,如果需要Spring的源码知识,文章中也会进行Spring源码铺垫介绍的。

        如果内容中有没描述清楚的,或者大家在阅读源代码有疑问的,欢迎留言,看到就会及时回复。

主要内容

  • 知识铺垫
  • 消费端服务列表刷

知识铺垫

        消费端@Reference依赖注入服务端时,会注册providers,configurators,routers三个节点的监听事件,当服务列表节点发生变化时,注册中心就会通知监听器。监听器为RegistryDirectory类,实现了NotifyListener。具体细节可跳转到

Dubbo源码解读-Consumer消费端@Reference服务端引用流程-CSDN博客

消费端服务列表刷新

        consummer端的notify订阅事件(服务列表刷新)。整个细节过程是比较复杂的。因为好多同学为了应付面试或者觉得细节流程太繁琐或者复杂,只想弄清楚相对简练的过程。所以对细节流程做了一个流程概述的部分,方便大家理解大概过程,以及应对面试。更详细的过程可关注详细流程。

流程概述

  1. RegistryDirectory.notify(List<URL> invokerUrls)监听三个节点providers,configurators,routers
  2. 根据Url中的目录(category),设置具体协议url
    1. invokerUrls:dubbo协议:providers目录
    2. routerUrls:router协议:routes目录
    3. configuratorsUrl:overide协议:configurators
  3. 处理协议:
    1. routerUrls:routes目录变动
      1. 将协议专为Router对象,ConditionRouter对象。持有whenCondition和thenCondition
      2. 集合添加MockInvokersSelector和TagRouter两个router
    2. configuratorUrls(动态修改配置信息):configurators节点变动
      1. urls为List<Configurator>对象
      2. 将override协议中的属性合并到overrideDirectoryUrl中。
    3. invokerUrls:(providers节点变动,比如服务上线下线),dubbo协议.【其实就是生成DubboInvoker,建立了两个映射关系
      1. urlInvokerMap:建立url和invoker的映射关系
        1. 合并Url属性,消费端配置优先
        2. protocol.refer(serviceType, url):protocol执行链,最终DubboProtocol.refer()
          1. 创建NettyClient和handler连
          2. 创建DubboInvoker
      2. methodInvokerMap:建立服务中方法和invoker的映射关系

详细流程

  1. RegistryDirectory.notify(List<URL> invokerUrls)
  2. 根据Url中的目录(category),设置具体协议ur
    1. invokerUrls:dubbo协议:providers目录
    2. routerUrls:router协议:routes目录
    3. configuratorsUrl:overide协议:configurators
  3. 协议处理
    1. routerUrls(动态修改路由信息):router:协
      1. URls专为Router对象
        1. List<Router> routers = toRouters(routerUrls);
        2. 遍历Urls
          1. .route://0.0.0.0/com.xiangxue.jack.service.UserService?category=routers&dynamic=false&enabled=true&force=false&name=routeTest&priority=0&router=condition&rule=method+=+queryUser+=>+provider.cluster+=+failover+&+provider.host+=+192.168.90.126+&+provider.protocol+=+dubbo&runtime=false
          2. .获取router属性。值为condition
          3. 设置协议为condition
          4. routerFacotry获取Router对象
            1. 调用ConditionRouterFactory.getRouter(URL url)
            2. new ConditionRouter(url)
              1. 获取key=rule字符串:method+=+queryUser+=>+provider.cluster+=+failover+&+provider.host+=+192.168.90.126+&+provider.protocol+=+dubbo
              2. 解析whencondition:=>箭头之前
              3. 解析thencondition:=>箭头之后
            3. 返回ConditionRouter对象。持有whenCondition和thenCondition
      2. 集合添加MockInvokersSelector和TagRouter两个router
    2. configuratorUrls(动态修改配置信息):之前provider端notify有详细介绍
      1. 转urls为List<Configurator>对象
      2. 将override协议中的属性合并到overrideDirectoryUrl中。
    3. invokerUrls(动态修改providers节点,比如服务上线下线):url为dubbo协议,处理Dubbo协议
      1. refreshInvoker(invokerUrls):动态修改providers节点,比如服务上线下线
      2. 如果只有一个empty协议。消费端启动的时候,先会是empty协议清空服务列表.forbid设置为true.
      3. 否则:
        1. forbiden设置为false.
        2. urlInvokerMap:建立url和invoker的映射关系
          1. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
          2. 遍历所有invokerUrls。最终每一个invokerUrl都会对应一个Invoker对象
          3. 根据协议过滤invokerUrls
          4. 合并消费端配置属性,消费端配置优先:URL url = mergeUrl(providerUrl);
          5. 生成缓存key,String key = url.toFullString()
          6. protocol.refer(serviceType, url),url为Dubbo协议:protocol.refer(serviceType, url)。调用protocol包装类QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->DubboProtocol
            1. DubboProtocol:
              1. 获取ExchangeClient对象getClients(url)。
                1. 获取connections配置参数,配置客户端跟服务端建立几个长连接
                2. 初始化长链接
                3. 绑定 requestHandler,是最后调用的handler
              2. 创建DubboInvoker对象,并返。
            2. ProtocolListenerwrapper:包装ListenerInvokerWrapper,持有Invoker和List<InvokerListener>列表
            3. ProtocolFilterWrapper:根据Filter构建invoker执行链
          7. 创建InvokerDelegate,持有ListenerInvokerWrapper,ProviderUrl和合并消费端属性的url
            1. invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
          8. 建立url和invoke对象的映射:newUrlInvokerMap.put(key, invoker)
          9. 返回映射newUrlInvokerMap
        3. methodInvokerMap:建立服务中方法和invoke的映射关系
          1. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
          2. 循环所有的注册的invoke
          3. 循环所有方法,建立方法和invoke的list的映射关系
          4. 设置*和服务列表的映射
          5. 遍历所有方法,根据路由规则获取匹配的Invokers集合,重新放入映射关系中
            1. route(methodInvokers, method)
            2. 首先经过方法匹配:whencondition匹配
            3. 然后其他条件匹配:thenCondition匹配
          6. method对应的invokers排序。
          7. 返回映射关系
        4. 删除不可用的invoker
        5. 【总结】
          1. 如果是消费端先启动,此时providers下无节点,接受empty协议。否则接受providers节点下所有注册的dubbo协议,即提供者节点
          2. empty协议,设置forbiden为true
          3. List<invokerUrls:设置forbiden为false.
          4. 遍历List<invokerUrls>,将URL专为Invoker。每个Url节点都会对应一个Invoker对象InvokerDelegate
            1. Protocol链路调用。
            2. DubboProtocol:创建长链接,生成DubboInvoker
            3. ProtocolFilterWrapper:根据Filter创建Invoker执行链
            4. ProtocolListenerwrapper:包装ListenerInvokerWrapper
          5. urlInvokerMap:建立url和invoke的映射关系,toInvokers()
          6. methodInvokerMap:建立服务中方法和invoke的映射关系,并进行router过滤,toMethodInvokers()
          7. 删除不可用的invoker。老的oldUrlInvokerMap中多余的invoker

源码解析

1.RegistryDirectory.Notify:接受注册中心通知处理三个协议

public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        //动态修改配置信息,比如在dubbo-admin中修改配置属性,会更新本地配置列表
        // configurators
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        //动态修改路由信息
        // routers
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        //动态修改providers节点,比如服务上线下线
        // providers
        refreshInvoker(invokerUrls);
    }

2.RegistryDirectory.refrreshInvoker。服务列表刷新缓存

private void refreshInvoker(List<URL> invokerUrls) {
        //消费端启动的时候,先会是empty协议清空服务列表
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            //这里是建立url和invoke的映射关系
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            //这里是建立服务中方法和invoke的映射关系
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                //删除不可用的invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

3.RegistryDirecotry:建立Url和Invoker映射关系

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            
            //属性合并,消费端优先
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        //这里会掉到DubboProtocol
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    
                }
                if (invoker != null) { // Put new invoker in cache
                    //建立url和invoke对象的映射
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

4.RegistryDirecotry:建立方法和Invoker集合映射关系

private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
        Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
        // According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methods
        List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
        if (invokersMap != null && invokersMap.size() > 0) {
            //循环所有的注册的invoke
            for (Invoker<T> invoker : invokersMap.values()) {
                //获取到url中的方法名称
                String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
                if (parameter != null && parameter.length() > 0) {
                    String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
                    if (methods != null && methods.length > 0) {
                        //循环所有方法
                        for (String method : methods) {
                            if (method != null && method.length() > 0
                                    && !Constants.ANY_VALUE.equals(method)) {
                                //建立方法和invoke的list的映射关系
                                List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
                                if (methodInvokers == null) {
                                    methodInvokers = new ArrayList<Invoker<T>>();
                                    newMethodInvokerMap.put(method, methodInvokers);
                                }
                                methodInvokers.add(invoker);
                            }
                        }
                    }
                }
                invokersList.add(invoker);
            }
        }
        //根据路由规则进行过滤
        List<Invoker<T>> newInvokersList = route(invokersList, null);
        //设置*和服务列表的映射
        newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
        if (serviceMethods != null && serviceMethods.length > 0) {
            for (String method : serviceMethods) {
                List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
                if (methodInvokers == null || methodInvokers.isEmpty()) {
                    methodInvokers = newInvokersList;
                }
                newMethodInvokerMap.put(method, route(methodInvokers, method));
            }
        }
        // sort and unmodifiable
        for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
            List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
            //url的自然排序
            Collections.sort(methodInvokers, InvokerComparator.getComparator());
            newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
        }
        return Collections.unmodifiableMap(newMethodInvokerMap);
    }

总结:上面内容中,每个从业务流程和源码角度进行了详细分析,如果大家有疑问或者对文章排版任何方面有建议都可以留言评论,看到都会及时回复大家。

知识总结,分享不易,全文手敲,欢迎大家关注点赞评论收藏。

相关推荐

  1. Dubbo解读-Consumer消费服务列表刷新

    2024-04-20 13:24:02       28 阅读
  2. Dubbo解读-Mock原理和消费服务列表刷新

    2024-04-20 13:24:02       30 阅读
  3. Dubbo解读-Consumer调用流程解析

    2024-04-20 13:24:02       31 阅读
  4. Dubbo解析-Provider服务暴露Export解析

    2024-04-20 13:24:02       37 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-20 13:24:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-20 13:24:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-20 13:24:02       82 阅读
  4. Python语言-面向对象

    2024-04-20 13:24:02       91 阅读

热门阅读

  1. 【云原生数据库:原理与实践】2 -数据库与云原生

    2024-04-20 13:24:02       170 阅读
  2. Kafka顺序消费以及消息积压问题

    2024-04-20 13:24:02       120 阅读
  3. Rx.Net 第三章 linq介绍

    2024-04-20 13:24:02       36 阅读
  4. docker分layer的好处

    2024-04-20 13:24:02       33 阅读
  5. 营销场景的自动化建模思考

    2024-04-20 13:24:02       38 阅读