百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

Dubbo3.0之注册中心多层类体系源码分析

yuyutoo 2024-10-11 21:39 1 浏览 0 评论

上一篇:dubbo 3.0服务调用者调用第三方接口源码主流程分析

一、Zookeeper简介

Zookeeper 是 Apache Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,并推荐使用。

流程说明:

  • 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
  • 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
  • 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

二、注册中心体系流程图


三、ZK注册中心ZookeeperRegistry初始化跟ZK集群建立连接源码分析

在ZookeeperRegistry类初始化的时候最重要的就是与zk集群建立连接,加一个一个状态的监听器,我们先不管其余的流程,避免被其他不必要的细节给影响;

/**
 * ZookeeperRegistry
 * 基于zookeeper的注册中心组件
 * ZookeeperRegistry继承至CacheableFailbackRegistry继承至FailbackRegistry继承至AbstractRegistry
 * 这个就是dubbo注册中心里的三级类体系结构设计(zk/nacos具体的注册中心技术->Cacheable缓存层->)
 */
public class ZookeeperRegistry extends CacheableFailbackRegistry {

    private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

    private final static String DEFAULT_ROOT = "dubbo";

    /** zk里面的跟目录,/打头的第一级目录,默认:/dubbo */
    private final String root;

    /** 保存对应的服务的String字符串的信息 */
    private final Set<String> anyServices = new ConcurrentHashSet<>();

    /**
     * 用来保存订阅时候施加的一个监听器,这个服务可能会有很多人施加监听
     * map里会有2个Listener:notify(dubbo里自己实现的监听器)、child(针对zk子节点施加的监听器)
     */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();

    /** 跟zk建立网络连接的客户端 */
    private ZookeeperClient zkClient;

