zookeeper集群的分布式事務(wù)請求處理過程

集群處理請求分兩種:事務(wù)和非事務(wù)劲室,對于非事務(wù),請求處理和單機類似获搏,節(jié)點本地就可以完成數(shù)據(jù)的請求;事務(wù)請求需要提交給Leader處理失乾,Leader以投票的形式常熙,等待半數(shù)的Follower的投票纬乍,完成同步后才將操作結(jié)果返回

這里裸卫,無論什么模式蕾额、節(jié)點類型,處理客戶端請求的都是ServerCnxnFactory的子類彼城,默認為NIOServerCnxnFactory诅蝶,只是其內(nèi)部處理調(diào)用鏈的zkServer實例不同,單機模式為ZooKeeperServer的實例募壕,其他類型的節(jié)點使用ZooKeeperServer類的子類. ZooKeeperServer的子類UML類圖如下:


zookeeper請求處理的各個processor.jpg

1 這么多ZooKeeperServer的子類调炬,一個事物請求來了,調(diào)用什么方法去處理事物

1.1 在org.apache.zookeeper.server.quorum.QuorumPeer#run方法中舱馅,首先確定角色缰泡。

while (running) {
                switch (getPeerState()) {
                case LOOKING:
                             "省略n行代碼====================="
                            "當(dāng)peerState是LOOKING的時候,進行選舉投票代嗤,選舉出leader"
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } 
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        "當(dāng)選舉過之后是OBSERVING 狀態(tài)棘钞,那么實例化的是ObserverZooKeeperServer"
                        setObserver(makeObserver(logFactory));
                        "同步leader"
                        observer.observeLeader();
                    } 
                 "省略n行代碼====================="
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                      "當(dāng)選舉過之后是FOLLOWING狀態(tài),那么實例化的是          
                      FollowerZooKeeperServer"
                        setFollower(makeFollower(logFactory));
                       "同步leader"
                        follower.followLeader();
                    }
                         "省略n行代碼====================="
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    }
                   "省略n行代碼====================="
                    break;
                }
            }

從上面的代碼可以看出干毅,在選舉成功之后宜猜,就確定了,每個服務(wù)器是什么狀態(tài)硝逢,也就確定是什么ZooKeeperServer實例姨拥。

1.2 對于每個ZooKeeperServer實例,他的業(yè)務(wù)處理鏈是不同的渠鸽。

責(zé)任鏈由setupRequestProcessors方法確定

比如FollowerZooKeeperServer實例

protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

firstProcessor 是FollowerRequestProcessor叫乌,下一個是CommitProcessor,再下一個是
FinalRequestProcessor徽缚。而且還另外聚合了SyncRequestProcessor憨奸,下一個是SendAckRequestProcessor 。

1.3 那這個處理鏈是什么時候確定的呢凿试?

case OBSERVING:
                    try {
"實例化ObserverZookeeperServer"
                        setObserver(makeObserver(logFactory));
                  "從 leader那邊同步代碼,并且完成ObserverZookeeperServer的初始化排宰,包括責(zé)任鏈的建立"
                        observer.observeLeader();
                    } 
                     "省略n行代碼====================="
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
"實例化FollowerZookeeperServer"
                        setFollower(makeFollower(logFactory));
    "從 leader那邊同步代碼,并且完成FollowerZookeeperServer的初始化红省,包括責(zé)任鏈的建立"
                        follower.followLeader();
                    } 
  "省略n行代碼====================="
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        
                        leader.lead();
                        setLeader(null);
                    } 
                    break;
                }

org.apache.zookeeper.server.quorum.Learner#syncWithLeader的469行 zk.startup(); org.apache.zookeeper.server.quorum.Leader#lead 431 行 startZkServer();
調(diào)用下面的setupRequestProcessors方法额各,構(gòu)建責(zé)任鏈。
因為各自都繼承了zookeeperServer(繼承結(jié)構(gòu)看上圖)吧恃,并且重寫了setupRequestProcessors方法虾啦。
所以這里實際上是調(diào)用了各種ZookeeperServer實例的setupRequestProcessors方法。

public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        registerJMX();

        setState(State.RUNNING);
        notifyAll();
    }

2 請求的入口在哪里?

"org.apache.zookeeper.server.NIOServerCnxnFactory#run"
當(dāng)讀寫事件就緒時傲醉,NIOServerCnxn對象進行IO任務(wù)蝇闭。
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    }
"org.apache.zookeeper.server.ZooKeeperServer#processPacket"
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
              "丟給對應(yīng)的firstProcessor去處理,事物邏輯硬毕。
