Dubbo3.0之注册中心多层类体系源码分析
yuyutoo 2024-10-11 21:39 1 浏览 0 评论
上一篇:dubbo 3.0服务调用者调用第三方接口源码主流程分析
一、Zookeeper简介
Zookeeper 是 Apache Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,并推荐使用。
流程说明:
- 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
- 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
- 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。
二、注册中心体系流程图
三、ZK注册中心ZookeeperRegistry初始化跟ZK集群建立连接源码分析
在ZookeeperRegistry类初始化的时候最重要的就是与zk集群建立连接,加一个一个状态的监听器,我们先不管其余的流程,避免被其他不必要的细节给影响;
/**
* ZookeeperRegistry
* 基于zookeeper的注册中心组件
* ZookeeperRegistry继承至CacheableFailbackRegistry继承至FailbackRegistry继承至AbstractRegistry
* 这个就是dubbo注册中心里的三级类体系结构设计(zk/nacos具体的注册中心技术->Cacheable缓存层->)
*/
public class ZookeeperRegistry extends CacheableFailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
private final static String DEFAULT_ROOT = "dubbo";
/** zk里面的跟目录,/打头的第一级目录,默认:/dubbo */
private final String root;
/** 保存对应的服务的String字符串的信息 */
private final Set<String> anyServices = new ConcurrentHashSet<>();
/**
* 用来保存订阅时候施加的一个监听器,这个服务可能会有很多人施加监听
* map里会有2个Listener:notify(dubbo里自己实现的监听器)、child(针对zk子节点施加的监听器)
*/
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
/** 跟zk建立网络连接的客户端 */
private ZookeeperClient zkClient;
/**
* ZookeeperRegistry构建的时候比较核心的就是去跟zk建立连接
* @param url zk的连接地址,如:zookeeper://localhost:2181
* @param zookeeperTransporter
*/
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
/** 父类构造函数 */
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getGroup(DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
/** 基于zk的api去构建跟zk之间的连接 */
zkClient = zookeeperTransporter.connect(url);
/** 加一个状态的监听器 */
zkClient.addStateListener((state) -> {
/** 连接突然断开后很快进行了重新连接操作,此时状态就为RECONNECTED */
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
/**
* 重新连接之后会尝试重新拉取之前订阅过的provider服务实例最新的集群地址
* 所以此时客服端如果有短暂连接再重连的状态,必须要去重新拉取一下最新的地址
* 因为只是暂时断开连接,所以之前发起的subsrbie订阅请求创建的临时节点
* 在zk端如果只是临时的网络断开,所以zk也是不会删除的你的临时节点的
*/
ZookeeperRegistry.this.fetchLatestAddresses();
}
else if (state == StateListener.NEW_SESSION_CREATED) {
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
/**
* 如果经历了session断开以及会话重新连接的过程,新建立了一个会话的话
* 需要重新进行注册和订阅的监听
*/
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
/**
* 如果连接断开时间超过一定阈值,session expire过期时间
* 会话一旦过期了,此时zk服务端就会把我们客户端之前注册创建的临时节点删除,同时施加的订阅监听也会删除
* 客户端收到的状态变更就是SESSION_LOST
*/
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
}
AbstractZookeeperTransporter的connect方法,这个方法主要是判断一下缓存中是否有对应数据,如果有则返回,如果没有则调用createZookeeperClient方法进行构建;
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
// address format: {[username:password@]address}
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
/** 先从缓存中获取有效的zk客户端,如果存在则直接返回 */
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
/** 再次从缓存中获取有效的zk客户端,如果存在则直接返回 */
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
/** 构建ZookeeperClient */
zookeeperClient = createZookeeperClient(url);
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
在调用Curator5ZookeeperTransporter的createZookeeperClient方法,构造Curator5ZookeeperClient;
public ZookeeperClient createZookeeperClient(URL url) {
return new Curator5ZookeeperClient(url);
}
Curator5ZookeeperClient的构造方法会基于Curator框架去构建ZookeeperClient与ZK集群进行连接,阻塞等待,如果成功则返回,如果没有成功则抛异常;
public Curator5ZookeeperClient(URL url) {
super(url);
try {
/** 超时时间,默认:30秒 */
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
/** session过期时间,默认:60秒,如果zk客户端和zk服务端断开连接超过60秒则会话过期 */
int sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS);
/**
* 基于Curator框架去构建ZookeeperClient
* Curator目前是zk在使用中最为常用的框架,对原生zk的client做了一层包装
*/
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
/** 设置zk的url地址 */
.connectString(url.getBackupAddress())
/** 如果失败了间隔一秒钟重试一次 */
.retryPolicy(new RetryNTimes(1, 1000))
/** 设置连接超时时间,如果尝试连接超过30秒都没有连接上则会进行重试 */
.connectionTimeoutMs(timeout)
/** 设置session会话过期时间,断开连接超过一分钟则会话过期 */
.sessionTimeoutMs(sessionExpireMs);
String userInformation = url.getUserInformation();
/** 权限控制 */
if (userInformation != null && userInformation.length() > 0) {
builder = builder.authorization("digest", userInformation.getBytes());
}
/** 构建client */
client = builder.build();
/**
* 添加监听器
* 跟zk的连接建立了之后,一般来说得关注一下跟这个zk之间的连接
* 如果跟zk的连接有断开此时是会通知的
*/
client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
/** 开始建立连接 */
client.start();
/** 阻塞住,置到跟zk成功建立连接,超时时间:30秒 */
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
/** 如果没建立成功则抛异常 */
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
如果zk集群异常断开了怎么办呢?
此时在ZookeeperRegistry建立连接时施加的状态监听器感知到了会调用FailbackRegistry的recover方法重新进行注册和订阅发现;
protected void recover() throws Exception {
// register
/**
* 因为会话是重新建立起来的,会话一旦断了,我们在zk端存储的provider/consumer的服务实例url节点就没有了
* 我们之前施加的监听器也没有了,此时就必须重新去进行注册和订阅发现
*/
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
// remove fail registry or unRegistry task first.
removeFailedRegistered(url);
removeFailedUnregistered(url);
addFailedRegistered(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// First remove other tasks to ensure that addFailedSubscribed can succeed.
removeFailedSubscribed(url, listener);
addFailedSubscribed(url, listener);
}
}
}
}
/** 将重试任务放入时间轮进行定时重试 */
private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
return;
}
/** 创建定时重试任务 */
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
/** 将定时重试任务放入时间轮定时5秒后进行重试 */
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
/** 将重新订阅任务放入时间轮进行定时重试 */
protected void addFailedSubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedSubscribedTask oldOne = failedSubscribed.get(h);
if (oldOne != null) {
return;
}
FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
oldOne = failedSubscribed.putIfAbsent(h, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
四、ZK注册中心注册服务实例流程
FailbackRegistry的register方法为服务实例注册的入口方法,我们一起探寻一下其中的奥秘;
我将FailbackRegistry的初始化方法和register一起贴出来了,主要是想让大家明白FailbackRegistry内部的一些数据结构,以及初始化的时候做了什么;
里面核心方法就是调用doRegister进行ZK临时节点的创建,如果异常了会调用addFailedRegistered方法将异常信息封装成FailedRegisteredTask重试任务,在将定时重试任务放入时间轮定时5秒后进行重试;
public abstract class FailbackRegistry extends AbstractRegistry {
/* retry task map */
/**
* url代表一个provider或者是一个consumer
* 针对某个provider或者consumer在执行register的时候失败了此时就会用url->task的方式放到map里来缓存
* 针对unregistered、subscribed、unsubscribed,同样会有一些url失败时候的任务缓存
*/
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
/**
* The time in milliseconds the retryExecutor will wait
* 多长时间重试一次
*/
private final int retryPeriod;
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
/**
* 时间轮的算法和机制,可以支持大量的定时任务放到时间轮里去,时间轮就一直跟着时间在转
* 转到指定的时间的时候此时就会执行你的定时任务
*/
private final HashedWheelTimer retryTimer;
public FailbackRegistry(URL url) {
/** 父类AbstractRegistry的构造函数,从磁盘文件里加载缓存数据 */
super(url);
/** 获取retryPeriod重试时间周期,默认:5秒 */
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
// since the retry task will not be very much. 128 ticks is enough.
/** 根据重试时间周期构建hash时间轮 */
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
/** 注册入口方法 */
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
return;
}
/** 调用父类AbstractRegistry的register方法执行注册操作 */
super.register(url);
/** 将这个url已经失败的注册和取消注册的任务全部进行清理 */
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
/** 这里会向zk服务端发送请求进行注册 */
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
/** 获取一个check参数 */
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !(url.getPort() == 0);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
/** 执行failback机制,先保存故障信息,在后续定时重试 */
addFailedRegistered(url);
}
}
private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
return;
}
/** 创建定时重试任务 */
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
/** 将定时重试任务放入时间轮定时5秒后进行重试 */
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
}
重试任务,里面其实就是再次调用ZookeeperRegistry的doRegister方法进行注册;
public final class FailedRegisteredTask extends AbstractRetryTask {
private static final String NAME = "retry register";
public FailedRegisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}
@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
/** 定时重试:会委托ZookeeperRegistry去完成注册任务 */
registry.doRegister(url);
/** 重试成功之后删除对应的重试任务 */
registry.removeFailedRegisteredTask(url);
}
}
其中会调用ZookeeperRegistry的doRegister方法进行实际的ZK节点创建,创建一个临时节点;
public void doRegister(URL url) {
try {
checkDestroyed();
/** 创建zk节点 */
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
调用AbstractZookeeperClient的create方法根据是否为临时节点走不同的逻辑;
public void create(String path, boolean ephemeral) {
/** 是否是临时节点 */
if (!ephemeral) {
/** 不是则判断之前是否有创建,有则返回 */
if (persistentExistNodePath.contains(path)) {
return;
}
/** 判断路径是否存在,如果存在则放入persistentExistNodePath返回 */
if (checkExists(path)) {
persistentExistNodePath.add(path);
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
/** 创建 */
create(path.substring(0, i), false);
}
/** 判断是否临时节点 */
if (ephemeral) {
createEphemeral(path);
} else {
/** 不是临时节点创建成功之后放入persistentExistNodePath */
createPersistent(path);
persistentExistNodePath.add(path);
}
}
调用Curator5ZookeeperClient的createEphemeral方法通过Curator框架去实际去ZK集群交互创建临时节点;
public void createEphemeral(String path) {
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
}
/** node已经存在,你注册的服务实例之前已经注册过了,不能重复注册 */
catch (NodeExistsException e) {
logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
" may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
"we can just try to delete and create again.", e);
/** 如果重复注册了则先删除znode,再重新创建一个临时节点 */
deletePath(path);
/** 重新调用本方法,再次尝试创建节点 */
createEphemeral(path);
} catch (Exception e) {
/** 创建过程中,如果和zk链接有问题或者说zk自己有问题,则抛出异常 */
throw new IllegalStateException(e.getMessage(), e);
}
}
五、ZK注册中心订阅发现服务实例流程
调用FailbackRegistry的subscribe方法进行服务实例订阅发现
1、先调用父类AbstractRegistry的subscribe方法在监听器映射集合根据url加一个监听器进去;
2、将订阅重试任务删除,因为调用其方法的可能是重试任务来调用的,防止重复订阅,先将重试任务删除;
3、调用ZookeeperRegistry的doSubscribe方法进行实际的注册逻辑,这里会通过CacheableFailbackRegistry的toUrlsWithEmpty方法将订阅时获取的url字符串都转换为一个URL对象;我们上面的架构图中说这个类会缓存一些信息,就包括URL对象数据;
4、如果异常了,调用addFailedSubscribed方法将失败信息封装成FailedSubscribedTask订阅失败重试任务,隔5秒后进行重试;
public void subscribe(URL url, NotifyListener listener) {
/** 先调用抽象父类AbstractRegistry的subscribe方法进行订阅处理 */
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getCacheFile().getName() + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
/** 封装成重试任务,隔5秒进行重试 */
addFailedSubscribed(url, listener);
}
}
AbstractRegistry的subscribe方法,主要实现了在监听器映射集合根据url加一个监听器进去;
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
/** 根据我们要订阅的url服务接口地址,获取subscribed集合的监听器集合 */
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
/**
* 订阅和监听的时候是需要加一个监听器的,此时将监听器加入这个set集合中
* NotifyListener不是zk里面的监听器,zk的监听器一般是child/watcher listener
*/
listeners.add(listener);
}
我们先来看一下FailbackRegistry对订阅失败是如何处理的,如果捕捉到异常,则会调用addFailedSubscribed方法将url包装成一个重试任务,5秒之后进行重新订阅;
protected void addFailedSubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedSubscribedTask oldOne = failedSubscribed.get(h);
if (oldOne != null) {
return;
}
FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
oldOne = failedSubscribed.putIfAbsent(h, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
/** 放入时间轮,5秒之后进行重试 */
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
订阅失败重试任务,时间轮到时间会进行执行
public final class FailedSubscribedTask extends AbstractRetryTask {
private static final String NAME = "retry subscribe";
private final NotifyListener listener;
public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}
@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
/** 调用ZookeeperRegistry的doSubscribe进行订阅重试 */
registry.doSubscribe(url, listener);
/** 如果成功了,将订阅失败重试任务从数据结构中删除 */
registry.removeFailedSubscribedTask(url, listener);
}
}
刚刚先讲解了如果订阅失败会怎么处理,因为订阅逻辑还是比较复杂的,所以放在了最后,调用ZookeeperRegistry的doSubscribe方法来做具体的订阅逻辑
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
/** 判断当前这个url的服务接口是否为*,一般不可能 */
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
boolean check = url.getParameter(CHECK_KEY, false);
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(check)), k);
}
}
});
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(check)), listener);
}
}
} else {
/** 正常情况来说都是针对指定的接口去进行监听 */
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
/**
* 默认情况下就一个path,/dubbo/服务接口(如:org.apache.dubbo.demo.DemoService)/providers,
* 针对这个路径和地址去执行订阅和监听
* 先拿到这个地址下面所有的子节点列表,,这样就可以拿到这个服务当前所有的providers实例地址
* 再去监听你的providers下的子节点地址列表,如果有变动就可以notify通知我们
*/
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
/**
* 构建一个子监听器,如果监听的节点有变化,就会回调给RegistryChildListenerImpl的notify方法进行处理
* 而notify方法里面其实会调用doNotify方法,这个和下面的notify方法里面调用的其实是一致的
* 说明施加完监听之后获取到了目标服务实例地址之后会调用doNotify方法刷新注册表缓存
* 如果zk节点有变动了也会回调进行处理,实际上也会通过doNotify方法刷新注册表缓存
*/
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// 判断/dubbo/服务接口/providers是否存在,不存在则创建节点,存在则直接返回
zkClient.create(path, false);
// 对节点施加监听,施加监听成功之后会返回children
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
/**
* 每个子节点其实就是一个url字符串,每个url字符串就代表了一个provider服务实例
* 此时就需要将url字符串都转换为一个URL对象
*/
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
/**
* url:zk地址
* listener:订阅时传入进来的监听器
* urls:订阅时发现的服务接口的providers集群地址
*/
notify(url, listener, urls);
} finally {
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
调用AbstractZookeeperClient的addChildListener方法给指定路径施加监听器,添加成功之后会返回所有的子节点列表,这样就可以拿到这个服务当前所有的providers实例地址
public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
/** 创建CuratorWatcherImpl */
TargetChildListener targetListener = listeners.computeIfAbsent(listener, k -> createTargetChildListener(path, k));
return addTargetChildListener(path, targetListener);
}
调用Curator5ZookeeperClient的addTargetChildListener方法通过zk客户端去和zk集群通信去施加监听,并返回其子节点
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
try {
/** 针对path施加监听 */
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
我们获取的子节点只是一个url字符串,这个还是需要在进行一层转换的,转换成URL对象,这个是通过CacheableFailbackRegistry的toUrlsWithEmpty来完成的;
protected List<URL> toUrlsWithEmpty(URL consumer, String path, Collection<String> providers) {
List<URL> urls = new ArrayList<>(1);
boolean isProviderPath = path.endsWith(PROVIDERS_CATEGORY);
if (isProviderPath) {
if (CollectionUtils.isNotEmpty(providers)) {
/** 我们一般是对providers进行订阅请求,所以我们看这个逻辑就好了 */
urls = toUrlsWithoutEmpty(consumer, providers);
} else {
// clear cache on empty notification: unsubscribe or provider offline
evictURLCache(consumer);
}
} else {
if (CollectionUtils.isNotEmpty(providers)) {
urls = toConfiguratorsWithoutEmpty(consumer, providers);
}
}
if (urls.isEmpty()) {
int i = path.lastIndexOf(PATH_SEPARATOR);
String category = i < 0 ? path : path.substring(i + 1);
if (!PROVIDERS_CATEGORY.equals(category) || !getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, true)) {
if (PROVIDERS_CATEGORY.equals(category)) {
logger.warn("Service " + consumer.getServiceKey() + " received empty address list and empty protection is disabled, will clear current available addresses");
}
URL empty = URLBuilder.from(consumer)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, category)
.build();
urls.add(empty);
}
}
return urls;
}
toUrlsWithoutEmpty方法会去判断一下consumer是否之前缓存过URL对象集合,如果没有则直接创建,如果有的话则会去走另外的一个逻辑,将之前存在的provider对应的URL对象获取出来放入URL对象集合;
如果没有则进行构建,因为有可能后面又新增了机器,之前的URL集合中没有这个;
再将之前的URL集合从缓存中删除,将新构建的URL集合放入缓存,下次使用;
最后返回URL对象集合;
protected List<URL> toUrlsWithoutEmpty(URL consumer, Collection<String> providers) {
// keep old urls
Map<String, ServiceAddressURL> oldURLs = stringUrls.get(consumer);
// create new urls
Map<String, ServiceAddressURL> newURLs;
/** 将一些参数移除 */
URL copyOfConsumer = removeParamsFromConsumer(consumer);
/** 判断之前是否有对应的URL对象集合 */
if (oldURLs == null) {
newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
for (String rawProvider : providers) {
rawProvider = stripOffVariableKeys(rawProvider);
ServiceAddressURL cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
if (cachedURL == null) {
logger.warn("Invalid address, failed to parse into URL " + rawProvider);
continue;
}
newURLs.put(rawProvider, cachedURL);
}
} else {
newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
// maybe only default , or "env" + default
for (String rawProvider : providers) {
rawProvider = stripOffVariableKeys(rawProvider);
/** providers地址有变化,重新对raw url -> URL映射做一个处理,如果不存在则创建ServiceAddressURL */
ServiceAddressURL cachedURL = oldURLs.remove(rawProvider);
if (cachedURL == null) {
cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
if (cachedURL == null) {
logger.warn("Invalid address, failed to parse into URL " + rawProvider);
continue;
}
}
/**
* 如果ServiceAddressURL不为空则直接复用
* 就是将你的raw url -> URL之间做一个缓存,下次有变动的要重新计算直接使用缓存数据就好
* 但是如果有新的provider raw url,此时就不能使用缓存了,就需要重新创建一个ServiceAddressURL
*/
newURLs.put(rawProvider, cachedURL);
}
}
/**
* 当你的consumer对同样服务的provider重新进行订阅和发现的时候
* 如果有的raw url->URL有缓存则直接使用缓存
* 如果没有才会根据raw url重新构建一个ServiceAddressURL对象
* 在保存之前需要先删除老缓存的数据
*/
evictURLCache(consumer);
/** 清理完老缓存之后,再去添加新的缓存 */
stringUrls.put(consumer, newURLs);
return new ArrayList<>(newURLs.values());
}
/** 创建ServiceAddressURL对象 */
protected ServiceAddressURL createURL(String rawProvider, URL consumerURL, Map<String, String> extraParameters) {
boolean encoded = true;
// use encoded value directly to avoid URLDecoder.decode allocation.
int paramStartIdx = rawProvider.indexOf(ENCODED_QUESTION_MARK);
if (paramStartIdx == -1) {// if ENCODED_QUESTION_MARK does not show, mark as not encoded.
encoded = false;
}
String[] parts = URLStrParser.parseRawURLToArrays(rawProvider, paramStartIdx);
if (parts.length <= 1) {
logger.warn("Received url without any parameters " + rawProvider);
return DubboServiceAddressURL.valueOf(rawProvider, consumerURL);
}
String rawAddress = parts[0];
String rawParams = parts[1];
boolean isEncoded = encoded;
/** 每次在创建URL的时候,URLAddress和URLParam如果能复用就进行复用,如果没有则进行创建 */
URLAddress address = stringAddress.computeIfAbsent(rawAddress, k -> URLAddress.parse(k, getDefaultURLProtocol(), isEncoded));
address.setTimestamp(System.currentTimeMillis());
URLParam param = stringParam.computeIfAbsent(rawParams, k -> URLParam.parse(k, isEncoded, extraParameters));
param.setTimestamp(System.currentTimeMillis());
ServiceAddressURL cachedURL = createServiceURL(address, param, consumerURL);
if (isMatch(consumerURL, cachedURL)) {
return cachedURL;
}
return null;
}
/**
* 处理url时,老的url如果存在相应的数据的话,需要先将老缓存清理
* @param url
*/
protected void evictURLCache(URL url) {
/** 删除老缓存的数据 */
Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url);
try {
/** 如果老缓存的数据不为空 */
if (oldURLs != null && oldURLs.size() > 0) {
logger.info("Evicting urls for service " + url.getServiceKey() + ", size " + oldURLs.size());
Long currentTimestamp = System.currentTimeMillis();
/** 将待清理的URL对象放入等待清理的数据结构里面去 */
for (Map.Entry<String, ServiceAddressURL> entry : oldURLs.entrySet()) {
waitForRemove.put(entry.getValue(), currentTimestamp);
}
if (CollectionUtils.isNotEmptyMap(waitForRemove)) {
if (semaphore.tryAcquire()) {
/** 提交一个删除任务,间隔2分钟 */
cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS);
}
}
}
} catch (Exception e) {
logger.warn("Failed to evict url for " + url.getServiceKey(), e);
}
}
private class RemovalTask implements Runnable {
@Override
public void run() {
logger.info("Clearing cached URLs, waiting to clear size " + waitForRemove.size());
int clearCount = 0;
try {
Iterator<Map.Entry<ServiceAddressURL, Long>> it = waitForRemove.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ServiceAddressURL, Long> entry = it.next();
ServiceAddressURL removeURL = entry.getKey();
/** 之前放进去的时间戳 */
long removeTime = entry.getValue();
/** 当前时间戳 */
long current = System.currentTimeMillis();
/** 判断缓存是否超过5分钟 */
if (current - removeTime >= cacheClearWaitingThresholdInMillis) {
/** 从各个数据结构中进行清理 */
URLAddress urlAddress = removeURL.getUrlAddress();
URLParam urlParam = removeURL.getUrlParam();
if (current - urlAddress.getTimestamp() >= cacheClearWaitingThresholdInMillis) {
stringAddress.remove(urlAddress.getRawAddress());
}
if (current - urlParam.getTimestamp() >= cacheClearWaitingThresholdInMillis) {
stringParam.remove(urlParam.getRawParam());
}
it.remove();
clearCount++;
}
}
} catch (Throwable t) {
logger.error("Error occurred when clearing cached URLs", t);
} finally {
semaphore.release();
}
logger.info("Clear cached URLs, size " + clearCount);
/**
* 判断等待删除的这个map里面是否还有数据,因为有可能有部分数据缓存时间还没有超过5分钟
* 此时就需要再次提交一个缓存清理任务
*/
if (CollectionUtils.isNotEmptyMap(waitForRemove)) {
// move to next schedule
if (semaphore.tryAcquire()) {
cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS);
}
}
}
}
我们获取到URL对象集合后再调用FailbackRegistry的notify方法,这里其实只是做了一些空值判断就通过doNotify方法调用其父类AbstractRegistry的notify方法
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
AbstractRegistry的notify方法里面比较重要的有2点
1、调用RegistryDirectory的notify方法进行Netty网络连接;
2、调用saveProperties方法进行本地磁盘文件缓存更新,就如之前的图解,AbstractRegistry这个类核心职责就是持久化了注册中心的数据,后续异常重启时不至于数据全部丢失;
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
}
// keep every provider's category.
/** key是分类 value是分类对应的provider url列表 */
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
/** 获取url的分类,放入不同的分类list */
String category = u.getCategory(DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
/** 迭代处理 */
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
/** 当entry为providers时会调用RegistryDirectory的notify方法进行Netty网络连接 */
listener.notify(categoryList);
// We will update our cache file after each notification.
/** 我们将在每次通知后更新缓存文件。 */
// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.
/** 当我们的注册表由于网络抖动导致订阅失败时,我们至少可以返回现有的缓存URL。 */
if (localCacheEnabled) {
/** 更新本地缓存文件 */
saveProperties(url);
}
}
}
第一点:我们先来看看怎么进行Netty网络连接的,先看一下RegistryDirectory的notify方法,其余的前置处理细节暂时不要管,先跑主流程,在看refreshOverrideAndInvoker方法
public synchronized void notify(List<URL> urls) {
if (isDestroyed()) {
return;
}
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
// 3.x added for extend URL address
ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs);
}
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
refreshInvoker(urls);
}
比较关键的是这一行代码,会根据URL构建Invoker,在构建Invoker的时候就会进行Netty网络连接,会比较深,我就直接截图了
toInvokers方法会在调用ProtocolListenerWrapper的refer方法
refer方法里面又会去调用DubboProtocol的refer方法
DubboProtocol的refer方法最终会调用protocolBindingRefer方法里去,而这个里面最关键的就是getClients,这个就是进行网络连接的代码
其实getSharedClient方法最终都会调用到initClient方法里面去,我这边就不绕圈子了,如果你们想看可以自己点进去看看
我们经过很多层的跳转,最后会在AbstractClient进行实际的发起网络连接请求
初始化参数
建立连接
第二点:调用AbstractRegistry的saveProperties方法持久化数据
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
properties.setProperty(url.getServiceKey(), buf.toString());
long version = lastCacheChanged.incrementAndGet();
/**
* 判断是否异步化刷盘,默认一般都是异步刷盘
* 其实异步刷盘也很简单,就是开一个线程,通过线程池去执行去调用doSaveProperties方法
*/
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
SaveProperties线程类,其实他的run方法就直接调用了doSaveProperties方法
private class SaveProperties implements Runnable {
private long version;
private SaveProperties(long version) {
this.version = version;
}
@Override
public void run() {
doSaveProperties(version);
}
}
通过磁盘文件锁进行加锁,防止并发问题,在进行文件写入
public void doSaveProperties(long version) {
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
File lockfile = null;
try {
/** 在发起对本地磁盘io操作之前先建立一个磁盘文件锁 */
lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
/** 针对锁文件构建一个RandomAccessFile */
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
/** 针对锁文件进行Lock加锁,如果其他线程也执行到了这里会被block住,因为锁只能被一个线程去加 */
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
/** 基于jdk提供的api进行文件io操作 */
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
/** 锁释放 */
lock.release();
}
}
} catch (Throwable e) {
savePropertiesRetryTimes.incrementAndGet();
/** 重试次数是否小于3 */
if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
savePropertiesRetryTimes.set(0);
return;
}
if (version < lastCacheChanged.get()) {
savePropertiesRetryTimes.set(0);
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
} finally {
if (lockfile != null) {
lockfile.delete();
}
}
}
我们订阅成功了之后,如果zk节点发生变化了我们又怎么感知和刷新我们的数据呢?
如果zk数据发生变化了会回调我们施加的监听器,调用监听器的childChanged方法,这个方法会在调用监听器的内部RegistryNotifier的notify方法,这个方法最终会调用到ZookeeperRegistry的notify方法里去,这个方法就是我们上面分析的,一模一样;
private class RegistryChildListenerImpl implements ChildListener {
private RegistryNotifier notifier;
private long lastExecuteTime;
private volatile CountDownLatch latch;
public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
this.latch = latch;
notifier = new RegistryNotifier(getUrl(), ZookeeperRegistry.this.getDelay()) {
@Override
public void notify(Object rawAddresses) {
long delayTime = getDelayTime();
if (delayTime <= 0) {
this.doNotify(rawAddresses);
} else {
long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
if (interval > 0) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// ignore
}
}
lastExecuteTime = System.currentTimeMillis();
this.doNotify(rawAddresses);
}
}
@Override
protected void doNotify(Object rawAddresses) {
ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
}
};
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void childChanged(String path, List<String> children) {
try {
latch.await();
} catch (InterruptedException e) {
logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
}
notifier.notify(children);
}
}
六、ZK注册中心销毁
ZookeeperRegistry的destroy方法
public void destroy() {
super.destroy();
// Just release zkClient reference, but can not close zk client here for zk client is shared somewhere else.
// See org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
zkClient = null;
}
FailbackRegistry的destroy方法
public void destroy() {
super.destroy();
retryTimer.stop();
}
AbstractRegistry的destroy方法
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
/** 获取所有已注册的URL集合 */
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(destroyRegistered)) {
/** 循环已注册的URL集合,获取dynamic参数 */
if (url.getParameter(DYNAMIC_KEY, true)) {
try {
/** 取消注册 */
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
/** 获取所有已订阅的URL 及 监听器映射集合 */
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
/** 取消订阅 */
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
/** 移除当前已销毁的注册中心 */
registryManager.removeDestroyedRegistry(this);
}
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
/** 从subscribed集合中获取指定url对应的监听集合 */
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
/** 移除指定监听listener */
listeners.remove(listener);
}
// do not forget remove notified
/** 从通知集合中移除指定url */
notified.remove(url);
}
七、总结
Dubbo的注册中心分层类体系,每一层所关注的功能点和职责都不一样,是类结构划分更清晰,复用性更好;
相关推荐
- jQuery VS AngularJS 你更钟爱哪个?
-
在这一次的Web开发教程中,我会尽力解答有关于jQuery和AngularJS的两个非常常见的问题,即jQuery和AngularJS之间的区别是什么?也就是说jQueryVSAngularJS?...
- Jquery实时校验,指定长度的「负小数」,小数位未满末尾补0
-
在可以输入【负小数】的输入框获取到焦点时,移除千位分隔符,在输入数据时,实时校验输入内容是否正确,失去焦点后,添加千位分隔符格式化数字。同时小数位未满时末尾补0。HTML代码...
- 如何在pbootCMS前台调用自定义表单?pbootCMS自定义调用代码示例
-
要在pbootCMS前台调用自定义表单,您需要在后台创建表单并为其添加字段,然后在前台模板文件中添加相关代码,如提交按钮和表单验证代码。您还可以自定义表单数据的存储位置、添加文件上传字段、日期选择器、...
- 编程技巧:Jquery实时验证,指定长度的「负小数」
-
为了保障【负小数】的正确性,做成了通过Jquery,在用户端,实时验证指定长度的【负小数】的方法。HTML代码<inputtype="text"class="forc...
- 一篇文章带你用jquery mobile设计颜色拾取器
-
【一、项目背景】现实生活中,我们经常会遇到配色的问题,这个时候去百度一下RGB表。而RGB表只提供相对于的颜色的RGB值而没有可以验证的模块。我们可以通过jquerymobile去设计颜色的拾取器...
- 编程技巧:Jquery实时验证,指定长度的「正小数」
-
为了保障【正小数】的正确性,做成了通过Jquery,在用户端,实时验证指定长度的【正小数】的方法。HTML做成方法<inputtype="text"class="fo...
- jquery.validate检查数组全部验证
-
问题:html中有多个name[],每个参数都要进行验证是否为空,这个时候直接用required:true话,不能全部验证,只要这个数组中有一个有值就可以通过的。解决方法使用addmethod...
- Vue进阶(幺叁肆):npm查看包版本信息
-
第一种方式npmviewjqueryversions这种方式可以查看npm服务器上所有的...
- layui中使用lay-verify进行条件校验
-
一、layui的校验很简单,主要有以下步骤:1.在form表单内加上class="layui-form"2.在提交按钮上加上lay-submit3.在想要校验的标签,加上lay-...
- jQuery是什么?如何使用? jquery是什么功能组件
-
jQuery于2006年1月由JohnResig在BarCampNYC首次发布。它目前由TimmyWilson领导,并由一组开发人员维护。jQuery是一个JavaScript库,它简化了客户...
- django框架的表单form的理解和用法-9
-
表单呈现...
- jquery对上传文件的检测判断 jquery实现文件上传
-
总体思路:在前端使用jquery对上传文件做部分初步的判断,验证通过的文件利用ajaxFileUpload上传到服务器端,并将文件的存储路径保存到数据库。<asp:FileUploadI...
- Nodejs之MEAN栈开发(四)-- form验证及图片上传
-
这一节增加推荐图书的提交和删除功能,来学习node的form提交以及node的图片上传功能。开始之前需要源码同学可以先在git上fork:https://github.com/stoneniqiu/R...
- 大数据开发基础之JAVA jquery 大数据java实战
-
上一篇我们讲解了JAVAscript的基础知识、特点及基本语法以及组成及基本用途,本期就给大家带来了JAVAweb的第二个知识点jquery,大数据开发基础之JAVAjquery,这是本篇文章的主要...
- 推荐四个开源的jQuery可视化表单设计器
-
jquery开源在线表单拖拉设计器formBuilder(推荐)jQueryformBuilder是一个开源的WEB在线html表单设计器,开发人员可以通过拖拉实现一个可视化的表单。支持表单常用控件...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- mybatis plus (70)
- scheduledtask (71)
- css滚动条 (60)
- java学生成绩管理系统 (59)
- 结构体数组 (69)
- databasemetadata (64)
- javastatic (68)
- jsp实用教程 (53)
- fontawesome (57)
- widget开发 (57)
- vb net教程 (62)
- hibernate 教程 (63)
- case语句 (57)
- svn连接 (74)
- directoryindex (69)
- session timeout (58)
- textbox换行 (67)
- extension_dir (64)
- linearlayout (58)
- vba高级教程 (75)
- iframe用法 (58)
- sqlparameter (59)
- trim函数 (59)
- flex布局 (63)
- contextloaderlistener (56)