    /**
     * ZookeeperRegistry构建的时候比较核心的就是去跟zk建立连接
     * @param url zk的连接地址,如:zookeeper://localhost:2181
     * @param zookeeperTransporter
     */
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        /** 父类构造函数 */
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getGroup(DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        /** 基于zk的api去构建跟zk之间的连接 */
        zkClient = zookeeperTransporter.connect(url);
        /** 加一个状态的监听器 */
        zkClient.addStateListener((state) -> {
            /** 连接突然断开后很快进行了重新连接操作,此时状态就为RECONNECTED */
            if (state == StateListener.RECONNECTED) {
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                    " Since ephemeral ZNode will not get deleted for a connection lose, " +
                    "there's no need to re-register url of this instance.");
                /**
                 * 重新连接之后会尝试重新拉取之前订阅过的provider服务实例最新的集群地址
                 * 所以此时客服端如果有短暂连接再重连的状态,必须要去重新拉取一下最新的地址
                 * 因为只是暂时断开连接,所以之前发起的subsrbie订阅请求创建的临时节点
                 * 在zk端如果只是临时的网络断开,所以zk也是不会删除的你的临时节点的
                 */
                ZookeeperRegistry.this.fetchLatestAddresses();
            }
            else if (state == StateListener.NEW_SESSION_CREATED) {
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                    /**
                     * 如果经历了session断开以及会话重新连接的过程,新建立了一个会话的话
                     * 需要重新进行注册和订阅的监听
                     */
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                /**
                 * 如果连接断开时间超过一定阈值,session expire过期时间
                 * 会话一旦过期了,此时zk服务端就会把我们客户端之前注册创建的临时节点删除,同时施加的订阅监听也会删除
                 * 客户端收到的状态变更就是SESSION_LOST
                 */
                logger.warn("Url of this instance will be deleted from registry soon. " +
                    "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }
}

AbstractZookeeperTransporter的connect方法,这个方法主要是判断一下缓存中是否有对应数据,如果有则返回,如果没有则调用createZookeeperClient方法进行构建;

public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    // address format: {[username:password@]address}
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    /** 先从缓存中获取有效的zk客户端,如果存在则直接返回 */
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        /** 再次从缓存中获取有效的zk客户端,如果存在则直接返回 */
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }

        /** 构建ZookeeperClient */
        zookeeperClient = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}

在调用Curator5ZookeeperTransporter的createZookeeperClient方法,构造Curator5ZookeeperClient;

public ZookeeperClient createZookeeperClient(URL url) {
    return new Curator5ZookeeperClient(url);
}

Curator5ZookeeperClient的构造方法会基于Curator框架去构建ZookeeperClient与ZK集群进行连接,阻塞等待,如果成功则返回,如果没有成功则抛异常;

public Curator5ZookeeperClient(URL url) {
    super(url);
    try {
        /** 超时时间,默认:30秒 */
        int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
        /** session过期时间,默认:60秒,如果zk客户端和zk服务端断开连接超过60秒则会话过期 */
        int sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS);
        /**
         * 基于Curator框架去构建ZookeeperClient
         * Curator目前是zk在使用中最为常用的框架,对原生zk的client做了一层包装
         */
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                /** 设置zk的url地址 */
                .connectString(url.getBackupAddress())
                /** 如果失败了间隔一秒钟重试一次 */
                .retryPolicy(new RetryNTimes(1, 1000))
                /** 设置连接超时时间,如果尝试连接超过30秒都没有连接上则会进行重试 */
                .connectionTimeoutMs(timeout)
                /** 设置session会话过期时间,断开连接超过一分钟则会话过期 */
                .sessionTimeoutMs(sessionExpireMs);
        String userInformation = url.getUserInformation();
        /** 权限控制 */
        if (userInformation != null && userInformation.length() > 0) {
            builder = builder.authorization("digest", userInformation.getBytes());
        }
        /** 构建client */
        client = builder.build();
        /**
         * 添加监听器
         * 跟zk的连接建立了之后,一般来说得关注一下跟这个zk之间的连接
         * 如果跟zk的连接有断开此时是会通知的
         */
        client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
        /** 开始建立连接 */
        client.start();
        /** 阻塞住,置到跟zk成功建立连接,超时时间:30秒 */
        boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
        if (!connected) {
            /** 如果没建立成功则抛异常 */
            throw new IllegalStateException("zookeeper not connected");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

如果zk集群异常断开了怎么办呢?

此时在ZookeeperRegistry建立连接时施加的状态监听器感知到了会调用FailbackRegistry的recover方法重新进行注册和订阅发现;


protected void recover() throws Exception {
    // register
    /**
     * 因为会话是重新建立起来的,会话一旦断了,我们在zk端存储的provider/consumer的服务实例url节点就没有了
     * 我们之前施加的监听器也没有了,此时就必须重新去进行注册和订阅发现
     */
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        for (URL url : recoverRegistered) {
            // remove fail registry or unRegistry task first.
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            addFailedRegistered(url);
        }
    }
    // subscribe
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                // First remove other tasks to ensure that addFailedSubscribed can succeed.
                removeFailedSubscribed(url, listener);
                addFailedSubscribed(url, listener);
            }
        }
    }
}
		/** 将重试任务放入时间轮进行定时重试 */
		private void addFailedRegistered(URL url) {
        FailedRegisteredTask oldOne = failedRegistered.get(url);
        if (oldOne != null) {
            return;
        }
        /** 创建定时重试任务 */
        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
        oldOne = failedRegistered.putIfAbsent(url, newTask);
        if (oldOne == null) {
            // never has a retry task. then start a new task for retry.
            /** 将定时重试任务放入时间轮定时5秒后进行重试 */
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }

		/** 将重新订阅任务放入时间轮进行定时重试 */
		protected void addFailedSubscribed(URL url, NotifyListener listener) {
        Holder h = new Holder(url, listener);
        FailedSubscribedTask oldOne = failedSubscribed.get(h);
        if (oldOne != null) {
            return;
        }
        FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
        oldOne = failedSubscribed.putIfAbsent(h, newTask);
        if (oldOne == null) {
            // never has a retry task. then start a new task for retry.
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }


四、ZK注册中心注册服务实例流程

FailbackRegistry的register方法为服务实例注册的入口方法,我们一起探寻一下其中的奥秘;

我将FailbackRegistry的初始化方法和register一起贴出来了,主要是想让大家明白FailbackRegistry内部的一些数据结构,以及初始化的时候做了什么;

里面核心方法就是调用doRegister进行ZK临时节点的创建,如果异常了会调用addFailedRegistered方法将异常信息封装成FailedRegisteredTask重试任务,在将定时重试任务放入时间轮定时5秒后进行重试;

public abstract class FailbackRegistry extends AbstractRegistry {

    /*  retry task map */
    /**
     * url代表一个provider或者是一个consumer
     * 针对某个provider或者consumer在执行register的时候失败了此时就会用url->task的方式放到map里来缓存
     * 针对unregistered、subscribed、unsubscribed,同样会有一些url失败时候的任务缓存
     */
    private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

    private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

    private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

    private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

    /**
     * The time in milliseconds the retryExecutor will wait
     * 多长时间重试一次
     */
    private final int retryPeriod;

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    /**
     * 时间轮的算法和机制,可以支持大量的定时任务放到时间轮里去,时间轮就一直跟着时间在转
     * 转到指定的时间的时候此时就会执行你的定时任务
     */
    private final HashedWheelTimer retryTimer;

    public FailbackRegistry(URL url) {
        /** 父类AbstractRegistry的构造函数,从磁盘文件里加载缓存数据 */
        super(url);
        /** 获取retryPeriod重试时间周期,默认:5秒 */
        this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

        // since the retry task will not be very much. 128 ticks is enough.
        /** 根据重试时间周期构建hash时间轮 */
        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
    }

		/** 注册入口方法 */
    public void register(URL url) {
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
            return;
        }
        /** 调用父类AbstractRegistry的register方法执行注册操作 */
        super.register(url);
        /** 将这个url已经失败的注册和取消注册的任务全部进行清理 */
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            /** 这里会向zk服务端发送请求进行注册 */
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            /** 获取一个check参数 */
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !(url.getPort() == 0);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            /** 执行failback机制,先保存故障信息,在后续定时重试 */
            addFailedRegistered(url);
        }
    }

		private void addFailedRegistered(URL url) {
        FailedRegisteredTask oldOne = failedRegistered.get(url);
        if (oldOne != null) {
            return;
        }
        /** 创建定时重试任务 */
        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
        oldOne = failedRegistered.putIfAbsent(url, newTask);
        if (oldOne == null) {
            // never has a retry task. then start a new task for retry.
            /** 将定时重试任务放入时间轮定时5秒后进行重试 */
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }

}

重试任务,里面其实就是再次调用ZookeeperRegistry的doRegister方法进行注册;

public final class FailedRegisteredTask extends AbstractRetryTask {

    private static final String NAME = "retry register";

    public FailedRegisteredTask(URL url, FailbackRegistry registry) {
        super(url, registry, NAME);
    }

    @Override
    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
        /** 定时重试:会委托ZookeeperRegistry去完成注册任务 */
        registry.doRegister(url);
        /** 重试成功之后删除对应的重试任务 */
        registry.removeFailedRegisteredTask(url);
    }
}

