Zookeeper系列——4Zookeeper的Watcher机制原理分析
yuyutoo 2024-10-16 15:46 4 浏览 0 评论
CSDN地址:https://blog.csdn.net/Eclipse_2019/article/details/126400812
学习目标
- 理解Zookeeper的watcher机制原理
第1章 客户端注册监听
zookeeper的watcher机制注册监听流程图由于在头条上显示模糊,所以不再上传,请参考CSDN上的同名文章。
先简单看看Watcher的使用
ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理Watcher 和客户端回调 Watcher
客户端注册watcher有3种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理
public class Demo01 {
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
ZooKeeper zookeeper=new ZooKeeper("127.0.0.1:2181",4000,new Watcher(){
@Override
public void process(WatchedEvent event) {
System.out.println("event.type:"+event.getType());
}
});
zookeeper.create("/watch","0".getBytes(), ZooDefs.Ids. OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); //创建节点
zookeeper.exists("/watch",true); //注册监听
Thread.sleep(1000);
zookeeper.setData("/watch", "1".getBytes(),-1) ; //修改节点的值触发监听
}
}
持久化监听
从zookeeper3.6开始,提供了持久化监听以及递归监听机制,演示如下(我本地的zookeeper版本为3.4.13的和3.5.6,所以这块就不做演示了)
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
<dependency><!--分布式锁、leader选举、队列...-->
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
ZooKeeper zooKeeper=new ZooKeeper("localhost:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//表示连接成功之后,会产生的回调时间
}
});
zooKeeper.addWatch("/first1",watchedEvent -> {
System.out.println(watchedEvent.getPath());
},AddWatchMode.PERSISTENT);
其实很多流程在前面两篇文章已经讲过了,为了大家印象更深刻以及思路更清晰,有些流程在本文不做太详细的介绍。
1.1 建立连接
ZooKeeper zookeeper=new ZooKeeper("127.0.0.1:2181") , Zookeeper在初始化的时候,会构建一个Watcher,我们可以先看看Zookeeper初始化做了什么事情。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly) throws IOException{
//在这里将watcher设置到ZKWatchManager
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//初始化了ClientCnxn,在这里会创建一个sendThread和eventThread线程
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//启动一个sendThread线程进行通信还有eventThread进行事件通知
cnxn.start();
}
cnxn.start();应该比较熟悉了吧,这个方法里面会启动一个sendThread线程进行通信还有eventThread进行事件通知,然后通过sendThread线程去建立连接,这一步的逻辑在上文中已经着重讲过,这里不再赘述。
1.2 创建节点
这一步是通过create方法区完成,在create方法中我们会发现,实际上他就是调用的sumbitRequest,然后在底层也是调用sendThread发送请求到服务端。
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
...
request.setAcl(acl);
//这一步也眼熟的很,是上文中介绍过的请求处理,实际上进入底层你会发现依然是通过sendThread去发送请求了。
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
1.3 注册监听
exists是用来判断一个节点是否存在,同时,还会针对这个节点注册一个watcher事件。
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
//构建请求头
RequestHeader h = new RequestHeader();
//表示当前请求的操作类型是exists
h.setType(ZooDefs.OpCode.exists);
//构建发送请求和响应的response
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
//通过submitRequest发送请求
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
//返回stat元数据
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
发现了吧,其实不管是什么操作在zk里面都是这么操作的,调用submitRequest,接下来我们注重分析一下这个方法
1.3.1 submitRequest
这里面的处理逻辑比较简单
- 调用queuePacket,把请求数据添加到队列
- 通过packet.wait使得当前线程一直阻塞,直到请求完成
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
//把请求数据添加到队列
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
// 等到请求执行完成
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
1.3.2 queuePacket
在这个方法里面实际上就干了两件事,1、创建一个Packet对象并加入到队列;2、唤醒一个sendThread线程去执行
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
//构建一个Packet对象
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//添加到outgoingQueue,这里很显然又是一个生产者消费者模式。
outgoingQueue.add(packet);
}
}
//唤醒阻塞在selector.select上的线程
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
1.3.3 SendThread.run
在Zookeeper这个对象初始化的时候,启动了一个SendThread,这个线程会从outgoingQueue中获取任务,然后发送到服务端处理。这块内容实际上上文已经讲过了。
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {//如果是存活状态
try {
if (!clientCnxnSocket.isConnected()) {//如果不是连接状态,则需要进行连接的建立
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);//开启连接
clientCnxnSocket.updateLastSendAndHeard();//更新最近一次发送和心跳的时间
}
...
//这里就是核心的处理逻辑,真正进行网络传输;
//pendingQueue表示已经发送出去的数据需要等待server返回的packet队列
//outgoingQueue是等待发送出去的packet队列
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
...
}
}
...
}
1.3.4 doTransport
调用协议层进行数据传输。
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
...
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
//建立连接事件
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//读写事件
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
...
}
1.3.5 doIO
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
...
//写请求
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
//在队列里面找到可以发送的packet
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
//如果Packet的byteBuffer没有创建,那么就创建
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);//从待发送队列中移除
if (p.requestHeader != null//判断数据包的请求,ping以及auth不加入待回复队列
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {//添加到pendingQueue待回复队列
pendingQueue.add(p);
}
}
}
}
...
}
}
}
1.3.6 总结
到目前为止,我们已经分析了客户端请求的发送流程,我们来画一个简单的流程图梳理一下
第2章 服务端接收
服务端接收这块内容在本文中只做简单介绍,详细的讲解都在上文中。
2.1 前置流程
1、通过AcceptedThread去接收请求——>执行run方法——>select——>doAccept——>将当前连接分配给selectThread线程——>selectThread.run——>select——>handleIO——>workerPool.schedule——>通过workThread去执行ScheduledWorkRequest任务——>ScheduledWorkRequest.run——>IOWorkRequest.doWork——>doIO——>readPayload——>readRequest——>processPacket——>submitRequest——>执行业务链路——>最开始调用PreRequestProcessor.run方法——>pRequest
pRequest方法比较长,主要逻辑就是根据不同的请求类型实现不同的操作。在代码中我们可以看到对于 exists 请求,没有特别的逻辑处理。
2、然后逻辑会执行到SyncRequestProcessor.processRequest
这个 processor负责把写request持久化到本地磁盘,为了提高写磁盘的效率,这里使用的是缓冲写,但是会周期性(1000个request)的调用flush操作,flush之后request已经确保写到磁盘了.同时他还要维护本机的txnlog和snapshot,这里的基本逻辑是:
每隔snapCount/2个request会重新生成一个snapshot并滚动一次txnlog,同时为了避免所有的zookeeper server在同一个时间生成snapshot和滚动日志,这里会再加上一个随机数,snapCount的默认值是100000个request
2.2 FinalRequestProcessor
上面的逻辑走完会进入到FinalRequestProcessor,这个是最终的一个处理器,主要负责把已经commit的写操作应用到本机,对于读操作则从本机中读取数据并返回给client
2.3 statNode
按照前面我们讲过的原理,statNode应该会做两个事情
- 获取指定节点的元数据
- 保存针对该节点的事件监听
注意,在这个方法中,将ServerCnxn向上转型为Watcher了。
public Stat statNode(String path, Watcher watcher)
throws KeeperException.NoNodeException {
Stat stat = new Stat();
//根据path获取节点数据
DataNode n = nodes.get(path);
//如果watcher不为空,则将当前的watcher和path进行绑定
if (watcher != null) {
//增加watch
dataWatches.addWatch(path, watcher);
}
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
//copy属性设置到stat中
n.copyStat(stat);
return stat;
}
}
2.4 addWatch
通过WatchManager来保存指定节点的事件监听,WatchManager维护了两个集合。
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
watchTable表示从节点路径到watcher集合的映射
而watch2Paths则表示从watcher到所有节点路径集合的映射。
synchronized void addWatch(String path, Watcher watcher) {
//存储指定path对应的watcher,一个path可以存在多个客户端进行watcher,所以保存了一个set集合
HashSet<Watcher> list = watchTable.get(path);
//判断watcherTable中是否存在当前路径对应的watcher
if (list == null) {//如果为空,说明针对当前节点的watcher还不存在,则进行初始化。
///如果节点上的watcher很少,就不要浪费内存,只添加4个长度,后续进行扩容
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);//把watcher(对应的是一个ServerCnxn)保存到list中。
//watcher到节点的映射关系表
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {//如果为空,则初始化并保存
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
//3.6版本之后在这里会设置watch的模式
// watch 有三种类型,一种是PERSISTENT、一种是PERSISTENT_RECURSIVE、STANDARD,
// 前者是持久化订阅,后者是持久化递归订阅,所谓递归订阅就是针对监听的节点的子节点的变化都会触发监听
//todo watcherModeManager.setWatcherMode(watcher, path, watcherMode);
//将path保存到集合
paths.add(path);
}
2.5 返回处理结果
在FinalRequestProcessor的processRequest方法中,将处理结果rsp返回给客户端。
try {
//服务端将请求返回,这时客户端会收到服务端响应
cnxn.sendResponse(hdr, rsp, "response");
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
最终结果被客户端的接收
2.6 总结
调用关系链如下
第3章 客户端收到请求
客户端接收请求的处理是在ClientCnxnSocketNIO的doIO中,之前客户端发起请求是写,现在客户端收到请求,则是一个读操作,也就是当客户端收到服务端的数据时会触发一下代码的执行。其中很关键的是 sendThread.readResponse(incomingBuffer); 来接收服务端的请求。
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
//之前建立连接的时候执行这个
readConnectResult();
enableRead();
...
} else {
//现在注册监听执行这个
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
3.1 readResponse
这个方法里面主要的流程如下
- 首先读取header,如果其xid == -2,表明是一个ping的response,return
- 如果xid是 -4 ,表明是一个AuthPacket的response return
- 如果xid是 -1,表明是一个notification,此时要继续读取并构造一个enent,通过EventThread.queueEvent发送,return
- 其它情况下:从pendingQueue拿出一个Packet,校验后更新packet信息
对于exists请求,返回的xid=1,则进入到其他情况来处理
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
...
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
...
}
if (replyHdr.getXid() == -1) {
// -1 means notification
...
}
...
Packet packet;
//pendingQueue中存储的是客户端传递过去的数据包packet
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
//表示这个请求包已经处理完成,直接移除
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
//确保是同一个id
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
//把服务端返回的头信息设置到packet中
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
//反序列化返回的消息体
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
//调用finishPacket完成消息的处理
finishPacket(packet);
}
}
3.2 finishPacket
通过前面客户端和服务端的交互,可以确定服务端已经成功保存了watcher这个事件,那么受到服务端的确认之后,客户端会把这个watcher保存到本地的事件中。
所以,finishPacket主要功能是把从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager中去
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
//将事件注册到zkwatchemanager中
//watchRegistration,在组装请求的时候,我们初始化了这个对象
//把watchRegistration 子类里面的 Watcher 实例放到 ZKWatchManager 的 existsWatches 中存储起来。
p.watchRegistration.register(p.replyHeader.getErr());
}
//cb就是AsnycCallback,如果为null,表明是同步调用的接口,不需要异步回掉,因此,直接notifyAll即可。
//这里唤醒的就是在客户端调用exists方法中,wait()的逻辑,这样表示服务处理完成。
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
3.3 register
把path对应的watcher本地回调保存到一个集合中。
public void register(int rc) {
if (shouldAddWatch(rc)) {//根据返回的code来决定是否需要添加watch
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {//初始化watches集合
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
//把watcher保存到watches集合,此时的watcher对应的就是在exists方法中传入的匿名内部类。
watchers.add(watcher);
}
}
}
3.4 ZkWatchManager
ZkWatchManager是客户端这边用来保存本地节点对应的watcher回调的管理类,提供了三种不同的事件管理机制。
protected Map<String, Set<Watcher>> getWatches(int rc) {
return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
}
总的来说,当使用ZooKeeper 构造方法或者使用 getData、exists 和 getChildren 三个接口来向ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和Watcher对应关系存储起来备用。
第4章 事件触发
前面这么长的说明,只是为了清晰的说明事件的注册流程,最终的触发,还得需要通过事务型操作来完成,在我们最开始的案例中,通过如下代码去完成了事件的触发
zookeeper.setData("/watch", "1".getBytes(),-1) ;
前面的客户端和服务端对接的流程就不再重复讲解了,交互流程是一样的,唯一的差别在于事件触发了
4.1 服务端的事件响应
服务端收到setData请求时,会进入到FinalRequestProcessor这个类中 ProcessTxnResult rc = zks.processTxn(request);
4.1.1 DataTree.setData
从该方法一直进入到DataTree.setData这个方法。
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
//得到节点数据
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
//修改节点数据
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
//触发NodeDataChanged事件
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
4.1.2 triggerWatch
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
//根据类型、连接状态、路径,构建WatchedEvent
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
//根据path获取watcher
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//遍历watchers,循环处理事件
w.process(e);
}
return watchers;
}
4.1.3 w.process
还记得我们在服务端绑定事件的时候,watcher绑定是是什么?是ServerCnxn, 所以w.process(e),其实调用的应该是ServerCnxn的process方法。而servercnxn又是一个抽象方法,有两个实现类,分别是:NIOServerCnxn和NettyServerCnxn。那接下来我们扒开NIOServerCnxn这个类的process方法看看究竟
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
//把事件对象WatcherEvent返回给客户端
sendResponse(h, e, "notification");
}
4.2 客户端的事件处理
客户端收到请求,仍然执行SendThread.readResponse,此时的消息通知类型的xid=-1,所以需要进入到-1的分支进行判断
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
4.2.1 queueEvent
SendThread 接收到服务端的通知事件后,会通过调用 EventThread 类的 queueEvent 方法将事件传给 EventThread 线程,queueEvent 方法根据该通知事件,从 ZKWatchManager 中取出所有相关的Watcher,如果获取到相应的Watcher,就会让Watcher移除失效。
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
4.2.2 materialize
通过dataWatches或者existWatches或者childWatches的remove取出对应的watch,表明客户端watch也是注册一次就移除
同时需要根据keeperState、eventType和path返回应该被通知的Watcher集合
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None:
result.add(defaultWatcher);
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
dataWatches.clear();
}
}
synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
4.2.3 waitingEvents.add
waitingEvents是EventThread这个线程中的阻塞队列,很明显,又是在我们第一步操作的时候实例化的一个线程。
从名字可以指导,waitingEvents 是一个待处理 Watcher 的队列,EventThread 的 run() 方法会不断从队列中取数据,交由 processEvent 方法处理:
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
4.2.4 processEvent
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {//判断事件类型
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;//得到watcherseteventPair
for (Watcher watcher : pair.watchers) {//拿到符合触发机制的所有watcher列表,循环进行调用
try {
watcher.process(pair.event);//调用客户端的回调process
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
}
}
下文预告
- Zookeeper集群中的Leader选举
- Zookeeper集群中的数据同步
相关推荐
- 史上最全的浏览器兼容性问题和解决方案
-
微信ID:WEB_wysj(点击关注)◎◎◎◎◎◎◎◎◎一┳═┻︻▄(页底留言开放,欢迎来吐槽)●●●...
-
- 平面设计基础知识_平面设计基础知识实验收获与总结
-
CSS构造颜色,背景与图像1.使用span更好的控制文本中局部区域的文本:文本;2.使用display属性提供区块转变:display:inline(是内联的...
-
2025-02-21 16:01 yuyutoo
- 写作排版简单三步就行-工具篇_作文排版模板
-
和我们工作中日常word排版内部交流不同,这篇教程介绍的写作排版主要是用于“微信公众号、头条号”网络展示。写作展现的是我的思考,排版是让写作在网格上更好地展现。在写作上花费时间是有累积复利优势的,在排...
- 写一个2048的游戏_2048小游戏功能实现
-
1.创建HTML文件1.打开一个文本编辑器,例如Notepad++、SublimeText、VisualStudioCode等。2.将以下HTML代码复制并粘贴到文本编辑器中:html...
- 今天你穿“短袖”了吗?青岛最高23℃!接下来几天气温更刺激……
-
最近的天气暖和得让很多小伙伴们喊“热”!!! 昨天的气温到底升得有多高呢?你家有没有榜上有名?...
- CSS不规则卡片,纯CSS制作优惠券样式,CSS实现锯齿样式
-
之前也有写过CSS优惠券样式《CSS3径向渐变实现优惠券波浪造型》,这次再来温习一遍,并且将更为详细的讲解,从布局到具体样式说明,最后定义CSS变量,自定义主题颜色。布局...
- 你的自我界限够强大吗?_你的自我界限够强大吗英文
-
我的结果:A、该设立新的界限...
- 行内元素与块级元素,以及区别_行内元素和块级元素有什么区别?
-
行内元素与块级元素首先,CSS规范规定,每个元素都有display属性,确定该元素的类型,每个元素都有默认的display值,分别为块级(block)、行内(inline)。块级元素:(以下列举比较常...
-
- 让“成都速度”跑得潇潇洒洒,地上地下共享轨交繁华
-
去年的两会期间,习近平总书记在参加人大会议四川代表团审议时,对治蜀兴川提出了明确要求,指明了前行方向,并带来了“祝四川人民的生活越来越安逸”的美好祝福。又是一年...
-
2025-02-21 16:00 yuyutoo
- 今年国家综合性消防救援队伍计划招录消防员15000名
-
记者24日从应急管理部获悉,国家综合性消防救援队伍2023年消防员招录工作已正式启动。今年共计划招录消防员15000名,其中高校应届毕业生5000名、退役士兵5000名、社会青年5000名。本次招录的...
- 一起盘点最新 Chrome v133 的5大主流特性 ?
-
1.CSS的高级attr()方法CSSattr()函数是CSSLevel5中用于检索DOM元素的属性值并将其用于CSS属性值,类似于var()函数替换自定义属性值的方式。...
- 竞走团体世锦赛5月太仓举行 世界冠军杨家玉担任形象大使
-
style="text-align:center;"data-mce-style="text-align:...
- 学物理能做什么?_学物理能做什么 卢昌海
-
作者:曹则贤中国科学院物理研究所原标题:《物理学:ASourceofPowerforMan》在2006年中央电视台《对话》栏目的某期节目中,主持人问过我一个的问题:“学物理的人,如果日后不...
-
- 你不知道的关于这只眯眼兔的6个小秘密
-
在你们忙着给熊本君做表情包的时候,要知道,最先在网络上引起轰动的可是这只脸上只有两条缝的兔子——兔斯基。今年,它更是迎来了自己的10岁生日。①关于德艺双馨“老艺...
-
2025-02-21 16:00 yuyutoo
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- mybatis plus (70)
- scheduledtask (71)
- css滚动条 (60)
- java学生成绩管理系统 (59)
- 结构体数组 (69)
- databasemetadata (64)
- javastatic (68)
- jsp实用教程 (53)
- fontawesome (57)
- widget开发 (57)
- vb net教程 (62)
- hibernate 教程 (63)
- case语句 (57)
- svn连接 (74)
- directoryindex (69)
- session timeout (58)
- textbox换行 (67)
- extension_dir (64)
- linearlayout (58)
- vba高级教程 (75)
- iframe用法 (58)
- sqlparameter (59)
- trim函数 (59)
- flex布局 (63)
- contextloaderlistener (56)