上篇我们介绍了Consumer消费端@Reference服务端引用流程,地址如下
Dubbo源码解读-Consumer消费端@Reference服务端引用流程-CSDN博客
本文主要针Dubbo消费端服务列表刷新原理,从dubbo源码角度进行解析。
大家可以好好仔细读一下本文。有疑问欢迎留言。
接着说明,读Dubbo源码最好是先对Spring源码有一定的了解。如果大家需要,我也可以针对Spring框架做一系列源码的解读专栏。
不过不用担心,如果需要Spring的源码知识,文章中也会进行Spring源码铺垫介绍的。
如果内容中有没描述清楚的,或者大家在阅读源代码有疑问的,欢迎留言,看到就会及时回复。
主要内容
- 知识铺垫
- 消费端服务列表刷
知识铺垫
消费端@Reference依赖注入服务端时,会注册providers,configurators,routers三个节点的监听事件,当服务列表节点发生变化时,注册中心就会通知监听器。监听器为RegistryDirectory类,实现了NotifyListener。具体细节可跳转到
Dubbo源码解读-Consumer消费端@Reference服务端引用流程-CSDN博客
消费端服务列表刷新
consummer端的notify订阅事件(服务列表刷新)。整个细节过程是比较复杂的。因为好多同学为了应付面试或者觉得细节流程太繁琐或者复杂,只想弄清楚相对简练的过程。所以对细节流程做了一个流程概述的部分,方便大家理解大概过程,以及应对面试。更详细的过程可关注详细流程。
流程概述
- RegistryDirectory.notify(List<URL> invokerUrls)监听三个节点providers,configurators,routers
- 根据Url中的目录(category),设置具体协议url
- invokerUrls:dubbo协议:providers目录
- routerUrls:router协议:routes目录
- configuratorsUrl:overide协议:configurators
- 处理协议:
- routerUrls:routes目录变动
- 将协议专为Router对象,ConditionRouter对象。持有whenCondition和thenCondition
- 集合添加MockInvokersSelector和TagRouter两个router
- configuratorUrls(动态修改配置信息):configurators节点变动
- urls为List<Configurator>对象
- 将override协议中的属性合并到overrideDirectoryUrl中。
- invokerUrls:(providers节点变动,比如服务上线下线),dubbo协议.【其实就是生成DubboInvoker,建立了两个映射关系】
- urlInvokerMap:建立url和invoker的映射关系
- 合并Url属性,消费端配置优先
- protocol.refer(serviceType, url):protocol执行链,最终DubboProtocol.refer()
- 创建NettyClient和handler连
- 创建DubboInvoker
- methodInvokerMap:建立服务中方法和invoker的映射关系
详细流程
- RegistryDirectory.notify(List<URL> invokerUrls)
- 根据Url中的目录(category),设置具体协议ur
- invokerUrls:dubbo协议:providers目录
- routerUrls:router协议:routes目录
- configuratorsUrl:overide协议:configurators
- 协议处理
- routerUrls(动态修改路由信息):router:协
- URls专为Router对象
- List<Router> routers = toRouters(routerUrls);
- 遍历Urls
- .route://0.0.0.0/com.xiangxue.jack.service.UserService?category=routers&dynamic=false&enabled=true&force=false&name=routeTest&priority=0&router=condition&rule=method+=+queryUser+=>+provider.cluster+=+failover+&+provider.host+=+192.168.90.126+&+provider.protocol+=+dubbo&runtime=false
- .获取router属性。值为condition
- 设置协议为condition
- routerFacotry获取Router对象
- 调用ConditionRouterFactory.getRouter(URL url)
- new ConditionRouter(url)
- 获取key=rule字符串:method+=+queryUser+=>+provider.cluster+=+failover+&+provider.host+=+192.168.90.126+&+provider.protocol+=+dubbo
- 解析whencondition:=>箭头之前
- 解析thencondition:=>箭头之后
- 返回ConditionRouter对象。持有whenCondition和thenCondition
- 集合添加MockInvokersSelector和TagRouter两个router
- URls专为Router对象
- configuratorUrls(动态修改配置信息):之前provider端notify有详细介绍
- 转urls为List<Configurator>对象
- 将override协议中的属性合并到overrideDirectoryUrl中。
- invokerUrls(动态修改providers节点,比如服务上线下线):url为dubbo协议,处理Dubbo协议
- refreshInvoker(invokerUrls):动态修改providers节点,比如服务上线下线
- 如果只有一个empty协议。消费端启动的时候,先会是empty协议清空服务列表.forbid设置为true.
- 否则:
- forbiden设置为false.
- urlInvokerMap:建立url和invoker的映射关系
- Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
- 遍历所有invokerUrls。最终每一个invokerUrl都会对应一个Invoker对象
- 根据协议过滤invokerUrls
- 合并消费端配置属性,消费端配置优先:URL url = mergeUrl(providerUrl);
- 生成缓存key,String key = url.toFullString()
- protocol.refer(serviceType, url),url为Dubbo协议:protocol.refer(serviceType, url)。调用protocol包装类QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->DubboProtocol
- DubboProtocol:
- 获取ExchangeClient对象getClients(url)。
- 获取connections配置参数,配置客户端跟服务端建立几个长连接
- 初始化长链接
- 绑定 requestHandler,是最后调用的handler
- 创建DubboInvoker对象,并返。
- 获取ExchangeClient对象getClients(url)。
- ProtocolListenerwrapper:包装ListenerInvokerWrapper,持有Invoker和List<InvokerListener>列表
- ProtocolFilterWrapper:根据Filter构建invoker执行链
- DubboProtocol:
- 创建InvokerDelegate,持有ListenerInvokerWrapper,ProviderUrl和合并消费端属性的url
- invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
- 建立url和invoke对象的映射:newUrlInvokerMap.put(key, invoker)
- 返回映射newUrlInvokerMap
- methodInvokerMap:建立服务中方法和invoke的映射关系
- Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
- 循环所有的注册的invoke
- 循环所有方法,建立方法和invoke的list的映射关系
- 设置*和服务列表的映射
- 遍历所有方法,根据路由规则获取匹配的Invokers集合,重新放入映射关系中
- route(methodInvokers, method)
- 首先经过方法匹配:whencondition匹配
- 然后其他条件匹配:thenCondition匹配
- method对应的invokers排序。
- 返回映射关系
- 删除不可用的invoker
- 【总结】
- 如果是消费端先启动,此时providers下无节点,接受empty协议。否则接受providers节点下所有注册的dubbo协议,即提供者节点
- empty协议,设置forbiden为true
- List<invokerUrls:设置forbiden为false.
- 遍历List<invokerUrls>,将URL专为Invoker。每个Url节点都会对应一个Invoker对象InvokerDelegate
- Protocol链路调用。
- DubboProtocol:创建长链接,生成DubboInvoker
- ProtocolFilterWrapper:根据Filter创建Invoker执行链
- ProtocolListenerwrapper:包装ListenerInvokerWrapper
- urlInvokerMap:建立url和invoke的映射关系,toInvokers()
- methodInvokerMap:建立服务中方法和invoke的映射关系,并进行router过滤,toMethodInvokers()
- 删除不可用的invoker。老的oldUrlInvokerMap中多余的invoker
- routerUrls(动态修改路由信息):router:协
源码解析
1.RegistryDirectory.Notify:接受注册中心通知处理三个协议
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
//动态修改配置信息,比如在dubbo-admin中修改配置属性,会更新本地配置列表
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
//动态修改路由信息
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
//动态修改providers节点,比如服务上线下线
// providers
refreshInvoker(invokerUrls);
}
2.RegistryDirectory.refrreshInvoker。服务列表刷新缓存
private void refreshInvoker(List<URL> invokerUrls) {
//消费端启动的时候,先会是empty协议清空服务列表
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
//这里是建立url和invoke的映射关系
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
//这里是建立服务中方法和invoke的映射关系
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
//删除不可用的invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
3.RegistryDirecotry:建立Url和Invoker映射关系
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//属性合并,消费端优先
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
//这里会掉到DubboProtocol
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
}
if (invoker != null) { // Put new invoker in cache
//建立url和invoke对象的映射
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
4.RegistryDirecotry:建立方法和Invoker集合映射关系
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
// According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methods
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if (invokersMap != null && invokersMap.size() > 0) {
//循环所有的注册的invoke
for (Invoker<T> invoker : invokersMap.values()) {
//获取到url中的方法名称
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
if (parameter != null && parameter.length() > 0) {
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if (methods != null && methods.length > 0) {
//循环所有方法
for (String method : methods) {
if (method != null && method.length() > 0
&& !Constants.ANY_VALUE.equals(method)) {
//建立方法和invoke的list的映射关系
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = new ArrayList<Invoker<T>>();
newMethodInvokerMap.put(method, methodInvokers);
}
methodInvokers.add(invoker);
}
}
}
}
invokersList.add(invoker);
}
}
//根据路由规则进行过滤
List<Invoker<T>> newInvokersList = route(invokersList, null);
//设置*和服务列表的映射
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.isEmpty()) {
methodInvokers = newInvokersList;
}
newMethodInvokerMap.put(method, route(methodInvokers, method));
}
}
// sort and unmodifiable
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
//url的自然排序
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}
总结:上面内容中,每个从业务流程和源码角度进行了详细分析,如果大家有疑问或者对文章排版任何方面有建议都可以留言评论,看到都会及时回复大家。
知识总结,分享不易,全文手敲,欢迎大家关注点赞评论收藏。