其中会调用ZookeeperRegistry的doRegister方法进行实际的ZK节点创建,创建一个临时节点;

public void doRegister(URL url) {
    try {
        checkDestroyed();
        /** 创建zk节点 */
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

调用AbstractZookeeperClient的create方法根据是否为临时节点走不同的逻辑;

public void create(String path, boolean ephemeral) {
    /** 是否是临时节点 */
    if (!ephemeral) {
        /** 不是则判断之前是否有创建,有则返回 */
        if (persistentExistNodePath.contains(path)) {
            return;
        }
        /** 判断路径是否存在,如果存在则放入persistentExistNodePath返回 */
        if (checkExists(path)) {
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        /** 创建 */
        create(path.substring(0, i), false);
    }
    /** 判断是否临时节点 */
    if (ephemeral) {
        createEphemeral(path);
    } else {
        /** 不是临时节点创建成功之后放入persistentExistNodePath */
        createPersistent(path);
        persistentExistNodePath.add(path);
    }
}

调用Curator5ZookeeperClient的createEphemeral方法通过Curator框架去实际去ZK集群交互创建临时节点;

public void createEphemeral(String path) {
    try {
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
    }
    /** node已经存在,你注册的服务实例之前已经注册过了,不能重复注册 */
    catch (NodeExistsException e) {
        logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.", e);
        /** 如果重复注册了则先删除znode,再重新创建一个临时节点 */
        deletePath(path);
      	/** 重新调用本方法,再次尝试创建节点 */
        createEphemeral(path);
    } catch (Exception e) {
        /** 创建过程中,如果和zk链接有问题或者说zk自己有问题,则抛出异常 */
        throw new IllegalStateException(e.getMessage(), e);
    }
}

五、ZK注册中心订阅发现服务实例流程

调用FailbackRegistry的subscribe方法进行服务实例订阅发现

1、先调用父类AbstractRegistry的subscribe方法在监听器映射集合根据url加一个监听器进去;

2、将订阅重试任务删除,因为调用其方法的可能是重试任务来调用的,防止重复订阅,先将重试任务删除;

3、调用ZookeeperRegistry的doSubscribe方法进行实际的注册逻辑,这里会通过CacheableFailbackRegistry的toUrlsWithEmpty方法将订阅时获取的url字符串都转换为一个URL对象;我们上面的架构图中说这个类会缓存一些信息,就包括URL对象数据;

4、如果异常了,调用addFailedSubscribed方法将失败信息封装成FailedSubscribedTask订阅失败重试任务,隔5秒后进行重试;

public void subscribe(URL url, NotifyListener listener) {
    /** 先调用抽象父类AbstractRegistry的subscribe方法进行订阅处理 */
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (CollectionUtils.isNotEmpty(urls)) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getCacheFile().getName() + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record a failed registration request to a failed list, retry regularly
      	/** 封装成重试任务,隔5秒进行重试 */
        addFailedSubscribed(url, listener);
    }
}

AbstractRegistry的subscribe方法,主要实现了在监听器映射集合根据url加一个监听器进去;

public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    /** 根据我们要订阅的url服务接口地址,获取subscribed集合的监听器集合 */
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    /**
     * 订阅和监听的时候是需要加一个监听器的,此时将监听器加入这个set集合中
     * NotifyListener不是zk里面的监听器,zk的监听器一般是child/watcher listener
     */
    listeners.add(listener);
}

我们先来看一下FailbackRegistry对订阅失败是如何处理的,如果捕捉到异常,则会调用addFailedSubscribed方法将url包装成一个重试任务,5秒之后进行重新订阅;

protected void addFailedSubscribed(URL url, NotifyListener listener) {
    Holder h = new Holder(url, listener);
    FailedSubscribedTask oldOne = failedSubscribed.get(h);
    if (oldOne != null) {
        return;
    }
    FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
    oldOne = failedSubscribed.putIfAbsent(h, newTask);
    if (oldOne == null) {
        // never has a retry task. then start a new task for retry.
      	/** 放入时间轮,5秒之后进行重试 */
        retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
    }
}

订阅失败重试任务,时间轮到时间会进行执行

public final class FailedSubscribedTask extends AbstractRetryTask {

    private static final String NAME = "retry subscribe";

    private final NotifyListener listener;

    public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
        super(url, registry, NAME);
        if (listener == null) {
            throw new IllegalArgumentException();
        }
        this.listener = listener;
    }

    @Override
    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
        /** 调用ZookeeperRegistry的doSubscribe进行订阅重试 */
        registry.doSubscribe(url, listener);
        /** 如果成功了,将订阅失败重试任务从数据结构中删除 */
        registry.removeFailedSubscribedTask(url, listener);
    }
}