而對于不同的角色呻引,比如說leader,對應(yīng)的是LeaderZooKeeperServer 
,而 follower對應(yīng)的是FollowerZookeeperServer"
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        }

submitRequest之后,已經(jīng)丟給具體的責(zé)任鏈去處理了吐咳。而不同角色逻悠,不同的ZOokeeperServer實例,對應(yīng)的firstProcessor是不同的韭脊。

大概流程圖如下:


zookeeper事務(wù)處理的事務(wù)流程.png

3 當(dāng)客戶端請求到達leader的時候童谒,事物的流程是怎么樣的?

leader的firstProcessor沪羔,從LeaderZooKeeperServer#setupRequestProcessors方法中饥伊,可以看出PrepRequestProcessor是firstProcessor。processRequest方法蔫饰,只是把request對象添加到submittedRequests阻塞隊列中琅豆。業(yè)務(wù)處理在run方法中。

3.1 PrepRequestProcessor 對事物請求加事物頭篓吁,非事物請求茫因,checkSession

while (true) {
                Request request = submittedRequests.take();
                pRequest(request);
            }
switch (request.type) {
                case OpCode.create:
                CreateRequest createRequest = new CreateRequest();
"如果是 create,delete ,set等改變內(nèi)存數(shù)據(jù)庫,zkDatabase的請求越除,轉(zhuǎn)化成事物請求"
                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
                break;
                "省略n行代碼====================="

            //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                "因為zookeeper的事物請求都是leader處理的节腐,
                  所以他的分布式唯一id,
                只要在leader側(cè)ks.getNextZxid()摘盆,
              唯一即可。AtomicLong 類型饱苟,保證事物請求并發(fā)時孩擂,線程安全。"
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                break;
           "對內(nèi)存數(shù)據(jù)庫zkDatabase不做改變的箱熬,就checkSession"
            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;
            default:
                LOG.warn("unknown type " + request.type);
                break;
  }
 request.zxid = zks.getZxid();
 nextProcessor.processRequest(request);

3.2 ProposalRequestProcessor类垦。對事物請求和非事物請求分流

ProposalRequestProcessor#processRequest
if(request instanceof LearnerSyncRequest){
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
                "非事物請求交給下一個CommitProcessor"
                nextProcessor.processRequest(request);
            "hdr不為空,說明是事物請求城须,委托給leader蚤认,發(fā)送proposal消息"
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                "并且自己先持久化到txnLog日志里面"
                syncProcessor.processRequest(request);
            }
        }
3.2.2 首先講一下,事物消息刷新到txnLog的過程糕伐。syncProcessor處理持久化事物日志的過程砰琢。
public void run() {
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            "不讓所有的zookeeperServer一起發(fā)起快照"
            setRandRoll(r.nextInt(snapCount/2));
            while (true) {
                Request si = null;
                  "第一輪循環(huán)是toFlush為空,進入下面的si!=null 的判斷"
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    "調(diào)用FileTxnLog#append,追加transaction log 到日志文件"
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            setRandRoll(r.nextInt(snapCount/2));
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                  "第一輪最后陪汽,添加到toFlush训唱,第二輪就會flush,也就是hit the disk "
                    
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

下面是flush方法

private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
        "這里的commit才是真正的hit the disk , "
"而前面的append只是加入到groupcommit的數(shù)組中"
"LinkedList<FileOutputStream> streamsToFlush"
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            if (nextProcessor != null) {
                "這里進入AckRequestProcessor"
                nextProcessor.processRequest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }

tip:
磁盤寫緩存只有強制事務(wù)日志刷到磁盤后,server才能對proposal進行ack操作挚冤。說得更明白一點况增,server會調(diào)用ZKDatabase的commit方法,這最終會調(diào)用FileChannel.force方法训挡。這樣澳骤,server會在ack之前保證事務(wù)已被持久化到磁盤。關(guān)于此事還有一點要注意澜薄,現(xiàn)代磁盤有一個寫緩存宴凉,可以保存要寫到磁盤的數(shù)據(jù)。如果啟用了寫緩存表悬,強行刷新不能保證返回的時候數(shù)據(jù)已落到磁盤弥锄,數(shù)據(jù)會落到寫緩存中。為了保證在FileChannel.force()返回后數(shù)據(jù)落到磁盤蟆沫,要禁用寫磁盤緩存籽暇。操作系統(tǒng)有許多方式可以禁用

3.2.1 發(fā)送proposal消息
Leader#propose
public Proposal propose(Request request) throws XidRolloverException {
         "省略n行代碼======================"
          "packetType =PROPOSAL "
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, 
                baos.toByteArray(), null);
        
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }

            lastProposed = p.packet.getZxid();
            "ConcurrentMap<Long, Proposal> outstandingProposals 發(fā)送出去的提案"
            outstandingProposals.put(lastProposed, p);
            sendPacket(pp);
        }
        return p;
    }
  void sendPacket(QuorumPacket qp) {
        "HashSet<LearnerHandler> forwardingFollowers"
          "給所有的follower發(fā)送proposal消息"
          "LearnerHandler 是leader和learner的通訊紐帶"
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {                
                f.queuePacket(qp);
            }
        }
    }

3.3 AckRequestProcessor 判斷ackSet是否過半。如果是饭庞,就發(fā)送COMMIT和INFORM 消息

 public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if(self != null)
            leader.processAck(self.getId(), request.zxid, null);
        else
            LOG.error("Null QuorumPeer");
    }
Leader#processAck
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
       
    
      "省略n行代碼======================"
"前面ProposalProcessor已經(jīng)放進去一個了"
        Proposal p = outstandingProposals.get(zxid);
        
        "leader添加自己到ackSet中戒悠,自己肯定投自己的提案,只要再接受一個ACK就"
        "可以通過了"
        p.ackSet.add(sid);
         "判斷是否過半"
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){             
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                toBeApplied.add(p);
            }

            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            commit(zxid);
            inform(p);
            zk.commitProcessor.commit(p.request);
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }

這兒有個疑問
就是加入執(zhí)行過半判斷舟山,還沒有接受到過半的ACK怎么辦绸狐?直接認定失敗累盗?

3.4 follower.followLeader的時候寒矿,接受PROPOSAL消息

Follower#followLeader
while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
Follower#processPacket
"處理PROPOSAL消息"
case Leader.PROPOSAL:            
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            "進入FollowerZooKeeperServer#logRequest方法,進入syncProcessor"
          "然后進入SyncRequestProcessor#flush方法若债,進入下一個Processor"
            fzk.logRequest(hdr, txn);
            break;

3.5 SendAckRequestProcessor follower發(fā)送ACK 給leader,發(fā)現(xiàn)observer是沒有發(fā)的

public void processRequest(Request si) {
        if(si.type != OpCode.sync){
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);
            try {
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn("Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner.sock.isClosed()) {
                        learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }

3.6 LearnerHandler接受ACK

LearnerHandler#run 576 行
switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Received ACK from Observer  " + this.sid);
                        }
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    "這里的sid符相,指的就是發(fā)送ACK對應(yīng)的Follower"
                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;

然后Leader#processAck方法,把上面的follower的sid蠢琳,假如到對應(yīng)Proposal的ackSet中啊终。

Leader#processAck
"過半之后"
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){             
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                "將要被使用的proposal"
                toBeApplied.add(p);
            }

            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            "給follower發(fā)送COMMIT 消息,給observer發(fā)送INFORM消息"
            commit(zxid);
            inform(p);
            zk.commitProcessor.commit(p.request);
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
3.6.1 follower接受COMMIT消息,并且commit
Follower#processPacket
case Leader.COMMIT:
            fzk.commit(qp.getZxid());

CommitProcessor

CommitProcessor#run
public void run() {
        try {
            Request nextPending = null;            
            while (!finished) {
                "第一輪循環(huán)toProcess長度是0 傲须,committedRequests長度是1 "
                int len = toProcess.size();
                for (int i = 0; i < len; i++) {
                    nextProcessor.processRequest(toProcess.get(i));
                }
                toProcess.clear();
                synchronized (this) {
                    if ((queuedRequests.size() == 0 || nextPending != null)
                            && committedRequests.size() == 0) {
                        wait();
                        continue;
                    }
                    // First check and see if the commit came in for the pending
                    // request
   "第一輪循環(huán)toProcess長度是0 蓝牲,committedRequests長度是1 走這里,加入到toProcess "
                    if ((queuedRequests.size() == 0 || nextPending != null)
                            && committedRequests.size() > 0) {
                        Request r = committedRequests.remove();
                        /*
                         * We match with nextPending so that we can move to the
                         * next request when it is committed. We also want to
                         * use nextPending because it has the cnxn member set
                         * properly.
                         */
                        if (nextPending != null
                                && nextPending.sessionId == r.sessionId
                                && nextPending.cxid == r.cxid) {
                            // we want to send our version of the request.
                            // the pointer to the connection in the request
                            nextPending.hdr = r.hdr;
                            nextPending.txn = r.txn;
                            nextPending.zxid = r.zxid;
                            toProcess.add(nextPending);
                            nextPending = null;
                        } else {
                            // this request came from someone else so just
                            // send the commit packet
                            toProcess.add(r);
                        }
                    }
                }

                // We haven't matched the pending requests, so go back to
                // waiting
                if (nextPending != null) {
                    continue;
                }

                synchronized (this) {
                    // Process the next requests in the queuedRequests
                    while (nextPending == null && queuedRequests.size() > 0) {
                        Request request = queuedRequests.remove();
                        switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            nextPending = request;
                            break;
                        case OpCode.sync:
                            if (matchSyncs) {
                                nextPending = request;
                            } else {
                                toProcess.add(request);
                            }
                            break;
                        default:
                            toProcess.add(request);
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted exception while waiting", e);
        } catch (Throwable e) {
            LOG.error("Unexpected exception causing CommitProcessor to exit", e);
        }
        LOG.info("CommitProcessor exited loop!");
    }
3.6.2 然后就是follower 的FinalRequestProcessor
synchronized (zks.outstandingChanges) {
            while (!zks.outstandingChanges.isEmpty()
                    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0);
                if (cr.zxid < request.zxid) {
                    LOG.warn("Zxid outstanding "
                            + cr.zxid
                            + " is less than current " + request.zxid);
                }
                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                    zks.outstandingChangesForPath.remove(cr.path);
                }
            }
            if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;
                "進入ZkDatabase處理txn"
               rc = zks.processTxn(hdr, txn);
            }
            // do not add non quorum packets to the queue.
            if (Request.isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }

以及構(gòu)建response對象返回泰讽。
leader側(cè)同上例衍。

4 假如request是發(fā)送到follower或者observer的話昔期,發(fā)送REQUEST消息給leader,讓leader來處理

FollowerRequestProcessor#run
while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);
                
                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this follower has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                      "發(fā)送REQUEST消息給leader"
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.delete:
                case OpCode.setData:
                case OpCode.setACL:
                case OpCode.createSession:
                case OpCode.closeSession:
                case OpCode.multi:
                  "發(fā)送REQUEST消息給leader"
                    zks.getFollower().request(request);
                    break;
                }
            }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市肄渗,隨后出現(xiàn)的幾起案子镇眷,更是在濱河造成了極大的恐慌,老刑警劉巖翎嫡,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件欠动,死亡現(xiàn)場離奇詭異,居然都是意外死亡惑申,警方通過查閱死者的電腦和手機具伍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來圈驼,“玉大人人芽,你說我怎么就攤上這事〖ù啵” “怎么了萤厅?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長靴迫。 經(jīng)常有香客問我惕味,道長,這世上最難降的妖魔是什么玉锌? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任名挥,我火速辦了婚禮,結(jié)果婚禮上主守,老公的妹妹穿的比我還像新娘禀倔。我一直安慰自己,他們只是感情好参淫,可當(dāng)我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布救湖。 她就那樣靜靜地躺著,像睡著了一般黄刚。 火紅的嫁衣襯著肌膚如雪捎谨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天憔维,我揣著相機與錄音,去河邊找鬼畏邢。 笑死业扒,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的舒萎。 我是一名探鬼主播程储,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了章鲤?” 一聲冷哼從身側(cè)響起摊灭,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎败徊,沒想到半個月后帚呼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡皱蹦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年煤杀,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沪哺。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡沈自,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出辜妓,到底是詐尸還是另有隱情枯途,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布籍滴,位于F島的核電站酪夷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏异逐。R本人自食惡果不足惜捶索,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望灰瞻。 院中可真熱鬧腥例,春花似錦、人聲如沸酝润。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽要销。三九已至构回,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間疏咐,已是汗流浹背纤掸。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留浑塞,地道東北人借跪。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像酌壕,于是被迫代替她去往敵國和親务豺。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,577評論 2 353