導(dǎo)語
1.zab協(xié)議崩潰恢復(fù)如何實現(xiàn)leader選舉及數(shù)據(jù)同步?
2.zab消息廣播階段如何實現(xiàn)發(fā)起投票很魂、收集選票、提交事務(wù)稚铣,并保證事務(wù)的順序一致性?
3.paxos持偏、zab虱而、raft vs pbft vs pow、pos葱椭、ripple 等區(qū)塊鏈共識算法的區(qū)別是什么捂寿,分別適合什么場景?//TODO:
zab協(xié)議包含兩個階段崩潰恢復(fù)
與消息廣播
,基于zookeeper 3.5.3集群啟動以及ZooKeeper.setData來分別說明下兩階段的流程孵运。
一.Index
1.1 崩潰恢復(fù)
org.apache.zookeeper.server.quorum.QuorumPeerMain.main
->QuorumPeerMain.runFromConfig
->NettyServerCnxnFactory.configure //配置ServerCnxnFactory
->quorumPeer = getQuorumPeer();
->quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir())) //設(shè)置FileTxnSnapLog秦陋,管理txn以及snap
->quorumPeer.setMyid(config.getServerId()) //設(shè)置myid
->quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit()); //超時時間
->quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));//初始化一個空的ZKDatabase
->quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); //設(shè)置QuorumVerifier
->quorumPeer.setCnxnFactory(cnxnFactory)
->org.apache.zookeeper.server.quorum.QuorumPeer.start() //啟動QuorumPeer
->QuorumPeer.loadDataBase
->zkDb.loadDataBase() //恢復(fù)掛機(jī)之前的內(nèi)存狀態(tài)
->long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
->processTransaction(hdr,dt,sessions, itr.getTxn());//重放所有的Transaction,
->listener.onTxnLoaded(hdr, itr.getTxn()); //將已提交的日志放入org.apache.zookeeper.server.ZKDatabase.committedLog中并更新minCommittedLog治笨、maxCommittedLog兩個offset便于同步時使用
->addCommittedProposal(r)
->currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); //獲取之前記錄的Epoch
->acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
->QuorumPeer.startServerCnxnFactory //綁定端口驳概,啟動服務(wù)
->QuorumPeer.startLeaderElection //開啟選舉
->currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());//設(shè)置當(dāng)前服務(wù)器的投票,使用之前記錄的epoch以及zxid
->this.electionAlg = createElectionAlgorithm(electionType); //設(shè)置選舉算法粪小,默認(rèn)
->qcm = new QuorumCnxManager(this);
->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.run
->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ss.bind(addr)//綁定選舉端口
->client = ss.accept();//等待連接
->org.apache.zookeeper.server.quorum.QuorumCnxManager.receiveConnection //如果建立鏈接的對端的的sid小于當(dāng)前服務(wù)器id,則當(dāng)前服務(wù)器作為客戶端去建立鏈接抡句,否則啟動發(fā)送接收線程開始選舉流程探膊,此邏輯是為了避免兩臺服務(wù)器建立多個鏈接
->closeSocket(sock);//關(guān)閉之前的連接
->org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne
->QuorumCnxManager.initiateConnection //net層面的數(shù)據(jù)接收發(fā)發(fā)送
->dout.writeLong(self.getId());dout.flush(); //建立鏈接發(fā)送的第一條為當(dāng)前服務(wù)器端myid
->SendWorker sw = new SendWorker(sock, sid); //將QuorumCnxManager.queueSendMap發(fā)送出去
->RecvWorker rw = new RecvWorker(sock, sid, sw);//將收到的消息加入QuorumCnxManager.recvQueue中
->sw.start();rw.start();
->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.start //啟動消息處理器
->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender.run
->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver.run
->QuorumPeer.run
->while (running) {
->case LOOKING //注意leader選舉用的當(dāng)前節(jié)點的zxid為org.apache.zookeeper.server.DataTree.lastProcessedZxid,而本字段為FinalRequestProcessor.processRequest方法更新,F(xiàn)inalRequestProcessor為ProposalRequestProcessor->CommitProcessor之后的待榔,即commit階段才會更新逞壁,則zk其實是按照進(jìn)入提交階段zxid為準(zhǔn),即寫入過半后發(fā)出大于1個commit后算寫入成功
->setCurrentVote(makeLEStrategy().lookForLeader());
->org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader
->synchronized(this){logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());} //更新選舉周期,并設(shè)置當(dāng)前服務(wù)器的leader選舉propose為自己:FastLeaderElection.proposedLeader,proposedZxid,proposedEpoch
->sendNotifications //將自己的投票發(fā)給集群中所有節(jié)點
->ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, qv.toString().getBytes());
->sendqueue.offer(notmsg);
->while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){ //執(zhí)行循環(huán)锐锣,直到確定leader
->Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); //獲取收到的Notification
->switch (n.state) {
->case LOOKING:
->if (n.electionEpoch > logicalclock.get()) { //如果收到的投票周期大于當(dāng)前節(jié)點認(rèn)為的周期腌闯,則更新周期、清空已經(jīng)收到的投票雕憔、
->logicalclock.set(n.electionEpoch);
->recvset.clear();
->if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else{updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch())} //更新當(dāng)前節(jié)點的投票姿骏,先比較周期,周期相同再比較zxid斤彼,zxid相同然后就比較myid分瘦,然后選大的作為當(dāng)前節(jié)點后續(xù)的投票
->sendNotifications();//將最新的投票發(fā)給集群節(jié)點
->else if (n.electionEpoch < logicalclock.get()) {} //如果收到的投票周期小于當(dāng)前節(jié)點周期,則忽略,繼續(xù)取下一條Notification
->else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); //收到的投票比較大琉苇,則更新為收到的投票嘲玫,并發(fā)送給集群其他節(jié)點
->recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //以上條件都不符合,則為收到的投票為其他節(jié)點認(rèn)可當(dāng)前節(jié)點的投票,把選票更新到recvset里
->if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) { //查看收到的選票是否達(dá)到了要求并扇,QuorumMaj為過半策略去团,沒有達(dá)到要求繼續(xù)循環(huán)獲取Notification,否則設(shè)置投票結(jié)果穷蛹,結(jié)束leader選舉
->SyncedLearnerTracker.addAck(entry.getKey());
->QuorumVerifierAcksetPair.getAckset().add(sid);
->SyncedLearnerTracker.hasAllQuorums
->qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
->QuorumMaj.containsQuorum
-> while((n = recvqueue.poll(finalizeWait..//讀取剩余的notification土陪,直到讀取完成
-> self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState()); //如果選舉的leader為自己,則QuorumPeer.state設(shè)為LEADING肴熏,否則設(shè)為FOLLOWING
->Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
->return endVote;//結(jié)束選舉
->}
->case LEADING: //如果當(dāng)前節(jié)點為lead節(jié)點
->QuorumPeer.makeLeader
->org.apache.zookeeper.server.quorum.Leader.Leader
->Leader.ss.bind(self.getQuorumAddress());//leader綁定端口號等待follower建立連接鬼雀,本端口用來同步數(shù)據(jù),leader與follower之間心跳扮超、投票等數(shù)據(jù)交換
->org.apache.zookeeper.server.quorum.Leader.lead
->zk.loadData();
->setZxid(zkDb.getDataTreeLastProcessedZxid()); //設(shè)置zxid
->killSession(session, zkDb.getDataTreeLastProcessedZxid()); //清理過期的session
->takeSnapshot();//落地一個干凈的Snapshot
->cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
->LearnerCnxAcceptor.run
-> while (!stop) {Socket s = ss.accept();LearnerHandler fh = new LearnerHandler(s, Leader.this);fh.start();} //等待鏈接取刃,初始化follower的LearnerHandler作為消息處理器
->LearnerHandler.run //leader處理flower請求的主流程
->ia.readRecord(qp, "packet");//讀取第一個follower發(fā)送過來的消息[#1.1]蹋肮,讀取出sid出刷,設(shè)置LearnerHandler的sid
->this.sid = bbsid.getLong();
->QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); //發(fā)送leaderinof
->[#1.2]oa.writeRecord(newEpochPacket, "packet");
->ia.readRecord(ackEpochPacket, "packet");//讀取Follower的ack消息[#1.3]
->ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
->leader.waitForEpochAck(this.getSid(), ss);
->Leader.waitForEpochAck //阻塞,直到過半Follower返回ack消息
->electingFollowers.add(id);
->if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers))
->electionFinished = true;
->electingFollowers.notifyAll();
->boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); //與follower同步
->long maxCommittedLog = db.getmaxCommittedLog(); //獲取三個值坯辩,zxid以及內(nèi)存中ZKDatabase.committedLog保存的最大最小日志
->long minCommittedLog = db.getminCommittedLog();
->long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
->[#1.4]if (lastProcessedZxid == peerLastZxid) {queueOpPacket(Leader.DIFF, peerLastZxid);}//如果相同馁龟,則發(fā)送Leader.DIFF消息
->[#1.5]if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) //如果leader節(jié)點的log小于當(dāng)前節(jié)點,則發(fā)送TRUNC消息截斷
->queueOpPacket(Leader.TRUNC, maxCommittedLog);
->currentZxid = maxCommittedLog;
->if ((maxCommittedLog >= peerLastZxid)&& (minCommittedLog <= peerLastZxid)) //如果處在maxCommittedLog與minCommittedLog中間漆魔,則說明缺了部分?jǐn)?shù)據(jù)坷檩,則按照普通投票方式把ZKDatabase.committedLog內(nèi)存中保存的這部分?jǐn)?shù)據(jù)按照普通的Proposal發(fā)送出去却音,并提交Committed
->LearnerHandler.queueCommittedProposals
->while (itr.hasNext())
->Proposal propose = itr.next()
->queuePacket(propose.packet);//發(fā)送propose
->queueOpPacket(Leader.COMMIT, packetZxid);//提交propose
->queueOpPacket(Leader.DIFF, lastCommitedZxid); //不管啥情況下,都發(fā)個DIFF消息表名同步完成
->if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {//如果內(nèi)存中l(wèi)og不夠矢炼,則把Log以及snap中的消息都發(fā)過去
->leaderLastZxid = leader.startForwarding(this, currentZxid);[TODO://]//處理toBeApplied系瓢、outstandingProposals的目的是啥
->[#1.6]QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);bufferedOutput.flush();//發(fā)送NEWLEADER消息
->ia.readRecord(qp, "packet");//讀取[#1.7]的消息,即NEWLEADER的ack
->leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());//阻塞,直到收到足夠的Follower的NEWLEADER的ack
->Leader.newLeaderProposal.addAck(sid);
->[#1.9]while(!leader.zk.isRunning() && !this.isInterrupted()){leader.zk.wait(20);}//阻塞句灌,等待leader的ZooKeeperServer.state變?yōu)镽UNNING
->[#1.10]queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); //發(fā)送UPTODATE消息給Follower
->while (true) //崩潰恢復(fù)階段完成夷陋,進(jìn)入普通消息處理階段,leader與follower之前的消息處理參考消息廣播階段示例
->ia.readRecord(qp, "packet");
->.....
->zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
->lastProposed = zk.getZxid();
->waitForEpochAck(self.getId(), leaderStateSummary);//主節(jié)點也是投票節(jié)點,參與新節(jié)點選舉的策略
->self.setCurrentEpoch(epoch);
->[#1.6]newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);
->waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);//阻塞胰锌,并投出本leader的票骗绕,與LearnerHandler一塊參與newLeaderProposal的過半策略
->Leader.newLeaderProposal.addAck(sid);
->startZkServer(); //啟動leader,
->Leader.startZkServer
->lastCommitted = zk.getZxid()
->zk.startup();
->ZooKeeperServer.startup
->startSessionTracker();
->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.setupRequestProcessors();//啟動處理鏈资昧,處理鏈的邏輯下面消息廣播詳細(xì)說明
->setState(State.RUNNING);//標(biāo)志leader啟動完成酬土,并觸發(fā)LearnerHandler[#1.9]繼續(xù)向下走
->self.updateElectionVote(getEpoch());//設(shè)置QuorumPeer.currentVote
->zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
->while (true) //循環(huán)向follower發(fā)送pin消息,獲取session等信息格带,PING處理不再展開
->f.ping();
->LearnerHandler.ping
->QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
->queuePacket(ping);
->case FOLLOWING: //FOLLOWING節(jié)點
->setFollower(makeFollower(logFactory));
->org.apache.zookeeper.server.quorum.Follower.followLeader
->connectToLeader(addr);
->long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
->Learner.registerWithLeader
->qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
->boa.writeRecord(li, "LearnerInfo");
->[#1.1]writePacket(qp, true); //發(fā)送給leader第一個消息注冊
->readPacket(qp);//讀取leader返回的LEADERINFO[#1.2]
->if (qp.getType() == Leader.LEADERINFO)
->QuorumPeer.setAcceptedEpoch
->acceptedEpoch = e; //設(shè)置QuorumPeer.acceptedEpoch
->writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
->QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
->[#1.3]writePacket(ackNewEpoch, true);
->syncWithLeader(newEpochZxid);
->Learner.syncWithLeader
->readPacket(qp);
->if (qp.getType() == Leader.DIFF) {snapshotNeeded = false;} //獲取[#1.4]發(fā)送的DIFF消息撤缴,本消息不做任何操作,僅作為標(biāo)志
->if (qp.getType() == Leader.TRUNC) //處理[#1.5]消息叽唱,截斷多余的消息
->boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
->zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
->zk.createSessionTracker();
->outerLoop:
->while (self.isRunning())
->readPacket(qp);
->case Leader.PROPOSAL:
->packetsNotCommitted.add(pif);//將日志加入到LinkedList packetsNotCommitted中
->case Leader.COMMIT:
->pif = packetsNotCommitted.peekFirst()//取出剛才放入packetsNotCommitted的日志
->packetsCommitted.add(qp.getZxid());//放入packetsCommitted中
->case Leader.NEWLEADER: //處理[#1.6]消息
->[#1.7]writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
->case Leader.UPTODATE:
->break outerLoop;跳出outerLoop腹泌,完成Follower的崩潰恢復(fù)階段
->while (this.isRunning()) {
->Follower.readPacket(qp);
->Follower.processPacket(qp);//不再展開,具體參考消息廣播階段示例
->}
->}
1.2 消息廣播
[Client]org.apache.zookeeper.test.ClientTest.performClientTest
->MyWatcher watcher = new MyWatcher();
->org.apache.zookeeper.ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
->clientConfig = new ZKClientConfig();
->watchManager = defaultWatchManager();
->cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly)
->ClientCnxn.ClientCnxn()
->sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
->cnxn.start();
->ClientCnxn.SendThread.run()
->while (state.isAlive())
->SendThread.startConnect();//選擇任何節(jié)點尔觉,建立連接
->clientCnxnSocket.connect(addr);
->org.apache.zookeeper.ClientCnxnSocketNetty.connect
->bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
->pipeline.addLast("handler", new ZKClientHandler()); //ZKClientHandler作為客戶端的ChannelHandler
->connectFuture = bootstrap.connect(addr);
->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.channelConnected
->allChannels.add(ctx.getChannel());
->addCnxn(cnxn);
->org.apache.zookeeper.ClientCnxnSocketNetty.connect.operationComplete //完成連接
->org.apache.zookeeper.ClientCnxn.SendThread.primeConnection();
->ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
->[#2.1]outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.processMessage
->org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
->zks.processConnectRequest(this, bb);//由于第一次發(fā)消息org.apache.zookeeper.server.NettyServerCnxn.initialized為false凉袱,所以走這個分支
->org.apache.zookeeper.server.ZooKeeperServer.processConnectRequest //處理[#2.1]的ConnectRequest請求
->if (sessionId == 0) createSession(cnxn, passwd, sessionTimeout);
->long sessionId = sessionTracker.createSession(timeout);
->org.apache.zookeeper.server.quorum.LearnerSessionTracker.nextSessionId.getAndIncrement();
->Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
->submitRequest(si);//createSession為事務(wù)請求,需要走正常的事務(wù)請求的流程侦铜,由leader發(fā)起Proposal,并最終ack后commit专甩,此處不再展開,后面基于setdata來說下事務(wù)流程
->org.apache.zookeeper.server.ZooKeeperServer.firstProcessor.processRequest(si);
->FollowerRequestProcessor.processRequest(si);
->initialized = true;
->clientCnxnSocket.connectionPrimed();
->wakeupCnxn();
->outgoingQueue.add(WakeupPacket.getInstance());//發(fā)送空Packet
->sendPing();//達(dá)到閾值后發(fā)送心跳
->RequestHeader h = new RequestHeader(-2, OpCode.ping);
->queuePacket(h, null, null, null, null, null, null, null, null);
->clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);//循環(huán)發(fā)送org.apache.zookeeper.ClientCnxn.pendingQueue里的Packet
->ClientCnxn.EventThread.run()
->zk.setData("/benwashere", "hi".getBytes(), 57);//事務(wù)流程
->final String serverPath = prependChroot(clientPath);
->h.setType(ZooDefs.OpCode.setData);
->org.apache.zookeeper.ClientCnxn.submitRequest
->Packet packet = queuePacket(h, r, request, response, null, null, null,null, watchRegistration, watchDeregistration); //將消息放入org.apache.zookeeper.ClientCnxn.outgoingQueue
->[Follower]org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
->org.apache.zookeeper.server.ZooKeeperServer.processPacket
->Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),h.getType(), incomingBuffer, cnxn.getAuthInfo());
->si.setOwner(ServerCnxn.me);
->org.apache.zookeeper.server.ZooKeeperServer.submitRequest
->touch(si.cnxn);
->firstProcessor.processRequest(si);
->FollowerRequestProcessor.processRequest
->org.apache.zookeeper.server.quorum.FollowerRequestProcessor.run
->nextProcessor.processRequest(request);//先放入queuedRequests等待commit之后放入到committedRequests中
->CommitProcessor.processRequest(request);
->org.apache.zookeeper.server.quorum.CommitProcessor.run
->zks.getFollower().request(request); //事務(wù)請求發(fā)送給leader
->org.apache.zookeeper.server.quorum.Learner.request
->QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
->writePacket(qp, true);
->leaderOs.writeRecord(pp, "packet");
->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.REQUEST ->leader.zk.submitLearnerRequest(si);
->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.submitLearnerRequest -> prepRequestProcessor.processRequest(request);
->org.apache.zookeeper.server.PrepRequestProcessor.processRequest //由于請求處理是放入submittedRequests中钉稍,然后等待PrepRequestProcessor單線程run方法順序處理涤躲,即PrepRequestProcessor.pRequest為嚴(yán)格單線程執(zhí)行,不存在并發(fā)問題
->org.apache.zookeeper.server.PrepRequestProcessor.pRequest
->SetDataRequest setDataRequest = new SetDataRequest();
->org.apache.zookeeper.server.PrepRequestProcessor.pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); //注意此處生成Zxid贡未,即org.apache.zookeeper.server.ZooKeeperServer.hzxid.incrementAndGet();
->zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
->checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
->request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
->[#2.2]org.apache.zookeeper.server.PrepRequestProcessor.addChangeRecord(nodeRecord);
->zks.outstandingChanges.add(c);
->zks.outstandingChangesForPath.put(c.path, c);
->request.zxid = zks.getZxid();//設(shè)置上面方法生成的自增zxid
->nextProcessor.processRequest(request); //分叉到兩個條線路CommitProcessor种樱、SyncRequestProcessor,發(fā)送propose給集群follower
->ProposalRequestProcessor.processRequest
->nextProcessor.processRequest(request);
->CommitProcessor.processRequest(request);
->zks.getLeader().propose(request); -> org.apache.zookeeper.server.quorum.Leader.propose
->QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,baos.toByteArray(), null);
->Proposal p = new Proposal();p.packet = pp;
->p.addQuorumVerifier(self.getQuorumVerifier());
->lastProposed = p.packet.getZxid();
->outstandingProposals.put(lastProposed, p);//發(fā)出的投票放到org.apache.zookeeper.server.quorum.Leader.outstandingProposals中
->sendPacket(pp);//投票發(fā)送給各個follower
->for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);} //放入org.apache.zookeeper.server.quorum.LearnerHandler.queuedPackets中
->org.apache.zookeeper.server.quorum.LearnerHandler.sendPackets
->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.PROPOSAL:
->lastQueued = hdr.getZxid(); //保存到Follower.lastQueued俊卤,follower處理txn為嚴(yán)格有序的
->FollowerZooKeeperServer.logRequest(hdr, txn);
->Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
->pendingTxns.add(request); //保存到org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns中嫩挤,供后續(xù)COMMIT時使用
->syncProcessor.processRequest(request);
->org.apache.zookeeper.server.SyncRequestProcessor.run
->si = queuedRequests.take();
->zks.getZKDatabase().append(si)
->this.snapLog.append(si);
->org.apache.zookeeper.server.persistence.FileTxnLog.append
->zks.getZKDatabase().rollLog(); || zks.takeSnapshot(); //檢查l是否需要做Snapshot以及roll
->toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);} //達(dá)到閾值刷新或是沒有新的日志了都刷新,刷新的時候才觸發(fā)持久化到日志文件消恍,并繼續(xù)向下流轉(zhuǎn)
->SyncRequestProcessor.flush
->zks.getZKDatabase().commit(); //將上面的日志刷寫到磁盤上
->org.apache.zookeeper.server.persistence.FileTxnLog.commit
->nextProcessor.processRequest(i);
->SendAckRequestProcessor.processRequest
->QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
->learner.writePacket(qp, false); -> leaderOs.writeRecord(pp, "packet"); //將ack信息返回leader
->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.ACK
->leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); -> Leader.processAck
->Proposal p = outstandingProposals.get(zxid);
->p.addAck(sid); //當(dāng)前sid加入到確認(rèn)列表中
->boolean hasCommitted = Leader.tryToCommit(p, zxid, followerAddr); //每收到一個ack岂昭,檢查一次commit的閾值,由于leader單線程發(fā)送proposal,所以為嚴(yán)格有序的狠怨,而且過半策略情況下约啊,當(dāng)前zxid未達(dá)到commmit條件邑遏,zxid+1也必然不會達(dá)到commit條件
->if (!p.hasAllQuorums()) {return false;} ->QuorumMaj.containsQuorum -> (ackSet.size() > half); //沒達(dá)到閾值繼續(xù)下次觸發(fā),QuorumMaj為過半策略
->if (zxid != lastCommitted+1) {...} //檢查本次commit的zxid是否為上次的zxid+1恰矩,由于zab協(xié)議為嚴(yán)格順序執(zhí)行记盒,且沒有事務(wù)提交失敗情況
->outstandingProposals.remove(zxid); //移除proposal階段存的txn
->toBeApplied.add(p);//將待提交的Proposal放入org.apache.zookeeper.server.quorum.Leader.toBeApplied中
->commit(zxid);
->lastCommitted = zxid; //設(shè)置Leader.lastCommitted
->QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
->sendPacket(qp);
->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.COMMIT:
->long firstElementZxid = pendingTxns.element().zxid; //取出proposal階段保存的org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns
->if (firstElementZxid != zxid) {System.exit(12);} //如果提交的zxid與proposal階段的zxid不一致,說明系統(tǒng)出問題了外傅,直接exit
->Request request = pendingTxns.remove(); //從pendingTxns移除
->commitProcessor.commit(request);
->committedRequests.add(request); -> CommitProcessor.run() //加入CommitProcessor.committedRequests
->nextPending.set(request); //放入org.apache.zookeeper.server.quorum.CommitProcessor.nextPending
->processCommitted();
->request = committedRequests.poll();//取出nextPending以及committedRequests對比孽鸡,如果一致
->Request pending = nextPending.get();
->if (pending != null &&pending.sessionId == request.sessionId &&pending.cxid == request.cxid)
->currentlyCommitting.set(pending);//保存到CommitProcessor.currentlyCommitting中
->sendToNextProcessor(pending); //workerPool此處按照request.sessionId分組了
->workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
->org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest.doWork
->nextProcessor.processRequest(request); -> Leader.ToBeAppliedRequestProcessor.processRequest
->next.processRequest(request);
->FinalRequestProcessor.processRequest(request);
->rc = zks.processTxn(request); //講事務(wù)應(yīng)用到內(nèi)存數(shù)據(jù)庫中
->org.apache.zookeeper.server.DataTree.processTxn ->case OpCode.setData:
->DataTree.setData
->DataNode n = nodes.get(path);n.data = data; //更新內(nèi)存數(shù)據(jù)節(jié)點
->dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發(fā)trigger
->lastProcessedZxid = rc.zxid; //更新org.apache.zookeeper.server.DataTree.lastProcessedZxid //此字段表明commit的offset,選leader同步數(shù)據(jù)都會用到
->rsp = new SetDataResponse(rc.stat); ->
->leader.toBeApplied.iterator().remove();//將提交完成的從Leader.toBeApplied移除
->currentlyCommitting.compareAndSet(request, null); //currentlyCommitting置空
->inform(p); //發(fā)送給observer本次Proposal
->QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,proposal.packet.getData(), null);
->sendObserverPacket(qp);
->zk.commitProcessor.commit(p.request); //執(zhí)行Leader的commit栏豺,執(zhí)行邏輯參考Follower,區(qū)別僅僅是移除[#2.2]階段放入outstandingChanges彬碱、outstandingChangesForPath的數(shù)據(jù)
->...//pendingSyncs跳過,暫不考慮leader接收client請求的情況
->((Flushable)nextProcessor).flush();
->SendAckRequestProcessor.flush
->syncProcessor.processRequest(request);
->while (!packet.finished) {packet.wait();} //阻塞奥洼,等待回復(fù)
二.reference
三.下篇
趁著公司618備戰(zhàn)期間把 zk review下巷疼,看源碼感覺看得快忘的也快,還是寫個index記錄思路來的清晰灵奖,本想把redis嚼沿、spark-core、spark-streaming瓷患、spark-graphx骡尽、hive,jdk擅编、netty什么的都寫下攀细,發(fā)現(xiàn)每review一個還真是挺廢精力的,很多當(dāng)時想明白的問題爱态,現(xiàn)在看之前的寫的TODO谭贪,發(fā)現(xiàn)一臉懵逼,已經(jīng)忘了锦担。有的當(dāng)時看的版本比較老俭识,比如spark1.8版本也不打算再review了,回頭直接看2.x 版本SQL的實現(xiàn)了洞渔;有的看只看了一部分套媚,hive的詞法解析器、物理執(zhí)行計劃基本都沒咋看磁椒,所以寫個review也沒有整體概念堤瘤,單寫個別模塊貌似沒啥意思。先給自己挖個坑了衷快,剩下的以后再補(bǔ)了...