刚刚先讲解了如果订阅失败会怎么处理,因为订阅逻辑还是比较复杂的,所以放在了最后,调用ZookeeperRegistry的doSubscribe方法来做具体的订阅逻辑

public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        checkDestroyed();
        /** 判断当前这个url的服务接口是否为*,一般不可能 */
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            boolean check = url.getParameter(CHECK_KEY, false);
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                            Constants.CHECK_KEY, String.valueOf(check)), k);
                    }
                }
            });
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                        Constants.CHECK_KEY, String.valueOf(check)), listener);
                }
            }
        } else {
            /** 正常情况来说都是针对指定的接口去进行监听 */
            CountDownLatch latch = new CountDownLatch(1);
            try {
                List<URL> urls = new ArrayList<>();
                /**
                 * 默认情况下就一个path,/dubbo/服务接口(如:org.apache.dubbo.demo.DemoService)/providers,
                 * 针对这个路径和地址去执行订阅和监听
                 * 先拿到这个地址下面所有的子节点列表,,这样就可以拿到这个服务当前所有的providers实例地址
                 * 再去监听你的providers下的子节点地址列表,如果有变动就可以notify通知我们
                 */
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    /**
                     * 构建一个子监听器,如果监听的节点有变化,就会回调给RegistryChildListenerImpl的notify方法进行处理
                     * 而notify方法里面其实会调用doNotify方法,这个和下面的notify方法里面调用的其实是一致的
                     * 说明施加完监听之后获取到了目标服务实例地址之后会调用doNotify方法刷新注册表缓存
                     * 如果zk节点有变动了也会回调进行处理,实际上也会通过doNotify方法刷新注册表缓存
                     */
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
                    if (zkListener instanceof RegistryChildListenerImpl) {
                        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                    }
                    // 判断/dubbo/服务接口/providers是否存在,不存在则创建节点,存在则直接返回
                    zkClient.create(path, false);
                    // 对节点施加监听,施加监听成功之后会返回children
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        /**
                         * 每个子节点其实就是一个url字符串,每个url字符串就代表了一个provider服务实例
                         * 此时就需要将url字符串都转换为一个URL对象
                         */
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                /**
                 * url:zk地址
                 * listener:订阅时传入进来的监听器
                 * urls:订阅时发现的服务接口的providers集群地址
                 */
                notify(url, listener, urls);
            } finally {
                // tells the listener to run only after the sync notification of main thread finishes.
                latch.countDown();
            }
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

调用AbstractZookeeperClient的addChildListener方法给指定路径施加监听器,添加成功之后会返回所有的子节点列表,这样就可以拿到这个服务当前所有的providers实例地址

public List<String> addChildListener(String path, final ChildListener listener) {
    ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
    /** 创建CuratorWatcherImpl */
    TargetChildListener targetListener = listeners.computeIfAbsent(listener, k -> createTargetChildListener(path, k));
    return addTargetChildListener(path, targetListener);
}

调用Curator5ZookeeperClient的addTargetChildListener方法通过zk客户端去和zk集群通信去施加监听,并返回其子节点

public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
    try {
        /** 针对path施加监听 */
        return client.getChildren().usingWatcher(listener).forPath(path);
    } catch (NoNodeException e) {
        return null;
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

我们获取的子节点只是一个url字符串,这个还是需要在进行一层转换的,转换成URL对象,这个是通过CacheableFailbackRegistry的toUrlsWithEmpty来完成的;

protected List<URL> toUrlsWithEmpty(URL consumer, String path, Collection<String> providers) {
    List<URL> urls = new ArrayList<>(1);
    boolean isProviderPath = path.endsWith(PROVIDERS_CATEGORY);
    if (isProviderPath) {
        if (CollectionUtils.isNotEmpty(providers)) {
          	/** 我们一般是对providers进行订阅请求,所以我们看这个逻辑就好了 */
            urls = toUrlsWithoutEmpty(consumer, providers);
        } else {
            // clear cache on empty notification: unsubscribe or provider offline
            evictURLCache(consumer);
        }
    } else {
        if (CollectionUtils.isNotEmpty(providers)) {
            urls = toConfiguratorsWithoutEmpty(consumer, providers);
        }
    }

    if (urls.isEmpty()) {
        int i = path.lastIndexOf(PATH_SEPARATOR);
        String category = i < 0 ? path : path.substring(i + 1);
        if (!PROVIDERS_CATEGORY.equals(category) || !getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, true)) {
            if (PROVIDERS_CATEGORY.equals(category)) {
                logger.warn("Service " + consumer.getServiceKey() + " received empty address list and empty protection is disabled, will clear current available addresses");
            }
            URL empty = URLBuilder.from(consumer)
                .setProtocol(EMPTY_PROTOCOL)
                .addParameter(CATEGORY_KEY, category)
                .build();
            urls.add(empty);
        }
    }

    return urls;
}

toUrlsWithoutEmpty方法会去判断一下consumer是否之前缓存过URL对象集合,如果没有则直接创建,如果有的话则会去走另外的一个逻辑,将之前存在的provider对应的URL对象获取出来放入URL对象集合;

如果没有则进行构建,因为有可能后面又新增了机器,之前的URL集合中没有这个;

再将之前的URL集合从缓存中删除,将新构建的URL集合放入缓存,下次使用;

最后返回URL对象集合;

protected List<URL> toUrlsWithoutEmpty(URL consumer, Collection<String> providers) {
    // keep old urls
    Map<String, ServiceAddressURL> oldURLs = stringUrls.get(consumer);
    // create new urls
    Map<String, ServiceAddressURL> newURLs;
    /** 将一些参数移除 */
    URL copyOfConsumer = removeParamsFromConsumer(consumer);
  	/** 判断之前是否有对应的URL对象集合 */
    if (oldURLs == null) {
        newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
        for (String rawProvider : providers) {
            rawProvider = stripOffVariableKeys(rawProvider);
            ServiceAddressURL cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
            if (cachedURL == null) {
                logger.warn("Invalid address, failed to parse into URL " + rawProvider);
                continue;
            }
            newURLs.put(rawProvider, cachedURL);
        }
    } else {
        newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
        // maybe only default , or "env" + default
        for (String rawProvider : providers) {
            rawProvider = stripOffVariableKeys(rawProvider);
            /** providers地址有变化,重新对raw url -> URL映射做一个处理,如果不存在则创建ServiceAddressURL */
            ServiceAddressURL cachedURL = oldURLs.remove(rawProvider);
            if (cachedURL == null) {
                cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
                if (cachedURL == null) {
                    logger.warn("Invalid address, failed to parse into URL " + rawProvider);
                    continue;
                }
            }
            /**
             * 如果ServiceAddressURL不为空则直接复用
             * 就是将你的raw url -> URL之间做一个缓存,下次有变动的要重新计算直接使用缓存数据就好
             * 但是如果有新的provider raw url,此时就不能使用缓存了,就需要重新创建一个ServiceAddressURL
             */
            newURLs.put(rawProvider, cachedURL);
        }
    }

    /**
     * 当你的consumer对同样服务的provider重新进行订阅和发现的时候
     * 如果有的raw url->URL有缓存则直接使用缓存
     * 如果没有才会根据raw url重新构建一个ServiceAddressURL对象
     * 在保存之前需要先删除老缓存的数据
     */
    evictURLCache(consumer);
    /** 清理完老缓存之后,再去添加新的缓存 */
    stringUrls.put(consumer, newURLs);

    return new ArrayList<>(newURLs.values());
}
		
		/** 创建ServiceAddressURL对象 */
		protected ServiceAddressURL createURL(String rawProvider, URL consumerURL, Map<String, String> extraParameters) {
        boolean encoded = true;
        // use encoded value directly to avoid URLDecoder.decode allocation.
        int paramStartIdx = rawProvider.indexOf(ENCODED_QUESTION_MARK);
        if (paramStartIdx == -1) {// if ENCODED_QUESTION_MARK does not show, mark as not encoded.
            encoded = false;
        }
        String[] parts = URLStrParser.parseRawURLToArrays(rawProvider, paramStartIdx);
        if (parts.length <= 1) {
            logger.warn("Received url without any parameters " + rawProvider);
            return DubboServiceAddressURL.valueOf(rawProvider, consumerURL);
        }

        String rawAddress = parts[0];
        String rawParams = parts[1];
        boolean isEncoded = encoded;
        /** 每次在创建URL的时候,URLAddress和URLParam如果能复用就进行复用,如果没有则进行创建 */
        URLAddress address = stringAddress.computeIfAbsent(rawAddress, k -> URLAddress.parse(k, getDefaultURLProtocol(), isEncoded));
        address.setTimestamp(System.currentTimeMillis());

        URLParam param = stringParam.computeIfAbsent(rawParams, k -> URLParam.parse(k, isEncoded, extraParameters));
        param.setTimestamp(System.currentTimeMillis());

        ServiceAddressURL cachedURL = createServiceURL(address, param, consumerURL);
        if (isMatch(consumerURL, cachedURL)) {
            return cachedURL;
        }
        return null;
    }

		/**
     * 处理url时,老的url如果存在相应的数据的话,需要先将老缓存清理
     * @param url
     */
    protected void evictURLCache(URL url) {
        /** 删除老缓存的数据 */
        Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url);
        try {
            /** 如果老缓存的数据不为空 */
            if (oldURLs != null && oldURLs.size() > 0) {
                logger.info("Evicting urls for service " + url.getServiceKey() + ", size " + oldURLs.size());
                Long currentTimestamp = System.currentTimeMillis();
                /** 将待清理的URL对象放入等待清理的数据结构里面去 */
                for (Map.Entry<String, ServiceAddressURL> entry : oldURLs.entrySet()) {
                    waitForRemove.put(entry.getValue(), currentTimestamp);
                }
                if (CollectionUtils.isNotEmptyMap(waitForRemove)) {
                    if (semaphore.tryAcquire()) {
                        /** 提交一个删除任务,间隔2分钟 */
                        cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS);
                    }
                }
            }
        } catch (Exception e) {
            logger.warn("Failed to evict url for " + url.getServiceKey(), e);
        }
    }

		private class RemovalTask implements Runnable {
        @Override
        public void run() {
            logger.info("Clearing cached URLs, waiting to clear size " + waitForRemove.size());
            int clearCount = 0;
            try {
                Iterator<Map.Entry<ServiceAddressURL, Long>> it = waitForRemove.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<ServiceAddressURL, Long> entry = it.next();
                    ServiceAddressURL removeURL = entry.getKey();
                    /** 之前放进去的时间戳 */
                    long removeTime = entry.getValue();
                    /** 当前时间戳 */
                    long current = System.currentTimeMillis();
                    /** 判断缓存是否超过5分钟 */
                    if (current - removeTime >= cacheClearWaitingThresholdInMillis) {
                        /** 从各个数据结构中进行清理 */
                        URLAddress urlAddress = removeURL.getUrlAddress();
                        URLParam urlParam = removeURL.getUrlParam();
                        if (current - urlAddress.getTimestamp() >= cacheClearWaitingThresholdInMillis) {
                            stringAddress.remove(urlAddress.getRawAddress());
                        }
                        if (current - urlParam.getTimestamp() >= cacheClearWaitingThresholdInMillis) {
                            stringParam.remove(urlParam.getRawParam());
                        }
                        it.remove();
                        clearCount++;
                    }
                }
            } catch (Throwable t) {
                logger.error("Error occurred when clearing cached URLs", t);
            } finally {
                semaphore.release();
            }
            logger.info("Clear cached URLs, size " + clearCount);

            /**
             * 判断等待删除的这个map里面是否还有数据,因为有可能有部分数据缓存时间还没有超过5分钟
             * 此时就需要再次提交一个缓存清理任务
             */
            if (CollectionUtils.isNotEmptyMap(waitForRemove)) {
                // move to next schedule
                if (semaphore.tryAcquire()) {
                    cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

我们获取到URL对象集合后再调用FailbackRegistry的notify方法,这里其实只是做了一些空值判断就通过doNotify方法调用其父类AbstractRegistry的notify方法

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // Record a failed registration request to a failed list
        logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
    }
}

protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }

AbstractRegistry的notify方法里面比较重要的有2点

1、调用RegistryDirectory的notify方法进行Netty网络连接;

2、调用saveProperties方法进行本地磁盘文件缓存更新,就如之前的图解,AbstractRegistry这个类核心职责就是持久化了注册中心的数据,后续异常重启时不至于数据全部丢失;

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
        && !ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
    }
    // keep every provider's category.
    /** key是分类 value是分类对应的provider url列表 */
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            /** 获取url的分类,放入不同的分类list */
            String category = u.getCategory(DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    /** 迭代处理 */
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        /** 当entry为providers时会调用RegistryDirectory的notify方法进行Netty网络连接 */
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        /** 我们将在每次通知后更新缓存文件。 */
        // When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.
        /** 当我们的注册表由于网络抖动导致订阅失败时,我们至少可以返回现有的缓存URL。 */
        if (localCacheEnabled) {
            /** 更新本地缓存文件 */
            saveProperties(url);
        }
    }
}

第一点:我们先来看看怎么进行Netty网络连接的,先看一下RegistryDirectory的notify方法,其余的前置处理细节暂时不要管,先跑主流程,在看refreshOverrideAndInvoker方法

public synchronized void notify(List<URL> urls) {
    if (isDestroyed()) {
        return;
    }

    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));

    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

    // 3.x added for extend URL address
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}

