dubbo系列之集群容错-directory目录

dubbo系列之集群容错-directory目录

简介

Directory接口

目录服务
StaticDirectory:静态目录服务,他的Invoker是固定的。
RegistryDirectory:注册目录服务,他的Invoker集合数据来源于zk注册中心的。

StaticDirectory用的比较少,主要用在服务对多注册中心的引用,这篇博客我们重点就是来说一下RegistryDirectory。

源码构成

哒哒哒,老样子,先启动zk,再启动DemoProvider类服务端,最后debug模式启动DemoCousumer类。

demoService.sayHello(“world” + i)
–>InvokerInvocationHandler.invoke

1
2
3
4
5
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//....
//将所有请求参数都转换成RpcInvocation
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

MockClusterInvoker.invoke //1.进入集群

–>invoker.invoke(invocation)

-->AbstractClusterInvoker.invoke
    -->list(invocation)

​ –>directory.list//2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker
​ –>AbstractDirectory.list
​ –>doList(invocation)
​ –>RegistryDirectory.doList// 从this.methodInvokerMap里面查找一个Invoker

这样就进入了主角RegistryDirectory类。

RegistryDirectory类

先来看一张这个类的继承体系图

RegistryDirectory继承体系图

可以看到RegistryDirectory类实现了NotifyListener接口,并且实现回调notify(List urls), 整个过程有一个重要的map变量,methodInvokerMap(它是数据的来源;同时也是notify的重要操作对象,重点是写操作。)

dolist()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. 没有服务提供者 2. 服务提供者被禁用
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
}
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

dolist()这个方法就是从methodInvokerMap这个map里面获取数据

那methodInvokerMap这个map读是在dolist()里面做的,那又是什么时候将数据写进去的呢?是不是觉得这个话题很熟悉?在dubbo系列之服务引用-原理这篇博客里面我们有提到,在服务引用的订阅节点时,我们有进入到过RegistryDirectory.notify中的refreshInvoker这个方法,也就是在注册中心有变更的情况下,就会调用RegistryDirectory.notify 的refreshInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 根据invokerURL列表转换为invoker列表。转换规则如下:
* 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
* 2.如果传入的invoker列表不为空,则表示最新的invoker列表
* 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
*
* @param invokerUrls 传入的参数不能为null
*/
// TODO: 2017/8/31 FIXME 使用线程池去刷新地址,否则可能会导致任务堆积
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {
this.forbidden = false; // 允许访问
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
}
if (invokerUrls.size() == 0) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
// state change
//如果计算错误,则不进行处理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// 将值写入methodInvokerMap
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 关闭未使用的Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

也就是在refreshInvoker方法中,刷新methodInvokerMap对象。