dubbo系列之zookeeper订阅节点

dubbo系列之zookeeper订阅节点

源码构成

RegistryProtocol类

1
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);//订阅节点

FailbackRegistry类

subscribe()——向服务端发送订阅请求,及异常情况进行订阅重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void subscribe(URL url, NotifyListener listener) {
if (destroyed.get()){
return;
}
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;

List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}

// 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}
}

ZookeeperRegistry类

doSubscribe()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {//创建一个监听集合
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);//创建一个持久化节点
List<String> children = zkClient.addChildListener(path, zkListener);//为该节点设置监听
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
List children = zkClient.addChildListener(path, zkListener);//为该节点设置监听

–>AbstractZookeeperClient.addChildListener

   -->createTargetChildListener(path, listener)//收到订阅后的处理,交给FailbackRegistry.notify处理

​ –>ZkclientZookeeperClient.createTargetChildListener
​ –>new IZkChildListener()
​ –>实现了 handleChildChange //收到订阅后的处理
​ –>listener.childChanged(parentPath, currentChilds)

1
2
3
4
5
6
7
8
9
public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
return new IZkChildListener() {
//收到订阅后的处理
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
listener.childChanged(parentPath, currentChilds);
}
};
}

–>AbstractZookeeperClient.addChildListener

-->addTargetChildListener(path, targetListener);

——启动加入订阅/dubbo/com.alibaba.dubbo.demo.DemoService/configurators

1
2
3
public List<String> addTargetChildListener(String path, final IZkChildListener listener) {
return client.subscribeChildChanges(path, listener);
}
FailbackRegistry.notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// 将失败的通知请求记录到失败列表,定时重试
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
doNotify(url, listener, urls);
   -->AbstractRegistry.notify

​ –>saveProperties(url);——将服务端的注册信息更新到本地缓存文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void saveProperties(URL url) {
if (file == null) {
return;
}

try {
StringBuilder buf = new StringBuilder();
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
properties.setProperty(url.getServiceKey(), buf.toString());
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
doSaveProperties(version);
} else {
//采用线程池来处理
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}

进入线程池看一看

AbstractRegistry类

doSaveProperties()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
try {
newProperties.putAll(properties);
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
try {
FileChannel channel = raf.getChannel();
try {
// 加锁
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// 保存
try {
if (!file.exists()) {
file.createNewFile();
}
FileOutputStream outputFile = new FileOutputStream(file);
try {
newProperties.store(outputFile, "Dubbo Registry Cache");
} finally {
outputFile.close();
}
} finally {
lock.release();
}
} finally {
channel.close();
}
} finally {
raf.close();
}
}

配置文件更新完之后

RegistryProtocol类

—->OverrideListener

---->notify()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void notify(List<URL> urls) {
List<URL> result = null;
for (URL url : urls) {
URL overrideUrl = url;
if (url.getParameter(Constants.CATEGORY_KEY) == null
&& Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
// 兼容旧版本
overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
}
if (!UrlUtils.isMatch(subscribeUrl, overrideUrl)) {
if (result == null) {
result = new ArrayList<URL>(urls);
}
result.remove(url);
logger.warn("Subsribe category=configurator, but notifed non-configurator urls. may be registry bug. unexcepted url: " + url);
}
}
if (result != null) {
urls = result;
}
this.configurators = RegistryDirectory.toConfigurators(urls);
List<ExporterChangeableWrapper<?>> exporters = new ArrayList<ExporterChangeableWrapper<?>>(bounds.values());
for (ExporterChangeableWrapper<?> exporter : exporters) {
Invoker<?> invoker = exporter.getOriginInvoker();
final Invoker<?> originInvoker;
if (invoker instanceof InvokerDelegete) {
originInvoker = ((InvokerDelegete<?>) invoker).getInvoker();
} else {
originInvoker = invoker;
}

URL originUrl = RegistryProtocol.this.getProviderUrl(originInvoker);
//增加判断:只有 当前服务与override指定服务 匹配时,override才生效
if (urls != null && urls.size() > 0 && originUrl.getServiceKey().equals(urls.get(0).getServiceKey())) {
URL newUrl = getNewInvokerUrl(originUrl, urls);

if (!originUrl.equals(newUrl) || (this.configurators == null || this.configurators.size() == 0)) {
RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
}
}
}
}

对比新旧的信息是否有变化,有则重新暴露服务.

最后来一张zookeeper订阅节点流程图,看完就更清晰了

zookeeper订阅节点流程图