private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
  // mock zookeeper://xxx?mock=return null
  refreshInvoker(urls);
}

比较关键的是这一行代码,会根据URL构建Invoker,在构建Invoker的时候就会进行Netty网络连接,会比较深,我就直接截图了

toInvokers方法会在调用ProtocolListenerWrapper的refer方法

refer方法里面又会去调用DubboProtocol的refer方法

DubboProtocol的refer方法最终会调用protocolBindingRefer方法里去,而这个里面最关键的就是getClients,这个就是进行网络连接的代码

其实getSharedClient方法最终都会调用到initClient方法里面去,我这边就不绕圈子了,如果你们想看可以自己点进去看看

我们经过很多层的跳转,最后会在AbstractClient进行实际的发起网络连接请求

初始化参数

建立连接


第二点:调用AbstractRegistry的saveProperties方法持久化数据

private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        properties.setProperty(url.getServiceKey(), buf.toString());
        long version = lastCacheChanged.incrementAndGet();
        /**
         * 判断是否异步化刷盘,默认一般都是异步刷盘
         * 其实异步刷盘也很简单,就是开一个线程,通过线程池去执行去调用doSaveProperties方法
         */
        if (syncSaveFile) {
            doSaveProperties(version);
        } else {
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

SaveProperties线程类,其实他的run方法就直接调用了doSaveProperties方法

private class SaveProperties implements Runnable {
    private long version;

    private SaveProperties(long version) {
        this.version = version;
    }

    @Override
    public void run() {
        doSaveProperties(version);
    }
}

通过磁盘文件锁进行加锁,防止并发问题,在进行文件写入

public void doSaveProperties(long version) {
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    File lockfile = null;
    try {
        /** 在发起对本地磁盘io操作之前先建立一个磁盘文件锁 */
        lockfile = new File(file.getAbsolutePath() + ".lock");
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        /** 针对锁文件构建一个RandomAccessFile */
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
            FileChannel channel = raf.getChannel()) {
            /** 针对锁文件进行Lock加锁,如果其他线程也执行到了这里会被block住,因为锁只能被一个线程去加 */
            FileLock lock = channel.tryLock();
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            // Save
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    /** 基于jdk提供的api进行文件io操作 */
                    properties.store(outputFile, "Dubbo Registry Cache");
                }
            } finally {
                /** 锁释放 */
                lock.release();
            }
        }
    } catch (Throwable e) {
        savePropertiesRetryTimes.incrementAndGet();
        /** 重试次数是否小于3 */
        if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
            logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
            savePropertiesRetryTimes.set(0);
            return;
        }
        if (version < lastCacheChanged.get()) {
            savePropertiesRetryTimes.set(0);
            return;
        } else {
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
    } finally {
        if (lockfile != null) {
            lockfile.delete();
        }
    }
}

