百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

Zookeeper系列——3Zookeeper源码分析之Session管理及请求处理

yuyutoo 2024-10-16 15:45 2 浏览 0 评论

CSDN地址:https://blog.csdn.net/Eclipse_2019/article/details/126362657

学习目标

  1. 理清Zookeeper的Session创建、刷新和过期流程分析
  2. 明确Zookeeper的核心业务调用链

第1章 Session创建

上文给大家讲过Zookeeper的应用了,实际上Zookeeper分为两个模块,server端和client端,server端实现了所有的zookeeper业务逻辑,而client端就是封装了server端的一些方法调用。既然存在两个模块,那肯定涉及到了网络通信,ZooKeeper中使用ServerCnxnFactory管理与客户端的连接,其有两个实现,一个是NIOServerCnxnFactory,使用Java原生NIO实现;一个是NettyServerCnxnFactory,使用netty实现;使用ServerCnxn代表一个客户端与服务端的连接。从单机版启动中可以发现Zookeeper默认通信组件为NIOServerCnxnFactory。接下来我们先来看看Zookeeper中通信的流程图,然后再详细分析源码。

客户端发起连接请求及服务端建立会话的全流程图由于平台显示不清晰,详图请参考CSDN上的图,在这里就不发上来了

1.1 客户端发送请求

上文中已经讲过,建立连接是通过new ZooKeeper方法完成的,在ZooKeeper的构造方法中会创建一个ClientCnxn对象,并调用该对象的start方法,在该方法中会启动两个线程任务:sendThread和eventThread。

而sendThread线程就是我们去建立连接的核心线程,在该线程的run方法中实际上是通过一个while循环,不断的执行,如果是第一次进来会去创建连接,如果连接状态是CONNECTED的话,则会最大不超过10秒去发送一次Ping请求保证连接不断开。

源码比较长,有些不重要的代码就直接省略了。

public void run() {
    //发送Ping的间隔
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            //如果状态是CONNECTING的话就去创建连接
            if (!clientCnxnSocket.isConnected()) {
                startConnect(serverAddress);
            }

			//如果已经连接成功,则最大不超过10秒发送一次心跳
            if (state.isConnected()) {
                //这段逻辑实际上就是控制心跳的是发送间隔,避免过多的发送
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                    ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
        } 
}
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        //会调用ZK的服务端完成会话创建
        registerAndConnect(sock, addr);
    } catch (IOException e) {

    }
}

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    //调用NIO开启会话
    boolean immediateConnect = sock.connect(addr);

}

1.2 服务端接收连接

服务端由NIOServerCnxnFactory启动线程去接收请求,NIOServerCnxnFactory启动时会启动四类线程:

AcceptThread:该线程接收来自客户端的连接,并将其分配给SelectorThread(启动一个线程)。

SelectorThread:该线程执行select(),由于在处理大量连接时,select()会成为性能瓶颈,因此启动多个SelectorThread,使用系统属性zookeeper.nio.numSelectorThreads配置该类线程数,默认个数为 核心数/2。

WorkerThread:该线程执行基本的套接字读写,使用系统属性zookeeper.nio.numWorkerThreads配置该类线程数,默认为核心数?2核心数?2.如果该类线程数为0,则另外启动一线程进行IO处理,见下文worker thread介绍。

ConnectionExpirationThread:若连接上的session已过期,则关闭该连接。

1.2.1 AcceptThread

该线程会接收客户端的请求

public void run() {
    while (!stopped && !acceptSocket.socket().isClosed()) {
        select();
    }
}
private void select() {
    try {
        //查找就绪的连接
        selector.select();

        Iterator<SelectionKey> selectedKeys =
            selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {

            if (key.isAcceptable()) {
                //1:和当前服务建立链接。
                //2:获取远程客户端计算机地址信息。
                //3:判断当前链接是否超出最大限制。
                //4:调整为非阻塞模式。
                //5:轮询获取一个SelectorThread,将当前链接分配给该SelectorThread。
                //6:将当前请求添加到该SelectorThread的acceptedQueue中,并唤醒该SelectorThread。
                if (!doAccept()) {
                    pauseAccept(10);
                }
            } 
        }
    } 
}

进入到doAccept方法中

private boolean doAccept() {
			...
            try {
                //建立连接
                sc = acceptSocket.accept();
                accepted = true;
                //获取远程计算机地址信息
                InetAddress ia = sc.socket().getInetAddress();
                int cnxncount = getClientCnxnCount(ia);

                //判断是否超出最大客户端连接的限制
                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                    ...
                }

                LOG.debug("Accepted socket connection from "
                         + sc.socket().getRemoteSocketAddress());
                //调整此通道的阻塞模式
                sc.configureBlocking(false);

                //轮询将此连接分配给一个SelectorThread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                //将新连接加入SelectorThread的acceptedQueue中,并唤醒SelectorThread
                if (!selectorThread.addAcceptedConnection(sc)) {
                    ...
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
				...
            }
            return accepted;
        }
    }
public boolean addAcceptedConnection(SocketChannel accepted) {
    //将accepted添加到acceptedQueue
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    //唤醒SelectorThread
    wakeupSelector();
    return true;
}

在addAcceptedConnection方法中会唤醒SelectorThread,所以,接下来,逻辑会进入到SelectorThread.run方法中

1.2.2 SelectorThread

该线程的主要作用是从Socket读取数据,并封装成workRequest,并将workRequest交给workerPool工作线程池处理,同时将acceptedQueue中未处理的连接取出,并未每个连接绑定OP_READ读事件,并封装对应的上下文对象NIOServerCnxn。SelectorThread的run方法如下:

public void run() {

    //读取就绪的IO事件,交由worker thread处理,在ZookeeperServer的processPacket()中处理数据
    select();
    //把acceptedQueue队列中接收的连接,取出来注册OP_READ事件,
    //并添加NIOServerCnxn对象与当前key绑定
    //相当于给每个连接添加附加对象NIOServerCnxn(上下文对象)
    processAcceptedConnections();
    //遍历所有updateQueue,更新updateQueue中连接的监听事件
    processInterestOpsUpdateRequests();
}

先来看看processAcceptedConnections方法,该方法中会为每个连接创建一个NIOServerCnxn对象,同时也会调用服务续约的逻辑

private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        key = accepted.register(selector, SelectionKey.OP_READ);
        // 针对每个连接,创建一个NIOServerCnxn
        NIOServerCnxn cnxn = createConnection(accepted, key, this);
        key.attach(cnxn);
        addCnxn(cnxn);
    }
}

这块不是很重要,我们不往深挖,接着回去看select方法

private void select() {
    selector.select();
    Set<SelectionKey> selected = selector.selectedKeys();
    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
    Collections.shuffle(selectedList);
    Iterator<SelectionKey> selectedKeys = selectedList.iterator();
    while (!stopped && selectedKeys.hasNext()) {
        SelectionKey key = selectedKeys.next();
        selected.remove(key);
        if (key.isReadable() || key.isWritable()) {
            //核心逻辑
            handleIO(key);
        }
    }
}

handleIO()方法会封装当前SelectorThread为IOWorkRequest,并将IOWorkRequest交给workerPool来调度,而workerPool调度才是读数据的开始,源码如下:

private void handleIO(SelectionKey key) {
    //将SelectorThread封装成workRequest对象
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
 	//处理服务续约的方法
    touchCnxn(cnxn);
    //将封装好的workRequest交给线程池去处理,在这里读取客户端数据
    workerPool.schedule(workRequest);
}

我们先来看看处理续约的方法,不只是在这里调用了NIOServerCnxnFactory.touchCnxn(NIOServerCnxn)方法。

public void touchCnxn(NIOServerCnxn cnxn) {
    cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
}

进入到update方法中,会发现是ExpiryQueue中的一个方法,从名字上能看出来,ExpiryQueue实际上就是服务端管理session过期的队列

// 维护每个NIOServerCnxn对应的过期时间
private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
// 维护每个过期时间对应的桶里有哪些NIOServerCnxn
private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
private final AtomicLong nextExpirationTime = new AtomicLong();

public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);//获取当前NIOServerCnxn对应的过期时间
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);//获取下次过期时间
    if (newExpiryTime.equals(prevExpiryTime)) {
        return null; // No change, so nothing to update
    }
    // First add the elem to the new expiry time bucket in expiryMap.
    Set<E> set = expiryMap.get(newExpiryTime); //拿到下一个过期时间的桶
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(elem); //把原来的NIOServerCnxn移动到新的桶里

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem); //清空之前过期的桶
        }
    }
    return newExpiryTime;
}

ok,简单了解了过期时间的更新,我们在回到之前讲的通过工作线程池去处理workRequest对象读取客户端数据的流程

1.2.3 WorkerThread

WorkerThread相比上面的线程而言,调用关系颇为复杂,设计到了多个对象方法调用,主要用于处理IO,但并未对数据做出处理,数据处理将有业务链对象RequestProcessor处理,调用关系图如下:

public void schedule(WorkRequest workRequest) {
    schedule(workRequest, 0);
}
public void schedule(WorkRequest workRequest, long id) {
    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
    int size = workers.size();
    int workerNum = ((int) (id % size) + size) % size;
    ExecutorService worker = workers.get(workerNum);
    worker.execute(scheduledWorkRequest);
}
WorkerService.ScheduledWorkRequest

private class ScheduledWorkRequest implements Runnable {
    @Override
    public void run() {
        //IOWorkRequest.doWork
        workRequest.doWork();
    }
}
private class IOWorkRequest extends WorkerService.WorkRequest {
    public void doWork() throws InterruptedException {
        if (key.isReadable() || key.isWritable()) {
            //执行IO数据处理
            cnxn.doIO(key);
            //再次见到这个方法,做服务续约的
            touchCnxn(cnxn);
        }
    }
}

后面的一些细节我们就不展开了,通过doIO方法最终会调用到readPayload。

private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException(
                "Unable to read additional data from client sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely client has closed socket");
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        //第一次未初始化时,读取连接请求
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

此时如果initialized=false,表示第一次连接 需要创建Session(createSession),此处调用readConnectRequest()后,在readConnectRequest()方法中会将initialized设置为true,只有在处理完连接请求之后才会把initialized设置为true,才可以处理客户端其他命令。

private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    //下次进来就不会再来创建了
    initialized = true;
}

上面方法还调用了processConnectRequest处理连接请求, processConnectRequest 第一次从请求中获取的sessionId=0,此时会把创建Session作为一个业务,会调用createSession()方法,processConnectRequest 方法部分关键代码如下:

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();//创建连接请求
    connReq.deserialize(bia, "connect"); //反序列化连接请求参数
    long sessionId = connReq.getSessionId(); //创建一个sessionId
    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    cnxn.setSessionTimeout(sessionTimeout);
    if (sessionId == 0) {
        long id = createSession(cnxn, passwd, sessionTimeout); //创建session
    }
}

创建会话调用createSession(),该方法会首先创建一个sessionId,并把该sessionId作为会话ID创建一个创建session会话的请求,并将该请求交给业务链作为一个业务处理,createSession()源码如下:

long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    //sessionTracker去创建一个sessionId
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    //创建一个OpCode.createSession请求(根据SessionId提交一个创建会话的业务)
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    setLocalSessionFlag(si);
    //提交业务
    submitRequest(si);
    return sessionId;
}

上面方法用到的sessionTracker.createSession(timeout)做了2个操作分别是创建sessionId和配置sessionId的跟踪信息,方法源码如下:

public long createSession(int sessionTimeout) {
    //获取下一个SessionId
    long sessionId = nextSessionId.getAndIncrement();
    //Session跟踪配置
    addSession(sessionId, sessionTimeout);
    return sessionId;
}

会话信息的跟踪其实就是将会话信息添加到队列中,任何地方可以根据会话ID找到会话信息,addSession方法实现了Session创建、Session队列存储、Session过期队列存储,trackSession方法源码如下:

public synchronized boolean addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);

    boolean added = false;
    //获取一个Session,如果为空,则以SessionId创建一个Session
    SessionImpl session = sessionsById.get(id);
    if (session == null){
        session = new SessionImpl(id, sessionTimeout);
    }

    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    //Session存入到sessionById中,可以根据ID获取到Session
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);

    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x" + Long.toHexString(id));
    }

    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                 "SessionTrackerImpl --- " + actionStr + " session 0x"
                                 + Long.toHexString(id) + " " + sessionTimeout);
    }

    //将Session添加到失效队列中
    updateSessionExpiry(session, sessionTimeout);
    return added;
}

第2章 Session刷新

也可以叫服务续约,客户端除了PING请求以外,其他正常的CRUD请求也会对session续约,这里以PING请求为例

ClientCnxn.SendThread

public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    while (state.isAlive()) {
        //如果连接建立,每隔段时间发送PING请求
        if (state.isConnected()) {
            //1000(1 second) is to prevent race condition missing to send the second ping
            //also make sure not to send too many pings when readTimeout is small
            int timeToNextPing = readTimeout / 2
                                 - clientCnxnSocket.getIdleSend()
                                 - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
            //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
            if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                //发送PING请求
                sendPing();
                clientCnxnSocket.updateLastSend();
            } else {
                if (timeToNextPing < to) {
                    to = timeToNextPing;
                }
            }
        }
    }
}

发送PING请求给服务端

private void sendPing() {
    lastPingSentNs = System.nanoTime();
    RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
    queuePacket(h, null, null, null, null, null, null, null, null);
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
    ...
    //这段逻辑实际上就是唤醒一个sendThread线程,其实再去调用一下sendThread.run方法,在这个方法里面会重新发请求到服务端
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}
//重复执行sendThread.run
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    //发送请求
    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                 ClientCnxn cnxn)
    throws IOException, InterruptedException {
   
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                //发送PING
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            if (findSendablePacket(outgoingQueue,
                                   cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}

服务端会收到客户端的PING请求,同样也是AcceptedThread接收请求,然后执行的逻辑同Session的创建流程,最后进入到了SelectorThread.run——>select()——>handleIO(key)——>touchCnxn(cnxn)。

第3章 Session过期

通过Session创建的源码分析其实大家应该也能看出来,对于Session的过期属性的管理的是SessionTrackerImpl这个类,而它也是一个线程类,继承了 ZooKeeperCriticalThread ,我们可以看它的run方法,它首先获取了下一个会话过期时间,并休眠等待会话过期时间到期,然后获取过期的客户端会话集合并循环关闭。

public void run() {
    try {
        while (running) {
            //获取下一个失效时间
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                //休眠
                Thread.sleep(waitTime);
                continue;
            }
            //获取失效的客户端会话集合
            for (SessionImpl s : sessionExpiryQueue.poll()) {
                //把Session会话的 isClosing 状态设置为了true
                setSessionClosing(s.sessionId);
                //让客户端会话失效
                expirer.expire(s);
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

让客户端失效的方法 expirer.expire(s); 其实也是一个业务操作,主要调用了ZooKeeperServer.expire() 方法,而该方法获取SessionId后,又创建了一个OpCode.closeSession 的请求,并交给业务链处理,我们查看 ZooKeeperServer.expire() 方法源码如下:

public void expire(Session session) {
    long sessionId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
             + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(sessionId);
}
private void close(long sessionId) {
    //创建一个OpCode.closeSession业务请求
    Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
    setLocalSessionFlag(si);
    //提交给业务链处理
    submitRequest(si);
}

道理发现又出现了submitRequest方法,这里暂时先不讲,在下文的请求处理中再详细介绍,在这里只需要知道,我们会调用该方法将我们的session关闭就好了。

第4章 请求处理

zookeeper 的业务处理流程就像工作流一样,其实就是一个单链表;在zookeeper启动的时候,会确立各个节点的角色特性,即leader、follower和observer,每个角色确立后,就会初始化它的工作责任链;

4.1 RequestProcessor结构

客户端请求过来,每次执行不同事务操作的时候,Zookeeper也提供了一套业务处理流程RequestProcessor。

我们来看一下RequestProcessor初始化流程,ZooKeeperServer.setupRequestProcessors()方法源码如下:

/**
 * 初始化业务处理流程
 */
protected void setupRequestProcessors() {
    //创建FinalRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    //创建SyncRequestProcessor,并将FinalProcessor作为它下一个业务链
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                                                              finalProcessor);
    //启动syncProcessor
    ((SyncRequestProcessor)syncProcessor).start();
    //创建PrepRequestProcessor,并作为第一个处理业务的RequestProcessor,将syncProcessor作为它的下一个业务链
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    //启动firstProcessor
    ((PrepRequestProcessor)firstProcessor).start();
}

syncProcessor创建时,将finalProcessor作为参数传递进来源码如下:

/**
 * 创建SyncRequestProcessor,下一个责任链 FinalRequestProcessor
 * @param zks
 * @param nextProcessor
 */
public SyncRequestProcessor(ZooKeeperServer zks,
                            RequestProcessor nextProcessor) {
    super("SyncThread:" + zks.getServerId(), zks
          .getZooKeeperServerListener());
    this.zks = zks;
    //下一个责任链
    this.nextProcessor = nextProcessor;
    running = true;
}

firstProcessor创建时,将syncProcessor作为参数传递进来源码如下:

public PrepRequestProcessor(ZooKeeperServer zks,
                            RequestProcessor nextProcessor) {
    super("ProcessThread(sid:" + zks.getServerId() + " cport:"
          + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
    this.nextProcessor = nextProcessor;
    this.zks = zks;
}

PrepRequestProcessor/SyncRequestProcessor关系图:

PrepRequestProcessor和SyncRequestProcessor的结构一样,都是实现了Thread的一个线程,所以在这里初始化时便启动了这两个线程。

4.2 PrepRequestProcessor

PrepRequestProcessor是请求处理器的第1个处理器,我们把之前的请求业务处理衔接起来,一步一步分析。ZooKeeperServer.processPacket()>submitRequest()>enqueueRequest()>RequestThrottler.submitRequest() ,我们来看下RequestThrottler.submitRequest()源码,它将当前请求添加到submittedRequests队列中了,源码如下:

在submitRequest中会执行firstProcessor.processRequest方法,会进入到PrepRequestProcessor.processRequest(request)

public void processRequest(Request request) {
    submittedRequests.add(request);
}

public void run() {
    while (true) {
        Request request = submittedRequests.take();
        pRequest(request);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);
    switch (request.type) {
    case OpCode.createSession: //针对连接请求做处理
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        }
        break;
    }
}

protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
    switch (type) {
    case OpCode.createSession:
        int to = request.request.getInt();
        request.setTxn(new CreateSessionTxn(to));
        zks.sessionTracker.trackSession(request.sessionId, to);
        zks.setOwner(request.sessionId, request.getOwner());
        break;
    }
}

从代码可以看出pRequest2Txn()方法主要做了权限校验、快照记录、事务信息记录相关的事,还并未涉及数据处理,也就是说PrepRequestProcessor其实是做了操作前权限校验、快照记录、事务信息记录相关的事。

4.3 SyncRequestProcessor

分析了PrepRequestProcessor处理器后,接着来分析SyncRequestProcessor,该处理器主要是将请求数据高效率存入磁盘,并且请求在写入磁盘之前是不会被转发到下个处理器的。

我们先看请求被添加到队列的方法:

public void processRequest(Request request) {
    // request.addRQRec(">sync");
    //将请求添加到queueRequest队列中
    queuedRequests.add(request);
}

同样SyncRequestProcessor是一个线程,执行队列中的请求也在线程中触发,我们看它的run方法,源码如下:

public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                //阻塞方法获取一个请求
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        //重置上次rollLog以来的txn数量
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        //保存快照数据
                                        zks.takeSnapshot();
                                    } catch(Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                //将当前请求添加到toFlush队列中,toFlush队列是已经写入并等待刷新到磁盘的事务
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    //提交数据
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

run方法会从queuedRequests队列中获取一个请求,如果获取不到就会阻塞等待直到获取到一个请求对象,程序才会继续往下执行,接下来会调用Snapshot Thread线程实现将客户端发送的数据以快照的方式写入磁盘,最终调用flush()方法实现数据提交,flush()方法源码如下:

private void flush(LinkedList<Request> toFlush)
    throws IOException, RequestProcessorException
{
    if (toFlush.isEmpty())
        return;
    //数据提交
    zks.getZKDatabase().commit();
    while (!toFlush.isEmpty()) {
        Request i = toFlush.remove();
        if (nextProcessor != null) {
            //调用下一个业务链
            nextProcessor.processRequest(i);
        }
    }
    if (nextProcessor != null && nextProcessor instanceof Flushable) {
        ((Flushable)nextProcessor).flush();
    }
}

flush()方法实现了数据提交,并且会将请求交给下一个业务链,下一个业务链为FinalRequestProcessor

4.4 FinalRequestProcessor

前面分析了SyncReqeustProcessor,接着分析请求处理链中最后的一个处理器FinalRequestProcessor,该业务处理对象主要用于返回Response。

在SyncRequestProcessor对txn(创建session的操作)进行持久化,在FinalRequestProcessor会对Session进行提交,其实就是把Session的ID和Timeout存到sessionsWithTimeout中去。

public void processRequest(Request request) {
    ProcessTxnResult rc = zks.processTxn(request);
    switch (request.type) {
        case OpCode.createSession: {
            lastOp = "SESS";
            updateStats(request, lastOp, lastZxid);
            zks.finishSessionInit(request.cnxn, true);
            return;
        }
    }
    if (path == null || rsp == null) {
        cnxn.sendResponse(hdr, rsp, "response"); //服务端将请求返回,这时客户端会收到服务端响应
    }
}

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    if (valid) {
        if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
            serverCnxnFactory.registerConnection(cnxn);
        } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
            secureServerCnxnFactory.registerConnection(cnxn);
        }
    }
}

调用sendResponse方法后会将请求信息返回给客户端

客户端收到服务端响应

ClientCnxnSocketNIO.doIO(Queue<Packet>, ClientCnxn)

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    if (sockKey.isReadable()) {
        if (!initialized) {
            readConnectResult();
        }
    }
}

void readConnectResult() throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    conRsp.deserialize(bbia, "connect");
    this.sessionId = conRsp.getSessionId(); //连接建立完成
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}

下文预告

  1. 理解Zookeeper的watcher机制原理

相关推荐

jQuery VS AngularJS 你更钟爱哪个?

在这一次的Web开发教程中,我会尽力解答有关于jQuery和AngularJS的两个非常常见的问题,即jQuery和AngularJS之间的区别是什么?也就是说jQueryVSAngularJS?...

Jquery实时校验,指定长度的「负小数」,小数位未满末尾补0

在可以输入【负小数】的输入框获取到焦点时,移除千位分隔符,在输入数据时,实时校验输入内容是否正确,失去焦点后,添加千位分隔符格式化数字。同时小数位未满时末尾补0。HTML代码...

如何在pbootCMS前台调用自定义表单?pbootCMS自定义调用代码示例

要在pbootCMS前台调用自定义表单,您需要在后台创建表单并为其添加字段,然后在前台模板文件中添加相关代码,如提交按钮和表单验证代码。您还可以自定义表单数据的存储位置、添加文件上传字段、日期选择器、...

编程技巧:Jquery实时验证,指定长度的「负小数」

为了保障【负小数】的正确性,做成了通过Jquery,在用户端,实时验证指定长度的【负小数】的方法。HTML代码<inputtype="text"class="forc...

一篇文章带你用jquery mobile设计颜色拾取器

【一、项目背景】现实生活中,我们经常会遇到配色的问题,这个时候去百度一下RGB表。而RGB表只提供相对于的颜色的RGB值而没有可以验证的模块。我们可以通过jquerymobile去设计颜色的拾取器...

编程技巧:Jquery实时验证,指定长度的「正小数」

为了保障【正小数】的正确性,做成了通过Jquery,在用户端,实时验证指定长度的【正小数】的方法。HTML做成方法<inputtype="text"class="fo...

jquery.validate检查数组全部验证

问题:html中有多个name[],每个参数都要进行验证是否为空,这个时候直接用required:true话,不能全部验证,只要这个数组中有一个有值就可以通过的。解决方法使用addmethod...

Vue进阶(幺叁肆):npm查看包版本信息

第一种方式npmviewjqueryversions这种方式可以查看npm服务器上所有的...

layui中使用lay-verify进行条件校验

一、layui的校验很简单,主要有以下步骤:1.在form表单内加上class="layui-form"2.在提交按钮上加上lay-submit3.在想要校验的标签,加上lay-...

jQuery是什么?如何使用? jquery是什么功能组件

jQuery于2006年1月由JohnResig在BarCampNYC首次发布。它目前由TimmyWilson领导,并由一组开发人员维护。jQuery是一个JavaScript库,它简化了客户...

django框架的表单form的理解和用法-9

表单呈现...

jquery对上传文件的检测判断 jquery实现文件上传

总体思路:在前端使用jquery对上传文件做部分初步的判断,验证通过的文件利用ajaxFileUpload上传到服务器端,并将文件的存储路径保存到数据库。<asp:FileUploadI...

Nodejs之MEAN栈开发(四)-- form验证及图片上传

这一节增加推荐图书的提交和删除功能,来学习node的form提交以及node的图片上传功能。开始之前需要源码同学可以先在git上fork:https://github.com/stoneniqiu/R...

大数据开发基础之JAVA jquery 大数据java实战

上一篇我们讲解了JAVAscript的基础知识、特点及基本语法以及组成及基本用途,本期就给大家带来了JAVAweb的第二个知识点jquery,大数据开发基础之JAVAjquery,这是本篇文章的主要...

推荐四个开源的jQuery可视化表单设计器

jquery开源在线表单拖拉设计器formBuilder(推荐)jQueryformBuilder是一个开源的WEB在线html表单设计器,开发人员可以通过拖拉实现一个可视化的表单。支持表单常用控件...

取消回复欢迎 发表评论: