APACHE ZOOKEEPER 3.5.3 CODE REVIEW

導(dǎo)語

1.zab協(xié)議崩潰恢復(fù)如何實現(xiàn)leader選舉及數(shù)據(jù)同步?
2.zab消息廣播階段如何實現(xiàn)發(fā)起投票很魂、收集選票、提交事務(wù)稚铣,并保證事務(wù)的順序一致性?
3.paxos持偏、zab虱而、raft vs pbft vs pow、pos葱椭、ripple 等區(qū)塊鏈共識算法的區(qū)別是什么捂寿,分別適合什么場景?//TODO:

zab協(xié)議包含兩個階段崩潰恢復(fù)消息廣播,基于zookeeper 3.5.3集群啟動以及ZooKeeper.setData來分別說明下兩階段的流程孵运。


一.Index

1.1 崩潰恢復(fù)

org.apache.zookeeper.server.quorum.QuorumPeerMain.main
    ->QuorumPeerMain.runFromConfig
        ->NettyServerCnxnFactory.configure //配置ServerCnxnFactory
        ->quorumPeer = getQuorumPeer();
        ->quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir())) //設(shè)置FileTxnSnapLog秦陋,管理txn以及snap
        ->quorumPeer.setMyid(config.getServerId()) //設(shè)置myid
        ->quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit()); //超時時間
        ->quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));//初始化一個空的ZKDatabase
        ->quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); //設(shè)置QuorumVerifier
        ->quorumPeer.setCnxnFactory(cnxnFactory)
        ->org.apache.zookeeper.server.quorum.QuorumPeer.start() //啟動QuorumPeer
            ->QuorumPeer.loadDataBase
                ->zkDb.loadDataBase() //恢復(fù)掛機(jī)之前的內(nèi)存狀態(tài)
                    ->long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
                        ->processTransaction(hdr,dt,sessions, itr.getTxn());//重放所有的Transaction,
                        ->listener.onTxnLoaded(hdr, itr.getTxn()); //將已提交的日志放入org.apache.zookeeper.server.ZKDatabase.committedLog中并更新minCommittedLog治笨、maxCommittedLog兩個offset便于同步時使用
                            ->addCommittedProposal(r)
                ->currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); //獲取之前記錄的Epoch
                ->acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            ->QuorumPeer.startServerCnxnFactory //綁定端口驳概,啟動服務(wù)
            ->QuorumPeer.startLeaderElection //開啟選舉
                ->currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());//設(shè)置當(dāng)前服務(wù)器的投票,使用之前記錄的epoch以及zxid
                ->this.electionAlg = createElectionAlgorithm(electionType); //設(shè)置選舉算法粪小,默認(rèn)
                    ->qcm = new QuorumCnxManager(this);
                    ->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.run
                        ->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ss.bind(addr)//綁定選舉端口
                        ->client = ss.accept();//等待連接
                        ->org.apache.zookeeper.server.quorum.QuorumCnxManager.receiveConnection //如果建立鏈接的對端的的sid小于當(dāng)前服務(wù)器id,則當(dāng)前服務(wù)器作為客戶端去建立鏈接抡句,否則啟動發(fā)送接收線程開始選舉流程探膊,此邏輯是為了避免兩臺服務(wù)器建立多個鏈接
                            ->closeSocket(sock);//關(guān)閉之前的連接
                            ->org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne
                                ->QuorumCnxManager.initiateConnection //net層面的數(shù)據(jù)接收發(fā)發(fā)送
                                    ->dout.writeLong(self.getId());dout.flush(); //建立鏈接發(fā)送的第一條為當(dāng)前服務(wù)器端myid
                                    ->SendWorker sw = new SendWorker(sock, sid); //將QuorumCnxManager.queueSendMap發(fā)送出去
                                    ->RecvWorker rw = new RecvWorker(sock, sid, sw);//將收到的消息加入QuorumCnxManager.recvQueue中
                                    ->sw.start();rw.start(); 
                    ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.start //啟動消息處理器
                        ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender.run
                        ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver.run
            ->QuorumPeer.run
                ->while (running) {
                    ->case LOOKING //注意leader選舉用的當(dāng)前節(jié)點的zxid為org.apache.zookeeper.server.DataTree.lastProcessedZxid,而本字段為FinalRequestProcessor.processRequest方法更新,F(xiàn)inalRequestProcessor為ProposalRequestProcessor->CommitProcessor之后的待榔,即commit階段才會更新逞壁,則zk其實是按照進(jìn)入提交階段zxid為準(zhǔn),即寫入過半后發(fā)出大于1個commit后算寫入成功
                        ->setCurrentVote(makeLEStrategy().lookForLeader());
                            ->org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader
                                ->synchronized(this){logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());} //更新選舉周期,并設(shè)置當(dāng)前服務(wù)器的leader選舉propose為自己:FastLeaderElection.proposedLeader,proposedZxid,proposedEpoch
                                ->sendNotifications //將自己的投票發(fā)給集群中所有節(jié)點
                                    ->ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, qv.toString().getBytes());
                                    ->sendqueue.offer(notmsg);
                                ->while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){ //執(zhí)行循環(huán)锐锣,直到確定leader
                                    ->Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); //獲取收到的Notification
                                    ->switch (n.state) {
                                        ->case LOOKING:
                                            ->if (n.electionEpoch > logicalclock.get()) { //如果收到的投票周期大于當(dāng)前節(jié)點認(rèn)為的周期腌闯,則更新周期、清空已經(jīng)收到的投票雕憔、
                                                ->logicalclock.set(n.electionEpoch);
                                                ->recvset.clear();
                                                ->if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else{updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch())} //更新當(dāng)前節(jié)點的投票姿骏,先比較周期,周期相同再比較zxid斤彼,zxid相同然后就比較myid分瘦,然后選大的作為當(dāng)前節(jié)點后續(xù)的投票
                                                ->sendNotifications();//將最新的投票發(fā)給集群節(jié)點
                                            ->else if (n.electionEpoch < logicalclock.get()) {} //如果收到的投票周期小于當(dāng)前節(jié)點周期,則忽略,繼續(xù)取下一條Notification
                                            ->else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); //收到的投票比較大琉苇,則更新為收到的投票嘲玫,并發(fā)送給集群其他節(jié)點
                                            ->recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //以上條件都不符合,則為收到的投票為其他節(jié)點認(rèn)可當(dāng)前節(jié)點的投票,把選票更新到recvset里
                                            ->if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) { //查看收到的選票是否達(dá)到了要求并扇,QuorumMaj為過半策略去团,沒有達(dá)到要求繼續(xù)循環(huán)獲取Notification,否則設(shè)置投票結(jié)果穷蛹,結(jié)束leader選舉
                                                ->SyncedLearnerTracker.addAck(entry.getKey());
                                                    ->QuorumVerifierAcksetPair.getAckset().add(sid);
                                                ->SyncedLearnerTracker.hasAllQuorums
                                                    ->qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
                                                        ->QuorumMaj.containsQuorum
                                            -> while((n = recvqueue.poll(finalizeWait..//讀取剩余的notification土陪,直到讀取完成
                                            -> self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState()); //如果選舉的leader為自己,則QuorumPeer.state設(shè)為LEADING肴熏,否則設(shè)為FOLLOWING
                                            ->Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                                            ->return endVote;//結(jié)束選舉
                                    ->}
                    ->case LEADING: //如果當(dāng)前節(jié)點為lead節(jié)點
                        ->QuorumPeer.makeLeader
                        ->org.apache.zookeeper.server.quorum.Leader.Leader
                            ->Leader.ss.bind(self.getQuorumAddress());//leader綁定端口號等待follower建立連接鬼雀,本端口用來同步數(shù)據(jù),leader與follower之間心跳扮超、投票等數(shù)據(jù)交換
                        ->org.apache.zookeeper.server.quorum.Leader.lead
                            ->zk.loadData();
                                ->setZxid(zkDb.getDataTreeLastProcessedZxid()); //設(shè)置zxid
                                ->killSession(session, zkDb.getDataTreeLastProcessedZxid()); //清理過期的session
                                ->takeSnapshot();//落地一個干凈的Snapshot
                            ->cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
                                ->LearnerCnxAcceptor.run
                                    -> while (!stop) {Socket s = ss.accept();LearnerHandler fh = new LearnerHandler(s, Leader.this);fh.start();} //等待鏈接取刃,初始化follower的LearnerHandler作為消息處理器
                                        ->LearnerHandler.run //leader處理flower請求的主流程
                                            ->ia.readRecord(qp, "packet");//讀取第一個follower發(fā)送過來的消息[#1.1]蹋肮,讀取出sid出刷,設(shè)置LearnerHandler的sid
                                                ->this.sid = bbsid.getLong();
                                            ->QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); //發(fā)送leaderinof
                                            ->[#1.2]oa.writeRecord(newEpochPacket, "packet");
                                            ->ia.readRecord(ackEpochPacket, "packet");//讀取Follower的ack消息[#1.3]
                                            ->ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                                            ->leader.waitForEpochAck(this.getSid(), ss);
                                                ->Leader.waitForEpochAck //阻塞,直到過半Follower返回ack消息
                                                    ->electingFollowers.add(id);
                                                    ->if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) 
                                                        ->electionFinished = true;
                                                        ->electingFollowers.notifyAll();
                                            ->boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); //與follower同步
                                                ->long maxCommittedLog = db.getmaxCommittedLog(); //獲取三個值坯辩,zxid以及內(nèi)存中ZKDatabase.committedLog保存的最大最小日志
                                                ->long minCommittedLog = db.getminCommittedLog();
                                                ->long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
                                                ->[#1.4]if (lastProcessedZxid == peerLastZxid) {queueOpPacket(Leader.DIFF, peerLastZxid);}//如果相同馁龟,則發(fā)送Leader.DIFF消息
                                                ->[#1.5]if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) //如果leader節(jié)點的log小于當(dāng)前節(jié)點,則發(fā)送TRUNC消息截斷
                                                    ->queueOpPacket(Leader.TRUNC, maxCommittedLog);
                                                    ->currentZxid = maxCommittedLog;
                                                ->if ((maxCommittedLog >= peerLastZxid)&& (minCommittedLog <= peerLastZxid)) //如果處在maxCommittedLog與minCommittedLog中間漆魔,則說明缺了部分?jǐn)?shù)據(jù)坷檩,則按照普通投票方式把ZKDatabase.committedLog內(nèi)存中保存的這部分?jǐn)?shù)據(jù)按照普通的Proposal發(fā)送出去却音,并提交Committed
                                                    ->LearnerHandler.queueCommittedProposals
                                                        ->while (itr.hasNext()) 
                                                            ->Proposal propose = itr.next()
                                                            ->queuePacket(propose.packet);//發(fā)送propose
                                                            ->queueOpPacket(Leader.COMMIT, packetZxid);//提交propose
                                                        ->queueOpPacket(Leader.DIFF, lastCommitedZxid); //不管啥情況下,都發(fā)個DIFF消息表名同步完成
                                                ->if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {//如果內(nèi)存中l(wèi)og不夠矢炼,則把Log以及snap中的消息都發(fā)過去
                                                ->leaderLastZxid = leader.startForwarding(this, currentZxid);[TODO://]//處理toBeApplied系瓢、outstandingProposals的目的是啥
                                            ->[#1.6]QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);bufferedOutput.flush();//發(fā)送NEWLEADER消息
                                            ->ia.readRecord(qp, "packet");//讀取[#1.7]的消息,即NEWLEADER的ack
                                            ->leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());//阻塞,直到收到足夠的Follower的NEWLEADER的ack
                                                ->Leader.newLeaderProposal.addAck(sid);
                                            ->[#1.9]while(!leader.zk.isRunning() && !this.isInterrupted()){leader.zk.wait(20);}//阻塞句灌,等待leader的ZooKeeperServer.state變?yōu)镽UNNING
                                            ->[#1.10]queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); //發(fā)送UPTODATE消息給Follower
                                            ->while (true) //崩潰恢復(fù)階段完成夷陋,進(jìn)入普通消息處理階段,leader與follower之前的消息處理參考消息廣播階段示例
                                                ->ia.readRecord(qp, "packet");
                                                ->.....
                            ->zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
                            ->lastProposed = zk.getZxid();
                            ->waitForEpochAck(self.getId(), leaderStateSummary);//主節(jié)點也是投票節(jié)點,參與新節(jié)點選舉的策略
                            ->self.setCurrentEpoch(epoch);
                            ->[#1.6]newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);
                            ->waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);//阻塞胰锌,并投出本leader的票骗绕,與LearnerHandler一塊參與newLeaderProposal的過半策略
                                ->Leader.newLeaderProposal.addAck(sid);
                            ->startZkServer(); //啟動leader,
                                ->Leader.startZkServer
                                    ->lastCommitted = zk.getZxid()
                                    ->zk.startup();
                                        ->ZooKeeperServer.startup
                                            ->startSessionTracker();
                                            ->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.setupRequestProcessors();//啟動處理鏈资昧,處理鏈的邏輯下面消息廣播詳細(xì)說明
                                            ->setState(State.RUNNING);//標(biāo)志leader啟動完成酬土,并觸發(fā)LearnerHandler[#1.9]繼續(xù)向下走
                                    ->self.updateElectionVote(getEpoch());//設(shè)置QuorumPeer.currentVote
                                    ->zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
                            ->while (true) //循環(huán)向follower發(fā)送pin消息,獲取session等信息格带,PING處理不再展開
                                ->f.ping();
                                    ->LearnerHandler.ping
                                        ->QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
                                        ->queuePacket(ping);
                    ->case FOLLOWING: //FOLLOWING節(jié)點
                        ->setFollower(makeFollower(logFactory));
                        ->org.apache.zookeeper.server.quorum.Follower.followLeader
                            ->connectToLeader(addr);
                            ->long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                                ->Learner.registerWithLeader
                                    ->qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
                                    ->boa.writeRecord(li, "LearnerInfo");
                                    ->[#1.1]writePacket(qp, true); //發(fā)送給leader第一個消息注冊
                                    ->readPacket(qp);//讀取leader返回的LEADERINFO[#1.2]
                                    ->if (qp.getType() == Leader.LEADERINFO) 
                                        ->QuorumPeer.setAcceptedEpoch
                                            ->acceptedEpoch = e; //設(shè)置QuorumPeer.acceptedEpoch
                                            ->writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
                                    ->QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
                                    ->[#1.3]writePacket(ackNewEpoch, true);
                                ->syncWithLeader(newEpochZxid);
                                    ->Learner.syncWithLeader
                                        ->readPacket(qp);
                                        ->if (qp.getType() == Leader.DIFF) {snapshotNeeded = false;} //獲取[#1.4]發(fā)送的DIFF消息撤缴,本消息不做任何操作,僅作為標(biāo)志
                                        ->if (qp.getType() == Leader.TRUNC) //處理[#1.5]消息叽唱,截斷多余的消息
                                            ->boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                                        ->zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                                        ->zk.createSessionTracker();
                                        ->outerLoop:
                                            ->while (self.isRunning()) 
                                                ->readPacket(qp);
                                                ->case Leader.PROPOSAL:
                                                    ->packetsNotCommitted.add(pif);//將日志加入到LinkedList packetsNotCommitted中
                                                ->case Leader.COMMIT:
                                                    ->pif = packetsNotCommitted.peekFirst()//取出剛才放入packetsNotCommitted的日志
                                                    ->packetsCommitted.add(qp.getZxid());//放入packetsCommitted中
                                                ->case Leader.NEWLEADER: //處理[#1.6]消息
                                                    ->[#1.7]writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                                                ->case Leader.UPTODATE:
                                                    ->break outerLoop;跳出outerLoop腹泌,完成Follower的崩潰恢復(fù)階段
                                ->while (this.isRunning()) {
                                    ->Follower.readPacket(qp);
                                    ->Follower.processPacket(qp);//不再展開,具體參考消息廣播階段示例
                                ->}     
                ->}

1.2 消息廣播

[Client]org.apache.zookeeper.test.ClientTest.performClientTest
    ->MyWatcher watcher = new MyWatcher();
    ->org.apache.zookeeper.ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
        ->clientConfig = new ZKClientConfig();
        ->watchManager = defaultWatchManager();
        ->cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly)
            ->ClientCnxn.ClientCnxn()
                ->sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
        ->cnxn.start();
            ->ClientCnxn.SendThread.run()
                ->while (state.isAlive())
                    ->SendThread.startConnect();//選擇任何節(jié)點尔觉,建立連接
                        ->clientCnxnSocket.connect(addr);
                            ->org.apache.zookeeper.ClientCnxnSocketNetty.connect
                                ->bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
                                    ->pipeline.addLast("handler", new ZKClientHandler()); //ZKClientHandler作為客戶端的ChannelHandler
                                ->connectFuture = bootstrap.connect(addr); 
                                    ->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.channelConnected
                                        ->allChannels.add(ctx.getChannel());
                                        ->addCnxn(cnxn);
                                ->org.apache.zookeeper.ClientCnxnSocketNetty.connect.operationComplete //完成連接
                                    ->org.apache.zookeeper.ClientCnxn.SendThread.primeConnection();
                                        ->ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
                                        ->[#2.1]outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
                                            ->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.processMessage
                                                ->org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
                                                  ->zks.processConnectRequest(this, bb);//由于第一次發(fā)消息org.apache.zookeeper.server.NettyServerCnxn.initialized為false凉袱,所以走這個分支
                                                    ->org.apache.zookeeper.server.ZooKeeperServer.processConnectRequest //處理[#2.1]的ConnectRequest請求
                                                        ->if (sessionId == 0) createSession(cnxn, passwd, sessionTimeout);
                                                            ->long sessionId = sessionTracker.createSession(timeout);
                                                                ->org.apache.zookeeper.server.quorum.LearnerSessionTracker.nextSessionId.getAndIncrement();
                                                            ->Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
                                                            ->submitRequest(si);//createSession為事務(wù)請求,需要走正常的事務(wù)請求的流程侦铜,由leader發(fā)起Proposal,并最終ack后commit专甩,此處不再展開,后面基于setdata來說下事務(wù)流程
                                                                ->org.apache.zookeeper.server.ZooKeeperServer.firstProcessor.processRequest(si);
                                                                    ->FollowerRequestProcessor.processRequest(si);
                                                  ->initialized = true;
                                        ->clientCnxnSocket.connectionPrimed();
                                    ->wakeupCnxn();
                                        ->outgoingQueue.add(WakeupPacket.getInstance());//發(fā)送空Packet
                    ->sendPing();//達(dá)到閾值后發(fā)送心跳
                        ->RequestHeader h = new RequestHeader(-2, OpCode.ping);
                        ->queuePacket(h, null, null, null, null, null, null, null, null);
                    ->clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);//循環(huán)發(fā)送org.apache.zookeeper.ClientCnxn.pendingQueue里的Packet
            ->ClientCnxn.EventThread.run()
    ->zk.setData("/benwashere", "hi".getBytes(), 57);//事務(wù)流程
        ->final String serverPath = prependChroot(clientPath);
        ->h.setType(ZooDefs.OpCode.setData);
        ->org.apache.zookeeper.ClientCnxn.submitRequest
            ->Packet packet = queuePacket(h, r, request, response, null, null, null,null, watchRegistration, watchDeregistration); //將消息放入org.apache.zookeeper.ClientCnxn.outgoingQueue
                ->[Follower]org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
                    ->org.apache.zookeeper.server.ZooKeeperServer.processPacket
                        ->Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),h.getType(), incomingBuffer, cnxn.getAuthInfo());
                        ->si.setOwner(ServerCnxn.me);
                        ->org.apache.zookeeper.server.ZooKeeperServer.submitRequest
                            ->touch(si.cnxn);
                            ->firstProcessor.processRequest(si);
                                ->FollowerRequestProcessor.processRequest
                                    ->org.apache.zookeeper.server.quorum.FollowerRequestProcessor.run
                                        ->nextProcessor.processRequest(request);//先放入queuedRequests等待commit之后放入到committedRequests中
                                            ->CommitProcessor.processRequest(request);
                                                ->org.apache.zookeeper.server.quorum.CommitProcessor.run
                                        ->zks.getFollower().request(request); //事務(wù)請求發(fā)送給leader
                                            ->org.apache.zookeeper.server.quorum.Learner.request
                                                ->QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
                                                ->writePacket(qp, true);
                                                    ->leaderOs.writeRecord(pp, "packet");
                                                        ->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.REQUEST ->leader.zk.submitLearnerRequest(si);
                                                            ->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.submitLearnerRequest -> prepRequestProcessor.processRequest(request);
                                                                ->org.apache.zookeeper.server.PrepRequestProcessor.processRequest //由于請求處理是放入submittedRequests中钉稍,然后等待PrepRequestProcessor單線程run方法順序處理涤躲,即PrepRequestProcessor.pRequest為嚴(yán)格單線程執(zhí)行,不存在并發(fā)問題
                                                                    ->org.apache.zookeeper.server.PrepRequestProcessor.pRequest
                                                                        ->SetDataRequest setDataRequest = new SetDataRequest();
                                                                        ->org.apache.zookeeper.server.PrepRequestProcessor.pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); //注意此處生成Zxid贡未,即org.apache.zookeeper.server.ZooKeeperServer.hzxid.incrementAndGet();
                                                                            ->zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                                                                            ->checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
                                                                            ->request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
                                                                            ->[#2.2]org.apache.zookeeper.server.PrepRequestProcessor.addChangeRecord(nodeRecord);
                                                                                ->zks.outstandingChanges.add(c);
                                                                                ->zks.outstandingChangesForPath.put(c.path, c);
                                                                        ->request.zxid = zks.getZxid();//設(shè)置上面方法生成的自增zxid
                                                                        ->nextProcessor.processRequest(request); //分叉到兩個條線路CommitProcessor种樱、SyncRequestProcessor,發(fā)送propose給集群follower
                                                                            ->ProposalRequestProcessor.processRequest
                                                                                ->nextProcessor.processRequest(request);
                                                                                    ->CommitProcessor.processRequest(request);
                                                                                ->zks.getLeader().propose(request); -> org.apache.zookeeper.server.quorum.Leader.propose
                                                                                    ->QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,baos.toByteArray(), null);
                                                                                    ->Proposal p = new Proposal();p.packet = pp;
                                                                                    ->p.addQuorumVerifier(self.getQuorumVerifier());
                                                                                    ->lastProposed = p.packet.getZxid();
                                                                                    ->outstandingProposals.put(lastProposed, p);//發(fā)出的投票放到org.apache.zookeeper.server.quorum.Leader.outstandingProposals中
                                                                                    ->sendPacket(pp);//投票發(fā)送給各個follower
                                                                                        ->for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);} //放入org.apache.zookeeper.server.quorum.LearnerHandler.queuedPackets中
                                                                                            ->org.apache.zookeeper.server.quorum.LearnerHandler.sendPackets
                                                                                                ->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.PROPOSAL: 
                                                                                                    ->lastQueued = hdr.getZxid(); //保存到Follower.lastQueued俊卤,follower處理txn為嚴(yán)格有序的
                                                                                                    ->FollowerZooKeeperServer.logRequest(hdr, txn);
                                                                                                        ->Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
                                                                                                        ->pendingTxns.add(request); //保存到org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns中嫩挤,供后續(xù)COMMIT時使用
                                                                                                        ->syncProcessor.processRequest(request);
                                                                                                            ->org.apache.zookeeper.server.SyncRequestProcessor.run
                                                                                                                ->si = queuedRequests.take();
                                                                                                                ->zks.getZKDatabase().append(si)
                                                                                                                    ->this.snapLog.append(si);
                                                                                                                        ->org.apache.zookeeper.server.persistence.FileTxnLog.append
                                                                                                                ->zks.getZKDatabase().rollLog(); || zks.takeSnapshot(); //檢查l是否需要做Snapshot以及roll
                                                                                                                ->toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);} //達(dá)到閾值刷新或是沒有新的日志了都刷新,刷新的時候才觸發(fā)持久化到日志文件消恍,并繼續(xù)向下流轉(zhuǎn)
                                                                                                                    ->SyncRequestProcessor.flush
                                                                                                                        ->zks.getZKDatabase().commit(); //將上面的日志刷寫到磁盤上
                                                                                                                            ->org.apache.zookeeper.server.persistence.FileTxnLog.commit
                                                                                                                        ->nextProcessor.processRequest(i);
                                                                                                                            ->SendAckRequestProcessor.processRequest
                                                                                                                                ->QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
                                                                                                                                ->learner.writePacket(qp, false); -> leaderOs.writeRecord(pp, "packet"); //將ack信息返回leader
                                                                                                                                    ->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.ACK
                                                                                                                                        ->leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); -> Leader.processAck
                                                                                                                                            ->Proposal p = outstandingProposals.get(zxid);
                                                                                                                                            ->p.addAck(sid); //當(dāng)前sid加入到確認(rèn)列表中
                                                                                                                                            ->boolean hasCommitted = Leader.tryToCommit(p, zxid, followerAddr); //每收到一個ack岂昭,檢查一次commit的閾值,由于leader單線程發(fā)送proposal,所以為嚴(yán)格有序的狠怨,而且過半策略情況下约啊,當(dāng)前zxid未達(dá)到commmit條件邑遏,zxid+1也必然不會達(dá)到commit條件
                                                                                                                                                ->if (!p.hasAllQuorums()) {return false;} ->QuorumMaj.containsQuorum -> (ackSet.size() > half); //沒達(dá)到閾值繼續(xù)下次觸發(fā),QuorumMaj為過半策略
                                                                                                                                                ->if (zxid != lastCommitted+1) {...} //檢查本次commit的zxid是否為上次的zxid+1恰矩,由于zab協(xié)議為嚴(yán)格順序執(zhí)行记盒,且沒有事務(wù)提交失敗情況
                                                                                                                                                ->outstandingProposals.remove(zxid); //移除proposal階段存的txn
                                                                                                                                                ->toBeApplied.add(p);//將待提交的Proposal放入org.apache.zookeeper.server.quorum.Leader.toBeApplied中
                                                                                                                                                ->commit(zxid);
                                                                                                                                                    ->lastCommitted = zxid; //設(shè)置Leader.lastCommitted
                                                                                                                                                    ->QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
                                                                                                                                                    ->sendPacket(qp);
                                                                                                                                                        ->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.COMMIT: 
                                                                                                                                                            ->long firstElementZxid = pendingTxns.element().zxid; //取出proposal階段保存的org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns
                                                                                                                                                            ->if (firstElementZxid != zxid) {System.exit(12);} //如果提交的zxid與proposal階段的zxid不一致,說明系統(tǒng)出問題了外傅,直接exit
                                                                                                                                                            ->Request request = pendingTxns.remove(); //從pendingTxns移除
                                                                                                                                                            ->commitProcessor.commit(request);
                                                                                                                                                                ->committedRequests.add(request); -> CommitProcessor.run() //加入CommitProcessor.committedRequests
                                                                                                                                                                    ->nextPending.set(request); //放入org.apache.zookeeper.server.quorum.CommitProcessor.nextPending
                                                                                                                                                                    ->processCommitted();
                                                                                                                                                                        ->request = committedRequests.poll();//取出nextPending以及committedRequests對比孽鸡,如果一致
                                                                                                                                                                        ->Request pending = nextPending.get();
                                                                                                                                                                        ->if (pending != null &&pending.sessionId == request.sessionId &&pending.cxid == request.cxid)
                                                                                                                                                                            ->currentlyCommitting.set(pending);//保存到CommitProcessor.currentlyCommitting中
                                                                                                                                                                            ->sendToNextProcessor(pending); //workerPool此處按照request.sessionId分組了
                                                                                                                                                                                ->workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
                                                                                                                                                                                    ->org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest.doWork
                                                                                                                                                                                        ->nextProcessor.processRequest(request); -> Leader.ToBeAppliedRequestProcessor.processRequest
                                                                                                                                                                                            ->next.processRequest(request);
                                                                                                                                                                                                ->FinalRequestProcessor.processRequest(request);
                                                                                                                                                                                                    ->rc = zks.processTxn(request); //講事務(wù)應(yīng)用到內(nèi)存數(shù)據(jù)庫中
                                                                                                                                                                                                        ->org.apache.zookeeper.server.DataTree.processTxn ->case OpCode.setData:
                                                                                                                                                                                                            ->DataTree.setData
                                                                                                                                                                                                                ->DataNode n = nodes.get(path);n.data = data; //更新內(nèi)存數(shù)據(jù)節(jié)點
                                                                                                                                                                                                                ->dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發(fā)trigger
                                                                                                                                                                                                        ->lastProcessedZxid = rc.zxid; //更新org.apache.zookeeper.server.DataTree.lastProcessedZxid //此字段表明commit的offset,選leader同步數(shù)據(jù)都會用到
                                                                                                                                                                                                    ->rsp = new SetDataResponse(rc.stat);                                                                                                                                                                                                                                                                                                                                                                                           ->
                                                                                                                                                                                            ->leader.toBeApplied.iterator().remove();//將提交完成的從Leader.toBeApplied移除
                                                                                                                                                                                        ->currentlyCommitting.compareAndSet(request, null); //currentlyCommitting置空
                                                                                                                                                ->inform(p); //發(fā)送給observer本次Proposal
                                                                                                                                                    ->QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,proposal.packet.getData(), null);
                                                                                                                                                    ->sendObserverPacket(qp);
                                                                                                                                                ->zk.commitProcessor.commit(p.request); //執(zhí)行Leader的commit栏豺,執(zhí)行邏輯參考Follower,區(qū)別僅僅是移除[#2.2]階段放入outstandingChanges彬碱、outstandingChangesForPath的數(shù)據(jù)


                                                                                                                                                ->...//pendingSyncs跳過,暫不考慮leader接收client請求的情況
                                                                                                                        ->((Flushable)nextProcessor).flush();
                                                                                                                            ->SendAckRequestProcessor.flush
                                                                                ->syncProcessor.processRequest(request);
            ->while (!packet.finished) {packet.wait();} //阻塞奥洼,等待回復(fù)

二.reference

三.下篇

REDIS 3.2.8 CODE REVIEW

趁著公司618備戰(zhàn)期間把 zk review下巷疼,看源碼感覺看得快忘的也快,還是寫個index記錄思路來的清晰灵奖,本想把redis嚼沿、spark-core、spark-streaming瓷患、spark-graphx骡尽、hive,jdk擅编、netty什么的都寫下攀细,發(fā)現(xiàn)每review一個還真是挺廢精力的,很多當(dāng)時想明白的問題爱态,現(xiàn)在看之前的寫的TODO谭贪,發(fā)現(xiàn)一臉懵逼,已經(jīng)忘了锦担。有的當(dāng)時看的版本比較老俭识,比如spark1.8版本也不打算再review了,回頭直接看2.x 版本SQL的實現(xiàn)了洞渔;有的看只看了一部分套媚,hive的詞法解析器、物理執(zhí)行計劃基本都沒咋看磁椒,所以寫個review也沒有整體概念堤瘤,單寫個別模塊貌似沒啥意思。先給自己挖個坑了衷快,剩下的以后再補(bǔ)了...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末宙橱,一起剝皮案震驚了整個濱河市姨俩,隨后出現(xiàn)的幾起案子蘸拔,更是在濱河造成了極大的恐慌师郑,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件调窍,死亡現(xiàn)場離奇詭異宝冕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)邓萨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進(jìn)店門地梨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人缔恳,你說我怎么就攤上這事宝剖。” “怎么了歉甚?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵万细,是天一觀的道長。 經(jīng)常有香客問我纸泄,道長赖钞,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任聘裁,我火速辦了婚禮雪营,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘衡便。我一直安慰自己献起,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布镣陕。 她就那樣靜靜地躺著征唬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪茁彭。 梳的紋絲不亂的頭發(fā)上总寒,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天,我揣著相機(jī)與錄音理肺,去河邊找鬼摄闸。 笑死,一個胖子當(dāng)著我的面吹牛妹萨,可吹牛的內(nèi)容都是我干的年枕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼乎完,長吁一口氣:“原來是場噩夢啊……” “哼熏兄!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤摩桶,失蹤者是張志新(化名)和其女友劉穎桥状,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體硝清,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡辅斟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了芦拿。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片士飒。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蔗崎,靈堂內(nèi)的尸體忽然破棺而出酵幕,到底是詐尸還是另有隱情,我是刑警寧澤缓苛,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布裙盾,位于F島的核電站,受9級特大地震影響他嫡,放射性物質(zhì)發(fā)生泄漏番官。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一钢属、第九天 我趴在偏房一處隱蔽的房頂上張望徘熔。 院中可真熱鬧,春花似錦淆党、人聲如沸酷师。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽山孔。三九已至,卻和暖如春荷憋,著一層夾襖步出監(jiān)牢的瞬間台颠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工勒庄, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留串前,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓实蔽,卻偏偏與公主長得像荡碾,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子局装,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容