我们订阅成功了之后,如果zk节点发生变化了我们又怎么感知和刷新我们的数据呢?

如果zk数据发生变化了会回调我们施加的监听器,调用监听器的childChanged方法,这个方法会在调用监听器的内部RegistryNotifier的notify方法,这个方法最终会调用到ZookeeperRegistry的notify方法里去,这个方法就是我们上面分析的,一模一样;

private class RegistryChildListenerImpl implements ChildListener {
    private RegistryNotifier notifier;
    private long lastExecuteTime;
    private volatile CountDownLatch latch;

    public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
        this.latch = latch;
        notifier = new RegistryNotifier(getUrl(), ZookeeperRegistry.this.getDelay()) {
            @Override
            public void notify(Object rawAddresses) {
                long delayTime = getDelayTime();
                if (delayTime <= 0) {
                    this.doNotify(rawAddresses);
                } else {
                    long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
                    if (interval > 0) {
                        try {
                            Thread.sleep(interval);
                        } catch (InterruptedException e) {
                            // ignore
                        }
                    }
                    lastExecuteTime = System.currentTimeMillis();
                    this.doNotify(rawAddresses);
                }
            }

            @Override
            protected void doNotify(Object rawAddresses) {
                ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
            }
        };
    }

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void childChanged(String path, List<String> children) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
        }
        notifier.notify(children);
    }
}

六、ZK注册中心销毁

ZookeeperRegistry的destroy方法

public void destroy() {
    super.destroy();
    // Just release zkClient reference, but can not close zk client here for zk client is shared somewhere else.
    // See org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
    zkClient = null;
}

FailbackRegistry的destroy方法

