ubbo/org.apache.dubbo.demo.DemoService/configurators
/dubbo/org.apache.dubbo.demo.DemoService/routers
表示当前消费者(url参数代表)需要订阅这三个节点,当任意一个节点的数据发生变化时,ZooKeeper服务端都会通知到当前注册中心客户端,更新本地的服务数据。依次遍历这三个path路径,分别在这三个path上注册监听器。
ZooKeeperRegistry通过zkListeners这个Map维护了所有消费者(URL)的所有监听器数据,首先根据url参数获取当前消费者对应的监听器listeners,这是一个Map,key是NotifyListener,value是对应的ChildListener,如果不存在则需要创建ChildListener的实现RegistryChildListenerImpl。当创建完成后,会在ZooKeeper服务端创建持久化的path路径,并且在该path路径上注册监听器。首次订阅注册监听器时,会获取到该路径下的所有服务数据的字符串形式,并调用toUrlsWithEmpty转成URL形式,接着调用notify方法刷新消费者本地的服务相关数据。
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
// 获取三个路径, 分别是/providers、/routers、/configurators
for (String path : toCategoriesPath(url)) {
// 获取该消费者URL对应的监听器Map
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 查找NotifyListener对应的ChildListener是否存在, 不存在则创建
ChildListener zkListener = listeners.computeIfAbsent(listener,
k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// 在ZooKeeper服务端创建持久节点
zkClient.create(path, false);
// 添加ChildListener监听器, 并全量拉取下相关服务数据
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 将字符串形式的服务数据转换成URL列表数据
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 首次订阅后触发下notify逻辑, 刷新消费者本地的服务提供者列表等数据
notify(url, listener, urls);
} finally {
latch.countDown();
}
}
接下来看下AbstractZooKeeperClient的addChildListener方法,逻辑也比较简单,在指定path路径上添加一次性的Watcher。取消订阅的逻辑则是会将传入的UR和NotifyListener对应的ChildListener移除,不再监听。
public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners =
childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
TargetChildListener targetListener = listeners.computeIfAbsent(listener,
k -> createTargetChildListener(path, k));
return addTargetChildListener(path, targetListener);
}
@Override
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
try {
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
三、总结
本文通过分析Dubbo3.0中AbstractRegistry、FailbackRegistry、ZooKeeperRegistry,详细介绍了Dubbo中ZooKeeper注册中心的实现原理,包括服务数据本地缓存、服务注册订阅异常重试等。另外,网上大部分文章是基于2.x版本的,本文基于Dubbo3.0,重点分析了Dubbo3.0中新引入的CacheableFailbackRegistry,详细介绍了Dubbo在URL推送模型上所做的优化。
参考资料:
-
Dubbo3.0 阿里大规模实践解析-URL重构
-
深入浅出ZooKeeper