ZooKeeper Standalone Node Creation: Source Code Analysis

Preface

In earlier posts we walked through the related server-side and client-side ZooKeeper source:
zookeeper client startup source code analysis
zookeeper server startup source code analysis
Now let's analyze the process of creating a node.

The client's create request

We start with the source of ZooKeeper.create, the method in which the client builds its node-creation request.
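For orientation, here is a minimal usage sketch of the create API from application code; the connect string, session timeout, paths, and data are all illustrative, and error handling is omitted.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateDemo {
    public static void main(String[] args) throws Exception {
        // Connect string and session timeout are illustrative.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30_000, event -> { });

        // Synchronous create: blocks until the server replies with the created path.
        String created = zk.create("/demo", "hello".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("created: " + created);

        // Asynchronous create: queues the packet and returns immediately;
        // the StringCallback fires on the client's event thread.
        zk.create("/demo/seq-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL,
                (rc, path, ctx, name) -> System.out.println("rc=" + rc + ", name=" + name),
                null);

        Thread.sleep(1000); // crude wait for the async callback in a demo
        zk.close();
    }
}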

public void create(
        final String path,
        byte[] data,
        List<ACL> acl,
        CreateMode createMode,
        Create2Callback cb,
        Object ctx,
        long ttl) {
        // path is the node path supplied by the user
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());
        EphemeralType.validateTTL(createMode, ttl);
        // if a chrootPath is configured, combine it with clientPath to form the node's final server-side path
        final String serverPath = prependChroot(clientPath);
        // build the request header
        RequestHeader h = new RequestHeader();
        // set the header's type; different type values denote different requests, and the server later dispatches on this field
        setCreateHeader(createMode, h);
        // build the reply-header object
        ReplyHeader r = new ReplyHeader();
        // build the response object
        Create2Response response = new Create2Response();
        // build the request body (analyzed below)
        Record record = makeCreateRecord(createMode, serverPath, data, acl, ttl);
        // hand the packet to the client connection object for sending
        cnxn.queuePacket(h, r, record, response, cb, clientPath, serverPath, ctx, null);
    }
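A quick aside on prependChroot: if the client connected with a chroot suffix on its connect string (for example 127.0.0.1:2181/app), every client path is rebased under that chroot before being sent. A hedged sketch of the rule; the real logic lives in ZooKeeper.prependChroot using the connection's chrootPath:

// prependChroot("/app", "/demo") returns "/app/demo";
// a clientPath of exactly "/" maps to the chroot itself.
static String prependChroot(String chrootPath, String clientPath) {
    if (chrootPath == null) {
        return clientPath; // no chroot configured
    }
    return clientPath.length() == 1 ? chrootPath : chrootPath + clientPath;
}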
makeCreateRecord

Builds the body of the create request.

private Record makeCreateRecord(CreateMode createMode, String serverPath, byte[] data, List<ACL> acl, long ttl) {
        Record record;
        if (createMode.isTTL()) {
            // below is the logic for creating a TTL node
            CreateTTLRequest request = new CreateTTLRequest();
            request.setData(data);
            request.setFlags(createMode.toFlag());
            request.setPath(serverPath);
            request.setAcl(acl);
            request.setTtl(ttl);
            record = request;
        } else {
            // a CreateRequest is the request body for creating an ordinary node
            CreateRequest request = new CreateRequest();
            // set the node data
            request.setData(data);
            // set the node type (persistent, sequential, container, ...)
            request.setFlags(createMode.toFlag());
            // set the node path
            request.setPath(serverPath);
            // set the node's ACL
            request.setAcl(acl);
            record = request;
        }
        return record;
    }

Next we trace how the client actually sends the request to the server.

cnxn.queuePacket
 public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        // build the Packet for the create request
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        // The synchronized block here is for two purpose:
        // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
        // 2. synchronized against each packet. So if a closeSession packet is added,
        // later packet will be notified.
        synchronized (state) {
            if (!state.isAlive() || closing) {
                // the connection to the server is already closed, so notify the caller of the connection loss
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // put the request on outgoingQueue, where it waits to be sent to the server
                outgoingQueue.add(packet);
            }
        }
        // wake the SendThread so it can drain outgoingQueue
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }

When the SendThread wakes up from selector.select it runs doIO; there the packet carrying our create request is assigned its xid, serialized into a ByteBuffer, and written to the socket.
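What happens at send time can be sketched as follows. This is a hedged reconstruction of the xid stamping in ClientCnxnSocketNIO.doIO plus Packet.createBB, not a verbatim quote; names are taken from those classes.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.proto.RequestHeader;

final class PacketFraming {
    static ByteBuffer frame(RequestHeader header, Record body, int xid) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len");        // length placeholder, patched below
        header.setXid(xid);             // the xid is assigned only at send time
        header.serialize(boa, "header");
        body.serialize(boa, "request"); // e.g. the CreateRequest record
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4);   // patch in the real payload length
        bb.rewind();
        return bb;                      // ready for SocketChannel.write()
    }
}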


[Figure: client_process.png, the client-side send flow]

Server-side processing

The server-side flow is shown in the figure below.

[Figure: server_process.png, the server-side processing flow]

SelectorThread.handleIO was already covered in the zookeeper server startup source code analysis, so next we walk through the source for each stage of this flow, step by step.

IOWorkRequest.doWork

When a client request reaches the server, the SelectorThread picks up the corresponding selectionKey. The key is wrapped in an IOWorkRequest and submitted to the I/O worker thread pool, which assigns a thread to run the IOWorkRequest's doWork method; the hand-off is sketched below, after which we analyze doWork.
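A simplified, hedged view of that hand-off, based on NIOServerCnxnFactory.SelectorThread.handleIO (not a verbatim quote):

private void handleIO(SelectionKey key) {
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    cnxn.disableSelectable();         // stop selecting on this connection for now
    key.interestOps(0);               // clear interest ops while a worker owns the key
    touchCnxn(cnxn);                  // refresh the connection's expiration
    workerPool.schedule(workRequest); // a worker thread will invoke doWork()
}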

 public void doWork() throws InterruptedException {
            if (!key.isValid()) {
                // if the selectionKey is no longer valid, cancel it and clean up
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            // for read/write I/O events, run doIO
            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);

                // Check if we shutdown or doIO() closed this connection
                if (stopped) {
                    // the server has stopped; close() tears down the underlying socket, the session, and so on
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                // record the connection's latest activity; this state is used by the ConnectionManager
                touchCnxn(cnxn);
            }

            // Mark this connection as once again ready for selection
            // mark the connection as selectable again
            cnxn.enableSelectable();
            // Push an update request on the queue to resume selecting
            // on the current set of interest ops, which may have changed
            // as a result of the I/O operations we just performed.
            // wake the selectorThread so it can get back to work
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
NIOServerCnxn.doIO
void doIO(SelectionKey k) throws InterruptedException {
        try {
            if (!isSocketOpen()) {
                LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));

                return;
            }
            if (k.isReadable()) {
                // handle the readable event
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    handleFailedRead();
                }
                // incomingBuffer has been completely filled
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    if (incomingBuffer == lenBuffer) { // start of next request
                        // at this point incomingBuffer holds the length of the request message
                        incomingBuffer.flip();
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // not the case for 4letterword
                        // read the request payload
                        readPayload();
                    } else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }
            if (k.isWritable()) {
                // handle the writable event; we cover this in detail later, when the response is sent back to the client
                handleWrite(k);

                if (!initialized && !getReadInterest() && !getWriteInterest()) {
                    throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
                }
            }
        } catch (CancelledKeyException e) {
            LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));

            LOG.debug("CancelledKeyException stack trace", e);

            close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
        } catch (CloseRequestException e) {
            // expecting close to log session closure
            close();
        } catch (EndOfStreamException e) {
            LOG.warn("Unexpected exception", e);
            // expecting close to log session closure
            close(e.getReason());
        } catch (ClientCnxnLimitException e) {
            // Common case exception, print at debug level
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
            close(DisconnectReason.CLIENT_CNX_LIMIT);
        } catch (IOException e) {
            LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
            close(DisconnectReason.IO_EXCEPTION);
        }
    }

NIOServerCnxn.readPayload

Reads the request payload from the socket.

private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            // incomingBuffer is not full yet, so keep reading
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                handleFailedRead();
            }
        }

        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            incomingBuffer.flip();
            packetReceived(4 + incomingBuffer.remaining());
            if (!initialized) {
                // the connection is not initialized yet, so this request must be a session-create request
                readConnectRequest();
            } else {
                // read the request body
                readRequest();
            }
            // reset lenBuffer/incomingBuffer to prepare for the next read
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }
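The framing that readPayload relies on is simple: every request is a 4-byte big-endian length followed by that many payload bytes, and lenBuffer doubles as incomingBuffer between requests (which is why doIO checks incomingBuffer == lenBuffer). A minimal, self-contained sketch of the same idea, assuming a non-blocking SocketChannel:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class LengthPrefixedReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    // Returns a complete payload, or null if more bytes are still needed.
    ByteBuffer poll(SocketChannel sock) throws IOException {
        if (sock.read(incomingBuffer) < 0) {
            throw new IOException("peer closed the connection");
        }
        if (incomingBuffer.remaining() > 0) {
            return null;                   // partial read, try again later
        }
        if (incomingBuffer == lenBuffer) { // we just finished the length prefix
            lenBuffer.flip();
            incomingBuffer = ByteBuffer.allocate(lenBuffer.getInt());
            return null;                   // now wait for the payload
        }
        ByteBuffer payload = incomingBuffer;
        payload.flip();                    // payload complete
        lenBuffer.clear();
        incomingBuffer = lenBuffer;        // reset for the next request
        return payload;
    }
}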

Next comes the processing of the request itself.
NIOServerCnxn.readRequest simply delegates to ZooKeeperServer.processPacket.

ZooKeeperServer.processPacket

Converts the raw request bytes into a request object.

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        // wrap incomingBuffer in an input stream
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        // first, deserialize the request header
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        //
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        //
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.
        // the request pipeline can throttle traffic; here we bump the count of in-flight requests and check whether throttling should kick in
        cnxn.incrOutstandingAndCheckThrottle(h);

        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        // below, dispatch on the request type carried in the header
        if (h.getType() == OpCode.auth) {
            LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            // deserialize the body into an auth packet
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            // look up the server-side authentication provider for this scheme
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    // handleAuthentication may close the connection, to allow the client to choose
                    // a different server to connect to.
                    // perform the authentication
                    authReturn = ap.handleAuthentication(
                        new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                        authPacket.getAuth());
                } catch (RuntimeException e) {
                    LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }

            if (authReturn == KeeperException.Code.OK) {
                // authentication succeeded; return a success reply to the client
                LOG.debug("Authentication succeeded for scheme: {}", scheme);
                LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn(
                        "No authentication provider for scheme: {} has {}",
                        scheme,
                        ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: {}", scheme);
                }
                // send a response...
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                 // ... and close connection
                // authentication failed: send the closeConn buffer to shut the connection down
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                // after a failed authentication, the server connection no longer accepts any client requests
                cnxn.disableRecv();
            }
            return;
        } else if (h.getType() == OpCode.sasl) {
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
                ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
                cnxn.sendResponse(replyHeader, null, "response");
                cnxn.sendCloseSession();
                cnxn.disableRecv();
            } else {
                // build the Request object
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                int length = incomingBuffer.limit();
                // if the request is especially large (the threshold for "large" is user-configurable)
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    // based on the configured cap on outstanding large-request bytes (100k), decide whether this large request must be rejected
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                // set the request's owner
                si.setOwner(ServerCnxn.me);
                // submit the request to the requestThrottler
                submitRequest(si);
            }
        }
    }

After processPacket, the request object enters the request processor chain. In the standalone version of ZooKeeper every transactional request is handled by this chain; the zookeeper server startup source code analysis gave a brief overview of it.

[Figure: processor_chain.png, the standalone request processor chain]

Now let's analyze the processors on this chain one by one; for orientation, a sketch of how the chain is wired comes first.
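A hedged sketch of the wiring, based on ZooKeeperServer.setupRequestProcessors in the standalone server (the RequestThrottler sits in front of the chain and feeds firstProcessor via submitRequestNow):

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();  // runs as its own thread
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start(); // runs as its own thread
}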

RequestThrottler

As the first processor on the chain, its job is to cap the number of requests ZooKeeper processes at once, throttling traffic so the server does not get overloaded. It runs as its own thread, so we look at its run method.

public void run() {
        try {
            while (true) {
                if (killed) {
                    break;
                }
                // take one request off the submission queue
                Request request = submittedRequests.take();
                if (Request.requestOfDeath == request) {
                    break;
                }
   
                if (request.mustDrop()) {
                    continue;
                }

                // Throttling is disabled when maxRequests = 0
                if (maxRequests > 0) {
                    while (!killed) {
                        if (dropStaleRequests && request.isStale()) {
                            // Note: this will close the connection
                            dropRequest(request);
                            ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
                            request = null;
                            break;
                        }
                        // the throttling threshold has not been reached, proceed
                        if (zks.getInProcess() < maxRequests) {
                            break;
                        }
                        // the number of requests currently in the pipeline exceeds maxRequests, so the RequestThrottler naps for a while
                        throttleSleep(stallTime);
                    }
                }
                // the thread was asked to stop working, so exit
                if (killed) {
                    break;
                }

                // A dropped stale request will be null
                if (request != null) {
                    if (request.isStale()) {
                        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                    }
                   //把請求提交給處理鏈上的下一個處理器FirstRequestProcessor處理
                    zks.submitRequestNow(request);
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        int dropped = drainQueue();
        LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
    }
PrepRequestProcessor

This is the first processor on the chain that performs real business logic. What happens inside it?
It also runs as its own thread, so we look at its run method.

 public void run() {
        LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
        try {
            while (true) {
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                // take one request
                Request request = submittedRequests.take();
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                    .add(Time.currentElapsedTime() - request.prepQueueStartTime);
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }

                request.prepStartTime = Time.currentElapsedTime();
                // pRequest is the core of PrepRequestProcessor's logic
                pRequest(request);
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }

pRequest

pRequest is long, but it mainly converts the request into a concrete request type based on the header; we focus on the create-node path.

protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        request.setHdr(null);
        request.setTxn(null);

        try {
            // dispatch on the request type
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                // build the create-request object
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;
            case OpCode.createTTL:
                CreateTTLRequest createTtlRequest = new CreateTTLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                break;
            case OpCode.deleteContainer:
            case OpCode.delete:
                DeleteRequest deleteRequest = new DeleteRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                break;
            case OpCode.setData:
                SetDataRequest setDataRequest = new SetDataRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
            case OpCode.reconfig:
                ReconfigRequest reconfigRequest = new ReconfigRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                break;
            case OpCode.setACL:
                SetACLRequest setAclRequest = new SetACLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                break;
            case OpCode.check:
                CheckVersionRequest checkRequest = new CheckVersionRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                break;
            case OpCode.multi:
                MultiOperationRecord multiRequest = new MultiOperationRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch (IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));

                for (Op op : multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    } else {
                        /* Prep the request and convert to a Txn */
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = op.getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {
                                LOG.info("Got user-level KeeperException when processing {} aborting"
                                         + " remaining multi ops. Error Path:{} Error:{}",
                                         request.toString(),
                                         e.getPath(),
                                         e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    // TODO: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm
                    //       not sure how else to get the txn stored into our list.
                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                        txn.serialize(boa, "request");
                        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                        txns.add(new Txn(type, bb.array()));
                    }
                }

                request.setTxn(new MultiTxn(txns));
                if (digestEnabled) {
                    setTxnDigest(request);
                }

                break;

            //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                }
                break;

            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getAllChildrenNumber:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
            case OpCode.setWatches2:
            case OpCode.checkWatches:
            case OpCode.removeWatches:
            case OpCode.getEphemerals:
            case OpCode.multiRead:
            case OpCode.addWatch:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                break;
            default:
                LOG.warn("unknown type {}", request.type);
                break;
            }
        } catch (KeeperException e) {
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(e.code().intValue()));
            }

            if (e.code().intValue() > Code.APIERROR.intValue()) {
                LOG.info(
                    "Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                    request.toString(),
                    e.getPath(),
                    e.getMessage());
            }
            request.setException(e);
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process {}", request, e);

            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            if (bb != null) {
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
            } else {
                sb.append("request buffer is null");
            }

            LOG.error("Dumping request buffer: 0x{}", sb.toString());
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
            }
        }
        // don't forget this part: set the request's zxid
        request.zxid = zks.getZxid();
        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
        // hand off to the next processor, SyncRequestProcessor
        nextProcessor.processRequest(request);
    }

Let's look at how, for node creation, the request is converted into a concrete create request. One thing omitted above: after deserialization, the concrete request is stored in the Request object's record field.

pRequest2Txn → pRequest2TxnCreate

pRequest2Txn dispatches on the request type; the code is long, so we won't walk through all of it. Once pRequest2TxnCreate is understood, the rest of pRequest2Txn follows naturally. pRequest2TxnCreate is the method that handles the create-node request.

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
        if (deserialize) {
            // deserialize the ByteBuffer into the concrete request object, record
            ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            // a createTTL request
            CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();
        } else {
            // an ordinary create request
            CreateRequest createRequest = (CreateRequest) record;
            // read the create parameters (flags: node type, path: node path, acl: access control, data: node value)
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        // derive the parent node's path
        String parentPath = validatePathForCreate(path, request.sessionId);
        // resolve the ACL list
        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        // fetch the parent's state. A ChangeRecord is the object the pipeline uses to track node state; once the transaction completes, the ChangeRecords for the node and its parent are cleaned up
        ChangeRecord parentRecord = getRecordForPath(parentPath);
        // check that the parent's ACL allows the current client to create this child
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
        // read the parent's cversion; the parent's cversion is the key to how ZooKeeper implements sequential nodes
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) {
            // for a sequential node, derive the final path name from the parent's cversion
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        try {
            // look up the ChangeRecord for the new path; normally the node does not exist and a NoNodeException
            // is thrown, meaning the create can proceed. If the node already exists on the server,
            // throw NodeExistsException to tell the user the node cannot be created twice
            if (getRecordForPath(path) != null) {
                throw new KeeperException.NodeExistsException(path);
            }
        } catch (KeeperException.NoNodeException e) {
            
            // ignore this one
        }
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        // an ephemeral parent cannot have children; throw the corresponding exception back to the client
        if (ephemeralParent) {
            throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        // compute the parent's new cversion
        int newCversion = parentRecord.stat.getCversion() + 1;
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            // build the create txn body: path, data, ACLs, ephemeral flag, cversion
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
        }

        TxnHeader hdr = request.getHdr();
        long ephemeralOwner = 0;
        if (createMode.isContainer()) {
            ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
        } else if (createMode.isTTL()) {
            ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
        } else if (createMode.isEphemeral()) {
            // for an ephemeral node, set ephemeralOwner = sessionId
            ephemeralOwner = request.sessionId;
        }
        // build the node's stat
        StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        // update the parent's child count
        parentRecord.childCount++;
        // update the parent's cversion
        parentRecord.stat.setCversion(newCversion);
        // record on the parent the zxid of its latest child change (pzxid)
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        parentRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
        // add the parent's change record to the outstandingChanges queue
        addChangeRecord(parentRecord);
        // build the change record for the new node itself
        ChangeRecord nodeRecord = new ChangeRecord(
                request.getHdr().getZxid(), path, s, 0, listACL);
        // set the node's data
        nodeRecord.data = data;
        nodeRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.ADD, path, nodeRecord.data, s);
        // compute the digest
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        // add the node's change record to the outstandingChanges queue
        addChangeRecord(nodeRecord);
    }

To sum up what PrepRequestProcessor does for a create request:
it deserializes the user's request into the corresponding create object, looks up the parent node from the child's path, updates the parent's state (child count, pzxid) and adds it to the outstandingChanges queue, builds the new node's state and adds that to outstandingChanges as well, and then passes the request on to SyncRequestProcessor. The snippet below isolates the sequential-suffix rule.
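A worked example of the sequential-name rule seen above: the suffix is the parent's current cversion, zero-padded to ten digits.

import java.util.Locale;

public class SequentialNameDemo {
    public static void main(String[] args) {
        int parentCVersion = 7; // illustrative value read from the parent's stat
        String path = "/app/task-" + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        System.out.println(path); // prints /app/task-0000000007
    }
}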

SyncRequestProcessor

SyncRequestProcessor runs as its own thread, so we analyze its run method. Before the loop itself, the sketch below shows the randomized snapshot trigger it relies on.
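A hedged sketch of the randomized snapshot trigger, roughly what resetSnapshotStats and shouldSnapshot look like in the source; snapCount and snapSizeInBytes are configuration values, and randRoll/randSize are fields of the processor:

private void resetSnapshotStats() {
    randRoll = ThreadLocalRandom.current().nextLong(snapCount / 2);
    randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}

private boolean shouldSnapshot() {
    int logCount = zks.getZKDatabase().getTxnCount();  // txns appended since the last roll
    long logSize = zks.getZKDatabase().getTxnSize();   // bytes appended since the last roll
    return logCount > (snapCount / 2 + randRoll)
        || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}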

 public void run() {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            // to keep all the servers in an ensemble from snapshotting at the same time,
            // SyncRequestProcessor uses resetSnapshotStats to pick random randRoll and randSize values;
            // randRoll and randSize feed the condition that decides when to take a snapshot
            resetSnapshotStats();
            // initialize lastFlushTime
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
                // pollTime bounds how long we wait for the next request before forcing a flush
                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                // take one Request from the queue, returning null if nothing arrives within pollTime
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    // the comment below is excellent; it explains this code's logic clearly
                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                    flush();
                    // after the flush, switch to a blocking take() and wait until a request arrives
                    si = queuedRequests.take();
                }
                if (si == REQUEST_OF_DEATH) {
                    // received the request that stops this thread
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                // track the number of records written to the log
                // the transaction-log handling here is analyzed in detail below
                if (zks.getZKDatabase().append(si)) {
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // roll the log
                        // roll the log and flush it to disk
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            // start a thread to take the snapshot
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                // add the request to the flush queue
                toFlush.add(si);
                if (shouldFlush()) {
                    // flush is analyzed in detail below
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }
zkDatabase.append(si)

What happens inside zkDatabase.append(si)?

  public boolean append(Request si) throws IOException {
        // bump the in-memory transaction count
        txnCount.incrementAndGet();
        return this.snapLog.append(si);
    }
snapLog.append()

FileTxnSnapLog.append hands the request to the log handler object, FileTxnLog.

public boolean append(Request si) throws IOException {
        return txnLog.append(si.getHdr(), si.getTxn(), si.getTxnDigest());
    }

FileTxnLog.append

After some indirection we finally reach the code that actually writes the log; the logic below appends the request's transaction to the log file.

 public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
        if (hdr == null) {
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn(
                "Current zxid {} is <= {} for {}",
                hdr.getZxid(),
                lastZxidSeen,
                hdr.getType());
        } else {
            // record the current request's zxid as lastZxidSeen
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream == null) {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
            // no log output stream exists yet, so create one; the log file name is logDir plus the current request's zxid
            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            // create the file output stream
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            // serialize the file header into the log stream
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            // flush the file header to the file
            logStream.flush();
            // set filePadding's currentSize, preparing to pre-allocate disk space for the log file below
            filePadding.setCurrentSize(fos.getChannel().position());
            // add the output stream to streamsToFlush, where it waits to be flushed to disk at a suitable time
            streamsToFlush.add(fos);
        }
        // pre-allocate disk space for the new log file (64 MB); see http://www.reibang.com/p/f10ffc0ff861 for a detailed discussion
        filePadding.padFile(fos.getChannel());
        // serialize the request into bytes
        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
        // compute this transaction's CRC, then write it to the log
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        // write the transaction itself into the log file
        Util.writeTxnBytes(oa, buf);
        return true;
    }

At this point the transaction's log record has been written to the file output stream, but the data still sits in memory buffers and has not yet reached the disk. Next we analyze flush; the sketch below contrasts the two levels of flushing involved.
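A minimal, hedged illustration of the distinction this code depends on: BufferedOutputStream.flush() only pushes bytes from the user-space buffer into the OS page cache, while FileChannel.force(false) performs an fsync and makes them durable. The file name is illustrative.

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class FlushVsForce {
    public static void main(String[] args) throws IOException {
        try (FileOutputStream fos = new FileOutputStream("log.demo");
             BufferedOutputStream bos = new BufferedOutputStream(fos)) {
            bos.write(new byte[]{1, 2, 3});
            bos.flush();                   // now in the OS page cache, not yet crash-safe
            fos.getChannel().force(false); // fsync: survives a power failure
        }
    }
}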

SyncRequestProcessor.flush
 private void flush() throws IOException, RequestProcessorException {
        if (this.toFlush.isEmpty()) {
            return;
        }

        ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());

        long flushStartTime = Time.currentElapsedTime();
        // entry point for committing the transaction log to disk; analyzed separately below
        zks.getZKDatabase().commit();
        ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);

        if (this.nextProcessor == null) {
            this.toFlush.clear();
        } else {
            while (!this.toFlush.isEmpty()) {
                // take a request off toFlush
                final Request i = this.toFlush.remove();
                long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
                // hand the request to the next processor
                this.nextProcessor.processRequest(i);
            }
            if (this.nextProcessor instanceof Flushable) {
                ((Flushable) this.nextProcessor).flush();
            }
            lastFlushTime = Time.currentElapsedTime();
        }
    }

FileTxnLog.commit

ZKDatabase.commit() ultimately calls FileTxnLog.commit to flush the transaction log to disk.

    /**
     * commit the logs. make sure that everything hits the
     * disk
     */
    // as the javadoc says: commit the transaction logs and make sure everything is durable on disk
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
        // iterate over the file output streams
        for (FileOutputStream log : streamsToFlush) {
            // flush each file output stream
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if (serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }

                    LOG.warn(
                        "fsync-ing the write ahead log in {} took {}ms which will adversely effect operation latency."
                            + "File size is {} bytes. See the ZooKeeper troubleshooting guide",
                        Thread.currentThread().getName(),
                        syncElapsedMS,
                        channel.size());
                }

                ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.poll().close();
        }

        // Roll the log file if we exceed the size limit
        if (txnLogSizeLimit > 0) {
            long logSize = getCurrentLogSize();

            if (logSize > txnLogSizeLimit) {
                LOG.debug("Log size limit reached: {}", logSize);
                rollLog();
            }
        }
    }
ZooKeeperServer.takeSnapshot

Having analyzed the transaction-log handling, we now turn to how a snapshot is produced; takeSnapshot calls FileTxnSnapLog.save. The naming scheme for log and snapshot files is illustrated below.
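A worked example of the file-naming scheme involved, mirroring Util.makeLogName and Util.makeSnapshotName: a fixed prefix, a dot, and the zxid in hex.

public class SnapNameDemo {
    public static void main(String[] args) {
        long lastZxid = 0x1a2bL; // illustrative zxid
        System.out.println("log." + Long.toHexString(lastZxid));      // prints log.1a2b
        System.out.println("snapshot." + Long.toHexString(lastZxid)); // prints snapshot.1a2b
    }
}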

public void save(
        DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
        boolean syncSnap) throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        // derive the new snapshot's name from the latest zxid, then create the snapshot file
        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
        LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
        try {
            // serialize the server's in-memory database into the snapshot file; we won't dig into snapLog.serialize here, see http://www.reibang.com/p/f10ffc0ff861 for details
            snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
        } catch (IOException e) {
            if (snapshotFile.length() == 0) {
                /* This may be caused by a full disk. In such a case, the server
                 * will get stuck in a loop where it tries to write a snapshot
                 * out to disk, and ends up creating an empty file instead.
                 * Doing so will eventually result in valid snapshots being
                 * removed during cleanup. */
                if (snapshotFile.delete()) {
                    LOG.info("Deleted empty snapshot file: {}", snapshotFile.getAbsolutePath());
                } else {
                    LOG.warn("Could not delete empty snapshot file: {}", snapshotFile.getAbsolutePath());
                }
            } else {
                /* Something else went wrong when writing the snapshot out to
                 * disk. If this snapshot file is invalid, when restarting,
                 * ZooKeeper will skip it, and find the last known good snapshot
                 * instead. */
            }
            throw e;
        }
    }
FinalRequestProcessor

SyncRequestProcessor's handling of the log and snapshot secures the transaction's durability. Next we look at what happens in FinalRequestProcessor. It is not a separate thread; its processRequest method is called by SyncRequestProcessor. The method is enormous, but it is all per-request-type handling logic.

 public void processRequest(Request request) {
        LOG.debug("Processing request:: {}", request);

        // request.addRQRec(">final");
        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
        if (request.type == OpCode.ping) {
            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
        }
        // apply the request's transaction to the in-memory database; analyzed in detail below
        ProcessTxnResult rc = zks.processTxn(request);

        // ZOOKEEPER-558:
        // In some cases the server does not close the connection (e.g., closeconn buffer
        // was not being queued — ZOOKEEPER-558) properly. This happens, for example,
        // when the client closes the connection. The server should still close the session, though.
        // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
        if (request.type == OpCode.closeSession && connClosedByClient(request)) {
            // We need to check if we can close the session id.
            // Sometimes the corresponding ServerCnxnFactory could be null because
            // we are just playing diffs from the leader.
            if (closeSession(zks.serverCnxnFactory, request.sessionId)
                || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
                return;
            }
        }

        if (request.getHdr() != null) {
            /*
             * Request header is created only by the leader, so this must be
             * a quorum request. Since we're comparing timestamps across hosts,
             * this metric may be incorrect. However, it's still a very useful
             * metric to track in the happy case. If there is clock drift,
             * the latency can go negative. Note: headers use wall time, not
             * CLOCK_MONOTONIC.
             */
            long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
            if (propagationLatency >= 0) {
                ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
            }
        }

        if (request.cnxn == null) {
            return;
        }
        ServerCnxn cnxn = request.cnxn;

        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

        String lastOp = "NA";
        // Notify ZooKeeperServer that the request has finished so that it can
        // update any request accounting/throttling limits
        zks.decInProcess();
        zks.requestFinished(request);
        Code err = Code.OK;
        Record rsp = null;
        String path = null;
        try {
            if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                AuditHelper.addAuditLog(request, rc, true);
                /*
                 * When local session upgrading is disabled, leader will
                 * reject the ephemeral node creation due to session expire.
                 * However, if this is the follower that issue the request,
                 * it will have the correct error code, so we should use that
                 * and report to user
                 */
                if (request.getException() != null) {
                    throw request.getException();
                } else {
                    throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
                }
            }

            KeeperException ke = request.getException();
            if (ke instanceof SessionMovedException) {
                throw ke;
            }
            if (ke != null && request.type != OpCode.multi) {
                throw ke;
            }

            LOG.debug("{}", request);

            if (request.isStale()) {
                ServerMetrics.getMetrics().STALE_REPLIES.add(1);
            }
            AuditHelper.addAuditLog(request, rc);
            switch (request.type) {
            case OpCode.ping: {
                lastOp = "PING";
                updateStats(request, lastOp, lastZxid);

                cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
                return;
            }
            case OpCode.createSession: {
                lastOp = "SESS";
                updateStats(request, lastOp, lastZxid);

                zks.finishSessionInit(request.cnxn, true);
                return;
            }
            case OpCode.multi: {
                lastOp = "MULT";
                rsp = new MultiResponse();

                for (ProcessTxnResult subTxnResult : rc.multiResult) {

                    OpResult subResult;

                    switch (subTxnResult.type) {
                    case OpCode.check:
                        subResult = new CheckResult();
                        break;
                    case OpCode.create:
                        subResult = new CreateResult(subTxnResult.path);
                        break;
                    case OpCode.create2:
                    case OpCode.createTTL:
                    case OpCode.createContainer:
                        subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                        break;
                    case OpCode.delete:
                    case OpCode.deleteContainer:
                        subResult = new DeleteResult();
                        break;
                    case OpCode.setData:
                        subResult = new SetDataResult(subTxnResult.stat);
                        break;
                    case OpCode.error:
                        subResult = new ErrorResult(subTxnResult.err);
                        if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
                            throw new SessionMovedException();
                        }
                        break;
                    default:
                        throw new IOException("Invalid type of op");
                    }

                    ((MultiResponse) rsp).add(subResult);
                }

                break;
            }
            case OpCode.multiRead: {
                lastOp = "MLTR";
                MultiOperationRecord multiReadRecord = new MultiOperationRecord();
                ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
                rsp = new MultiResponse();
                OpResult subResult;
                for (Op readOp : multiReadRecord) {
                    try {
                        Record rec;
                        switch (readOp.getType()) {
                        case OpCode.getChildren:
                            rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                            subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
                            break;
                        case OpCode.getData:
                            rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                            GetDataResponse gdr = (GetDataResponse) rec;
                            subResult = new GetDataResult(gdr.getData(), gdr.getStat());
                            break;
                        default:
                            throw new IOException("Invalid type of readOp");
                        }
                    } catch (KeeperException e) {
                        subResult = new ErrorResult(e.code().intValue());
                    }
                    ((MultiResponse) rsp).add(subResult);
                }
                break;
            }
            case OpCode.create: {
                lastOp = "CREA";
                rsp = new CreateResponse(rc.path);
                err = Code.get(rc.err);
                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                break;
            }
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {
                lastOp = "CREA";
                rsp = new Create2Response(rc.path, rc.stat);
                err = Code.get(rc.err);
                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                break;
            }
            case OpCode.delete:
            case OpCode.deleteContainer: {
                lastOp = "DELE";
                err = Code.get(rc.err);
                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                break;
            }
            case OpCode.setData: {
                lastOp = "SETD";
                rsp = new SetDataResponse(rc.stat);
                err = Code.get(rc.err);
                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                break;
            }
            case OpCode.reconfig: {
                lastOp = "RECO";
                rsp = new GetDataResponse(
                    ((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
                    rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.setACL: {
                lastOp = "SETA";
                rsp = new SetACLResponse(rc.stat);
                err = Code.get(rc.err);
                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                break;
            }
            case OpCode.closeSession: {
                lastOp = "CLOS";
                err = Code.get(rc.err);
                break;
            }
            case OpCode.sync: {
                lastOp = "SYNC";
                SyncRequest syncRequest = new SyncRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
                rsp = new SyncResponse(syncRequest.getPath());
                requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
                break;
            }
            case OpCode.check: {
                lastOp = "CHEC";
                rsp = new SetDataResponse(rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.exists: {
                lastOp = "EXIS";
                // TODO we need to figure out the security requirement for this!
                ExistsRequest existsRequest = new ExistsRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
                path = existsRequest.getPath();
                if (path.indexOf('\0') != -1) {
                    throw new KeeperException.BadArgumentsException();
                }
                Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
                rsp = new ExistsResponse(stat);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
            }
            case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
                path = getDataRequest.getPath();
                rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
            }
            case OpCode.setWatches: {
                lastOp = "SETW";
                SetWatches setWatches = new SetWatches();
                // TODO we really should not need this
                request.request.rewind();
                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                long relativeZxid = setWatches.getRelativeZxid();
                zks.getZKDatabase()
                   .setWatches(
                       relativeZxid,
                       setWatches.getDataWatches(),
                       setWatches.getExistWatches(),
                       setWatches.getChildWatches(),
                       Collections.emptyList(),
                       Collections.emptyList(),
                       cnxn);
                break;
            }
            case OpCode.setWatches2: {
                lastOp = "STW2";
                SetWatches2 setWatches = new SetWatches2();
                // TODO we really should not need this
                request.request.rewind();
                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                long relativeZxid = setWatches.getRelativeZxid();
                zks.getZKDatabase().setWatches(relativeZxid,
                        setWatches.getDataWatches(),
                        setWatches.getExistWatches(),
                        setWatches.getChildWatches(),
                        setWatches.getPersistentWatches(),
                        setWatches.getPersistentRecursiveWatches(),
                        cnxn);
                break;
            }
            case OpCode.addWatch: {
                lastOp = "ADDW";
                AddWatchRequest addWatcherRequest = new AddWatchRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        addWatcherRequest);
                zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
                rsp = new ErrorResponse(0);
                break;
            }
            case OpCode.getACL: {
                lastOp = "GETA";
                GetACLRequest getACLRequest = new GetACLRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
                path = getACLRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                zks.checkACL(
                    request.cnxn,
                    zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
                    null);

                Stat stat = new Stat();
                List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
                requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());

                try {
                    zks.checkACL(
                        request.cnxn,
                        zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.ADMIN,
                        request.authInfo,
                        path,
                        null);
                    rsp = new GetACLResponse(acl, stat);
                } catch (KeeperException.NoAuthException e) {
                    List<ACL> acl1 = new ArrayList<ACL>(acl.size());
                    for (ACL a : acl) {
                        if ("digest".equals(a.getId().getScheme())) {
                            Id id = a.getId();
                            Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
                            acl1.add(new ACL(a.getPerms(), id1));
                        } else {
                            acl1.add(a);
                        }
                    }
                    rsp = new GetACLResponse(acl1, stat);
                }
                break;
            }
            case OpCode.getChildren: {
                lastOp = "GETC";
                GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
                path = getChildrenRequest.getPath();
                rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
            }
            case OpCode.getAllChildrenNumber: {
                lastOp = "GETACN";
                GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
                path = getAllChildrenNumberRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                zks.checkACL(
                    request.cnxn,
                    zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ,
                    request.authInfo,
                    path,
                    null);
                int number = zks.getZKDatabase().getAllChildrenNumber(path);
                rsp = new GetAllChildrenNumberResponse(number);
                break;
            }
            case OpCode.getChildren2: {
                lastOp = "GETC";
                GetChildren2Request getChildren2Request = new GetChildren2Request();
                ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
                Stat stat = new Stat();
                path = getChildren2Request.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                zks.checkACL(
                    request.cnxn,
                    zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ,
                    request.authInfo, path,
                    null);
                List<String> children = zks.getZKDatabase()
                                           .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
                rsp = new GetChildren2Response(children, stat);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
            }
            case OpCode.checkWatches: {
                lastOp = "CHKW";
                CheckWatchesRequest checkWatches = new CheckWatchesRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
                WatcherType type = WatcherType.fromInt(checkWatches.getType());
                path = checkWatches.getPath();
                boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
                if (!containsWatcher) {
                    String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                    throw new KeeperException.NoWatcherException(msg);
                }
                requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
                break;
            }
            case OpCode.removeWatches: {
                lastOp = "REMW";
                RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
                WatcherType type = WatcherType.fromInt(removeWatches.getType());
                path = removeWatches.getPath();
                boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
                if (!removed) {
                    String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                    throw new KeeperException.NoWatcherException(msg);
                }
                requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
                break;
            }
            case OpCode.getEphemerals: {
                lastOp = "GETE";
                GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
                String prefixPath = getEphemerals.getPrefixPath();
                Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
                List<String> ephemerals = new ArrayList<>();
                if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
                    ephemerals.addAll(allEphems);
                } else {
                    for (String p : allEphems) {
                        if (p.startsWith(prefixPath)) {
                            ephemerals.add(p);
                        }
                    }
                }
                rsp = new GetEphemeralsResponse(ephemerals);
                break;
            }
            }
        } catch (SessionMovedException e) {
            // session moved is a connection level error, we need to tear
            // down the connection otw ZOOKEEPER-710 might happen
            // ie client on slow follower starts to renew session, fails
            // before this completes, then tries the fast follower (leader)
            // and is successful, however the initial renew is then
            // successfully fwd/processed by the leader and as a result
            // the client and leader disagree on where the client is most
            // recently attached (and therefore invalid SESSION MOVED generated)
            cnxn.sendCloseSession();
            return;
        } catch (KeeperException e) {
            err = e.code();
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process {}", request, e);
            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer.toHexString(bb.get() & 0xff));
            }
            LOG.error("Dumping request buffer: 0x{}", sb.toString());
            err = Code.MARSHALLINGERROR;
        }

        ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());

        updateStats(request, lastOp, lastZxid);

        try {
         // After all the cases above we end up here; the logic below sends the
         // operation's result back to the client according to the operation type
            if (path == null || rsp == null) {
                cnxn.sendResponse(hdr, rsp, "response");
            } else {
                int opCode = request.type;
                Stat stat = null;
                // Serialized read and get children responses could be cached by the connection
                // object. Cache entries are identified by their path and last modified zxid,
                // so these values are passed along with the response.
                switch (opCode) {
                    case OpCode.getData : {
                        GetDataResponse getDataResponse = (GetDataResponse) rsp;
                        stat = getDataResponse.getStat();
                        cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
                        break;
                    }
                    case OpCode.getChildren2 : {
                        GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;
                        stat = getChildren2Response.getStat();
                        cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
                        break;
                    }
                    default:
                        cnxn.sendResponse(hdr, rsp, "response");
                }
            }

            if (request.type == OpCode.closeSession) {
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error("FIXMSG", e);
        }
    }
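Before moving on, a quick look at how the ReplyHeader built above is observed on the other end. Below is a minimal, hypothetical client-side sketch (not part of the server code, and assuming a standalone server reachable at 127.0.0.1:2181): the err value written into the ReplyHeader surfaces as the rc argument of the client's asynchronous callback.

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AsyncCreateDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        // "127.0.0.1:2181" is an assumed local standalone server address
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30_000, event -> { });
        zk.create("/demo", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            (AsyncCallback.StringCallback) (rc, path, ctx, name) -> {
                // rc mirrors ReplyHeader.err; Code.OK means the create succeeded
                System.out.println("rc=" + Code.get(rc) + ", created=" + name);
                done.countDown();
            }, null);
        done.await();
        zk.close();
    }
}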

Next, let's look at how the create-node request is written into ZooKeeper's in-memory database. This is a long chain of method calls, so we will walk through it one step at a time.

ZooKeeperServer.processTxn
public ProcessTxnResult processTxn(Request request) {
        TxnHeader hdr = request.getHdr();
        processTxnForSessionEvents(request, hdr, request.getTxn());

        final boolean writeRequest = (hdr != null);
        final boolean quorumRequest = request.isQuorum();

        // return fast w/o synchronization when we get a read
        if (!writeRequest && !quorumRequest) {
            return new ProcessTxnResult();
        }
        synchronized (outstandingChanges) {
          // Apply the transaction to the in-memory database
            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            if (writeRequest) {
                long zxid = hdr.getZxid();
              // Remove the ChangeRecords up to this zxid from outstandingChanges
                while (!outstandingChanges.isEmpty()
                        && outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (cr.zxid < zxid) {
                        LOG.warn(
                            "Zxid outstanding 0x{} is less than current 0x{}",
                            Long.toHexString(cr.zxid),
                            Long.toHexString(zxid));
                    }
                    if (outstandingChangesForPath.get(cr.path) == cr) {
                        outstandingChangesForPath.remove(cr.path);
                    }
                }
            }

            // do not add non quorum packets to the queue.
            if (quorumRequest) {
                getZKDatabase().addCommittedProposal(request);
            }
            return rc;
        }
    }
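The while loop above maintains a simple invariant: once the transaction with zxid Z has been applied to the database, every queued ChangeRecord with zxid <= Z is stale and gets dequeued. Here is a simplified, self-contained sketch of that drain pattern (toy types of our own, not ZooKeeper code; assumes JDK 16+ for records):

import java.util.ArrayDeque;
import java.util.Deque;

public class OutstandingDrainDemo {
    record Change(long zxid, String path) { }

    public static void main(String[] args) {
        Deque<Change> outstanding = new ArrayDeque<>();
        outstanding.add(new Change(1, "/a"));
        outstanding.add(new Change(2, "/b"));
        outstanding.add(new Change(3, "/c"));

        long committedZxid = 2; // zxid of the txn just applied to the data tree
        while (!outstanding.isEmpty() && outstanding.peek().zxid() <= committedZxid) {
            Change cr = outstanding.remove();
            System.out.println("removed " + cr.path() + " at zxid " + cr.zxid());
        }
        // "/c" (zxid 3) stays queued until its own txn commits
    }
}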
DataTree.processTxn

Applying the transaction to the database ultimately ends up in DataTree.processTxn.

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
           // The switch below handles each operation type differently; we only analyze the create case here
            switch (header.getType()) {
            case OpCode.create:
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
             // Create the node in the in-memory database
                createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    null);
                break;
            case OpCode.create2:
                CreateTxn create2Txn = (CreateTxn) txn;
                rc.path = create2Txn.getPath();
                Stat stat = new Stat();
                createNode(
                    create2Txn.getPath(),
                    create2Txn.getData(),
                    create2Txn.getAcl(),
                    create2Txn.getEphemeral() ? header.getClientId() : 0,
                    create2Txn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            ........... 

        /*
         * Things we can only update after the whole txn is applied to data
         * tree.
         *
         * If we update the lastProcessedZxid with the first sub txn in multi
         * and there is a snapshot in progress, it's possible that the zxid
         * associated with the snapshot only include partial of the multi op.
         *
         * When loading snapshot, it will only load the txns after the zxid
         * associated with snapshot file, which could cause data inconsistency
         * due to missing sub txns.
         *
         * To avoid this, we only update the lastProcessedZxid when the whole
         * multi-op txn is applied to DataTree.
         */
        if (!isSubTxn) {
            /*
             * A snapshot might be in progress while we are modifying the data
             * tree. If we set lastProcessedZxid prior to making corresponding
             * change to the tree, then the zxid associated with the snapshot
             * file will be ahead of its contents. Thus, while restoring from
             * the snapshot, the restore method will not apply the transaction
             * for zxid associated with the snapshot file, since the restore
             * method assumes that transaction to be present in the snapshot.
             *
             * To avoid this, we first apply the transaction and then modify
             * lastProcessedZxid.  During restore, we correctly handle the
             * case where the snapshot contains data ahead of the zxid associated
             * with the file.
             */
          // Update lastProcessedZxid
            if (rc.zxid > lastProcessedZxid) {
                lastProcessedZxid = rc.zxid;
            }

            if (digestFromLoadedSnapshot != null) {
                compareSnapshotDigests(rc.zxid);
            } else {
                // only start recording digest when we're not in fuzzy state
                logZxidDigest(rc.zxid, getTreeDigest());
            }
        }

        return rc;
    }
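To make the types concrete, the following minimal sketch drives DataTree.processTxn directly with a hand-built TxnHeader and CreateTxn, mirroring the OpCode.create branch above. It bypasses the whole request pipeline and only shows the shapes involved; the path, zxid and other values are made up for illustration.

import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.TxnHeader;

public class DataTreeCreateDemo {
    public static void main(String[] args) throws Exception {
        DataTree tree = new DataTree();
        // TxnHeader(clientId, cxid, zxid, time, type); values here are arbitrary
        TxnHeader header = new TxnHeader(0x1234L, 1, 1L,
                System.currentTimeMillis(), ZooDefs.OpCode.create);
        CreateTxn txn = new CreateTxn("/demo", "hello".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, false /*ephemeral*/, 1 /*parentCVersion*/);
        ProcessTxnResult rc = tree.processTxn(header, txn);
        System.out.println("applied path=" + rc.path + ", err=" + rc.err);
        // The node is now visible in the in-memory database
        System.out.println("data=" + new String(tree.getData("/demo", new Stat(), null)));
    }
}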

createNode
  public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        // Path of the parent node
        String parentName = path.substring(0, lastSlash);
        // Name of the child node
        String childName = path.substring(lastSlash + 1);
        // Build the persisted stat for the new node
        StatPersisted stat = createStat(zxid, time, ephemeralOwner);
        // Look up the parent node in the in-memory database
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            // Add the ACL to ACL cache first, to avoid the ACL not being
            // created race condition during fuzzy snapshot sync.
            //
            // This is the simplest fix, which may add ACL reference count
            // again if it's already counted in in the ACL map of fuzzy
            // snapshot, which might also happen for deleteNode txn, but
            // at least it won't cause the ACL not exist issue.
            //
            // Later we can audit and delete all non-referenced ACLs from
            // ACL map when loading the snapshot/txns from disk, like what
            // we did for the global sessions.
            Long longval = aclCache.convertAcls(acl);

            Set<String> children = parent.getChildren();
            // Check whether the node being added already exists
            if (children.contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }
            // This mainly updates the server-side digest bookkeeping
            nodes.preChange(parentName, parent);
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }
            // There is possibility that we'll replay txns for a node which
            // was created and then deleted in the fuzzy range, and it's not
            // exist in the snapshot, so replay the creation might revert the
            // cversion and pzxid, need to check and only update when it's
            // larger.
            // Update the parent's cversion and pzxid
            if (parentCVersion > parent.stat.getCversion()) {
                parent.stat.setCversion(parentCVersion);
                parent.stat.setPzxid(zxid);
            }
            // Build the child DataNode
            DataNode child = new DataNode(data, longval, stat);
            // Register the child under its parent
            parent.addChild(childName);
            // Fold the updated parent back into the digest calculation
            nodes.postChange(parentName, parent);
            nodeDataSize.addAndGet(getNodeSize(path, child.data));
            // Put the child into the in-memory database; at this point the child node
            // is stored and the parent's stat has been updated as well
            nodes.put(path, child);
            EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                // If the node is a container node, track it in containers
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                // If the node is a TTL node, track it in ttls
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                // If the node is ephemeral, record it under its owner session
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
            if (outputStat != null) {
                child.copyStat(outputStat);
            }
        }
        // now check if its one of the zookeeper node child
        // Quota-handling logic below; we will not analyze it here
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if its the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
            }
        }
        // also check to update the quotas for this node
        String lastPrefix = getMaxPrefixWithQuota(path);
        long bytes = data == null ? 0 : data.length;
        if (lastPrefix != null) {
            // ok we have some match and need to update
            updateCountBytes(lastPrefix, bytes, 1);
        }
        updateWriteStat(path, bytes);
        // Trigger the NodeCreated event for the new path
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        // Trigger the NodeChildrenChanged event on the parent
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
    }
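The two triggerWatch calls at the end are what clients actually observe. As a small end-to-end sketch (assuming a local standalone server at 127.0.0.1:2181 and that "/watched" does not exist yet), registering an exists watch on the new path and a child watch on its parent lets us see the NodeCreated and NodeChildrenChanged events fired above.

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateWatchDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch events = new CountDownLatch(2);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30_000, event -> {
            // Ignore connection-state events; print the two node events we expect
            if (event.getType() != Watcher.Event.EventType.None) {
                System.out.println(event.getType() + " on " + event.getPath());
                events.countDown();
            }
        });
        zk.exists("/watched", true);   // data watch -> NodeCreated on "/watched"
        zk.getChildren("/", true);     // child watch -> NodeChildrenChanged on "/"
        zk.create("/watched", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        events.await();
        zk.close();
    }
}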

The above covers the node-creation handling on both the client and the server side.
Thanks for reading this far; the next article will pick up from this code and analyze how watcher triggering works.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市肤寝,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌刨摩,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耘婚,死亡現(xiàn)場離奇詭異罢浇,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)沐祷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門嚷闭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胞锰,你說我怎么就攤上這事【ふィ” “怎么了嗅榕?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長吵聪。 經(jīng)常有香客問我凌那,道長,這世上最難降的妖魔是什么吟逝? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任帽蝶,我火速辦了婚禮,結(jié)果婚禮上块攒,老公的妹妹穿的比我還像新娘励稳。我一直安慰自己佃乘,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布驹尼。 她就那樣靜靜地躺著趣避,像睡著了一般。 火紅的嫁衣襯著肌膚如雪扶欣。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天千扶,我揣著相機(jī)與錄音料祠,去河邊找鬼。 笑死澎羞,一個胖子當(dāng)著我的面吹牛髓绽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播妆绞,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼顺呕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了括饶?” 一聲冷哼從身側(cè)響起株茶,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎图焰,沒想到半個月后启盛,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡技羔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年僵闯,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片藤滥。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡鳖粟,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拙绊,到底是詐尸還是另有隱情向图,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布标沪,位于F島的核電站张漂,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏谨娜。R本人自食惡果不足惜航攒,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望趴梢。 院中可真熱鬧漠畜,春花似錦币他、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至瘾敢,卻和暖如春拍冠,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背簇抵。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工庆杜, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人碟摆。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓晃财,卻偏偏與公主長得像,于是被迫代替她去往敵國和親典蜕。 傳聞我的和親對象是個殘疾皇子断盛,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,543評論 2 349