public void destroy() {
    super.destroy();
    retryTimer.stop();
}

AbstractRegistry的destroy方法

public void destroy() {
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    /** 获取所有已注册的URL集合 */
    Set<URL> destroyRegistered = new HashSet<>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<>(destroyRegistered)) {
            /** 循环已注册的URL集合,获取dynamic参数 */
            if (url.getParameter(DYNAMIC_KEY, true)) {
                try {
                    /** 取消注册 */
                    unregister(url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    /** 获取所有已订阅的URL 及 监听器映射集合 */
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    /** 取消订阅 */
                    unsubscribe(url, listener);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    /** 移除当前已销毁的注册中心 */
    registryManager.removeDestroyedRegistry(this);
}

		public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        /** 从subscribed集合中获取指定url对应的监听集合 */
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            /** 移除指定监听listener */
            listeners.remove(listener);
        }

        // do not forget remove notified
        /** 从通知集合中移除指定url */
        notified.remove(url);
    }

七、总结

Dubbo的注册中心分层类体系,每一层所关注的功能点和职责都不一样,是类结构划分更清晰,复用性更好;

相关推荐

jQuery VS AngularJS 你更钟爱哪个?

在这一次的Web开发教程中,我会尽力解答有关于jQuery和AngularJS的两个非常常见的问题,即jQuery和AngularJS之间的区别是什么?也就是说jQueryVSAngularJS?...

Jquery实时校验,指定长度的「负小数」,小数位未满末尾补0

在可以输入【负小数】的输入框获取到焦点时,移除千位分隔符,在输入数据时,实时校验输入内容是否正确,失去焦点后,添加千位分隔符格式化数字。同时小数位未满时末尾补0。HTML代码...

如何在pbootCMS前台调用自定义表单?pbootCMS自定义调用代码示例

要在pbootCMS前台调用自定义表单,您需要在后台创建表单并为其添加字段,然后在前台模板文件中添加相关代码,如提交按钮和表单验证代码。您还可以自定义表单数据的存储位置、添加文件上传字段、日期选择器、...

编程技巧:Jquery实时验证,指定长度的「负小数」

为了保障【负小数】的正确性,做成了通过Jquery,在用户端,实时验证指定长度的【负小数】的方法。HTML代码<inputtype="text"class="forc...

一篇文章带你用jquery mobile设计颜色拾取器

【一、项目背景】现实生活中,我们经常会遇到配色的问题,这个时候去百度一下RGB表。而RGB表只提供相对于的颜色的RGB值而没有可以验证的模块。我们可以通过jquerymobile去设计颜色的拾取器...

编程技巧:Jquery实时验证,指定长度的「正小数」

为了保障【正小数】的正确性,做成了通过Jquery,在用户端,实时验证指定长度的【正小数】的方法。HTML做成方法<inputtype="text"class="fo...

jquery.validate检查数组全部验证

问题:html中有多个name[],每个参数都要进行验证是否为空,这个时候直接用required:true话,不能全部验证,只要这个数组中有一个有值就可以通过的。解决方法使用addmethod...

Vue进阶(幺叁肆):npm查看包版本信息

第一种方式npmviewjqueryversions这种方式可以查看npm服务器上所有的...

layui中使用lay-verify进行条件校验

一、layui的校验很简单,主要有以下步骤:1.在form表单内加上class="layui-form"2.在提交按钮上加上lay-submit3.在想要校验的标签,加上lay-...

jQuery是什么?如何使用? jquery是什么功能组件

jQuery于2006年1月由JohnResig在BarCampNYC首次发布。它目前由TimmyWilson领导,并由一组开发人员维护。jQuery是一个JavaScript库,它简化了客户...

django框架的表单form的理解和用法-9

表单呈现...

jquery对上传文件的检测判断 jquery实现文件上传

总体思路:在前端使用jquery对上传文件做部分初步的判断,验证通过的文件利用ajaxFileUpload上传到服务器端,并将文件的存储路径保存到数据库。<asp:FileUploadI...

Nodejs之MEAN栈开发(四)-- form验证及图片上传

这一节增加推荐图书的提交和删除功能,来学习node的form提交以及node的图片上传功能。开始之前需要源码同学可以先在git上fork:https://github.com/stoneniqiu/R...

大数据开发基础之JAVA jquery 大数据java实战

上一篇我们讲解了JAVAscript的基础知识、特点及基本语法以及组成及基本用途,本期就给大家带来了JAVAweb的第二个知识点jquery,大数据开发基础之JAVAjquery,这是本篇文章的主要...

推荐四个开源的jQuery可视化表单设计器

jquery开源在线表单拖拉设计器formBuilder(推荐)jQueryformBuilder是一个开源的WEB在线html表单设计器,开发人员可以通过拖拉实现一个可视化的表单。支持表单常用控件...

取消回复欢迎 发表评论: