Both the cluster and standalone versions start from the same entry class, QuorumPeerMain, which enters the initializeAndRun method.
Startup
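- Parse the configuration file zoo.cfg
- Create and start DatadirCleanupManager, the cleaner for historical snapshot and transaction log files
- Start in cluster mode or standalone mode, depending on the configuration: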
    if (args.length == 1 && config.servers.size() > 0) {
        // cluster mode
        runFromConfig(config);
    } else {
        // standalone mode
        ZooKeeperServerMain.main(args);
    }
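DatadirCleanupManager only schedules a purge when autopurge.purgeInterval is greater than 0, and the purge keeps the newest autopurge.snapRetainCount snapshots. The sketch below illustrates only the retention choice, under simplifying assumptions. It assumes snapshot files are named snapshot.<hex zxid>, which is ZooKeeper's naming scheme. The class and method names are hypothetical. It also ignores the transaction logs that must be kept alongside retained snapshots.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not ZooKeeper's PurgeTxnLog API): picks which snapshot
// files fall outside the retention window. Transaction logs are ignored here.
final class SnapshotRetentionSketch {
    private static final String PREFIX = "snapshot.";

    // Parses the hex zxid suffix of "snapshot.<hex>"; returns -1 for any other name.
    static long zxidOf(String fileName) {
        if (!fileName.startsWith(PREFIX)) {
            return -1L;
        }
        try {
            return Long.parseLong(fileName.substring(PREFIX.length()), 16);
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    // Returns the snapshots to delete, keeping the `retainCount` newest ones by zxid.
    static List<String> snapshotsToPurge(List<String> fileNames, int retainCount) {
        List<String> snaps = new ArrayList<>();
        for (String name : fileNames) {
            if (zxidOf(name) >= 0) {
                snaps.add(name);
            }
        }
        // Newest first: order by the zxid encoded in the name, not by file time.
        snaps.sort((a, b) -> Long.compare(zxidOf(b), zxidOf(a)));
        if (snaps.size() <= retainCount) {
            return new ArrayList<>();
        }
        return new ArrayList<>(snaps.subList(retainCount, snaps.size()));
    }
}
```

Ordering by the zxid in the file name rather than by modification time matches the idea that a larger zxid means a more recent state.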
In cluster mode, execution enters the if branch.
Initialization
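The if branch runs runFromConfig. Inside runFromConfig you can see that the core instance is now a QuorumPeer rather than the ZooKeeperServer used in standalone mode. A QuorumPeer instance can be regarded as one node of the cluster, and all QuorumPeer instances in the cluster cooperate to carry out leader election and voting.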
- Create and configure the ServerCnxnFactory, exactly as in standalone mode:
    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
  The cnxnFactory is then assigned to quorumPeer:
    quorumPeer.setCnxnFactory(cnxnFactory);
- Instantiate quorumPeer and set its fields:
    quorumPeer = getQuorumPeer();
    // set all peers of the cluster, so that the machines can communicate with each other
    quorumPeer.setQuorumPeers(config.getServers());
    ...
  The peer list is parsed from the server.id entries in the configuration, for example:
    server.1=localhost:2888:3888
    server.2=localhost:2887:3887
    server.3=localhost:2886:3886
- Create the persistence manager FileTxnSnapLog and assign it to quorumPeer:
    quorumPeer.setTxnFactory(new FileTxnSnapLog(
            new File(config.getDataLogDir()),
            new File(config.getDataDir())));
- Create the in-memory database and assign it to quorumPeer:
    quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
- Initialize and start quorumPeer:
    quorumPeer.initialize();
    quorumPeer.start();
    quorumPeer.join();
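The server.N entries above are what populate config.servers, and the startup branch checks that map to choose cluster mode. Below is a rough illustration of that parsing. It assumes only the plain host:quorumPort:electionPort form. ZooKeeper's real QuorumPeerConfig handles more, for example an :observer suffix. ServerLineSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical parser for "server.N=host:quorumPort:electionPort" entries;
// ZooKeeper's real parsing lives in QuorumPeerConfig and handles more cases.
final class ServerLineSketch {
    static final class Server {
        final long id;
        final String host;
        final int quorumPort;   // Follower <-> Leader traffic and data sync
        final int electionPort; // leader-election votes; -1 if not configured

        Server(long id, String host, int quorumPort, int electionPort) {
            this.id = id;
            this.host = host;
            this.quorumPort = quorumPort;
            this.electionPort = electionPort;
        }
    }

    static Map<Long, Server> parseServers(Properties cfg) {
        Map<Long, Server> servers = new TreeMap<>();
        for (String key : cfg.stringPropertyNames()) {
            if (!key.startsWith("server.")) {
                continue;
            }
            long id = Long.parseLong(key.substring("server.".length()));
            String[] parts = cfg.getProperty(key).trim().split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException(key + " must be host:port[:port]");
            }
            int electionPort = parts.length >= 3 ? Integer.parseInt(parts[2]) : -1;
            servers.put(id, new Server(id, parts[0], Integer.parseInt(parts[1]), electionPort));
        }
        return servers;
    }

    // Mirrors the startup branch: a single config-file argument plus at least one server.N entry.
    static boolean isQuorumMode(String[] args, Map<Long, Server> servers) {
        return args.length == 1 && !servers.isEmpty();
    }
}
```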
The QuorumPeer#start method:
    // QuorumPeer#start
    public synchronized void start() {
        loadDataBase();
        cnxnFactory.start();
        startLeaderElection();
        super.start();
    }
Starting quorumPeer involves these steps:
- Load the in-memory database
- Start cnxnFactory, the IO thread for client connections
- Prepare the leader election
- Start the election thread
- In cluster mode, loading the in-memory database also determines the current epoch, which is kept in two fields:
    private long acceptedEpoch = -1;
    private long currentEpoch = -1;
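- Once cnxnFactory is started, the client IO thread still cannot serve requests. Creating a client connection needs the zkServer field, as this call shows:
    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) {
        return new NIOServerCnxn(zkServer, sock, sk, this);
    }
  zkServer is only assigned after the leader election and data synchronization have completed, and only then can client IO start working.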
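The situation described above can be modeled as a simple gate: the IO thread is already running, but requests are refused until a server object is installed. This is a hypothetical sketch, not ServerCnxnFactory's API. RequestHandler stands in for ZooKeeperServer, and setServer plays the role of installing zkServer once synchronization is done.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical gate: client IO is accepted early, but requests are refused
// until a server object has been installed after election and sync.
final class ClientGateSketch {
    interface RequestHandler {
        String handle(String request);
    }

    private final AtomicReference<RequestHandler> server = new AtomicReference<>();

    // Plays the role of installing zkServer once synchronization has finished.
    void setServer(RequestHandler handler) {
        server.set(handler);
    }

    String onClientRequest(String request) {
        RequestHandler h = server.get();
        if (h == null) {
            // No server yet: the connection exists, but nothing can process the request.
            throw new IllegalStateException("ZooKeeperServer not running");
        }
        return h.handle(request);
    }
}
```

An AtomicReference is used so that the IO thread reliably sees the server installed by the QuorumPeer thread.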
So the analysis below focuses first on the leader election and the election thread.
Cluster election
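For the cluster election, the current peer must establish connections with the other machines on the election port and then exchange votes with them. The election port is set in the configuration file: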
server.id - This is the host:port[:port] that the server with the given id will use for the quorum protocol.
The first port is used by Followers to communicate with the Leader at runtime and to synchronize data. The second port is used exclusively for vote traffic during Leader election. quorumPeer is given both values during initialization.
- Initialize the vote. QuorumPeer#startLeaderElection creates the current vote, in which the peer first votes for itself:
    currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
- Create the election algorithm. The default is electionType=3, i.e. FastLeaderElection:
    // QuorumPeer#createElectionAlgorithm
    case 3:
        qcm = createCnxnManager();
        // listen for incoming election connections
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            le = new FastLeaderElection(this, qcm);
        }
  This creates QuorumCnxManager, the network IO layer for Leader election. It also starts listening on the election port and waits for the other servers in the cluster to connect.
Calling start() then launches the QuorumPeer thread, which enters the run method:
- Register the JMX beans:
    jmxQuorumBean = new QuorumBean(this);
    MBeanRegistry.getInstance().register(jmxQuorumBean, null);
    ...
- Check the current server state and act on it:
    switch (getPeerState()) {
    case LOOKING: ...
    case OBSERVING: ...
    case FOLLOWING: ...
    case LEADING: ...
    }
  When the cluster starts, the state is LOOKING:
    private ServerState state = ServerState.LOOKING;
  A machine in the LOOKING state has to find the cluster's Leader. If there is no Leader yet, it enters election mode:
    setCurrentVote(makeLEStrategy().lookForLeader());
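lookForLeader starts from the vote each peer casts for itself in startLeaderElection, made of myid, the last logged zxid and the current epoch. It replaces that vote whenever a notification carries a better one. As far as I know, FastLeaderElection ranks votes by epoch first, then zxid, then server id (its totalOrderPredicate). Server weights are left out here. The sketch restates only that comparison, and VoteOrderSketch and supersedes are hypothetical names.

```java
// Hypothetical restatement of the vote ordering used during FastLeaderElection:
// higher epoch wins, then higher zxid, then higher server id. Weights are ignored.
final class VoteOrderSketch {
    static final class Vote {
        final long id;    // myid of the proposed leader
        final long zxid;  // last logged zxid of the proposed leader
        final long epoch; // epoch of the proposed leader

        Vote(long id, long zxid, long epoch) {
            this.id = id;
            this.zxid = zxid;
            this.epoch = epoch;
        }
    }

    // Returns true if the received vote should replace the current one.
    static boolean supersedes(Vote received, Vote current) {
        if (received.epoch != current.epoch) {
            return received.epoch > current.epoch;
        }
        if (received.zxid != current.zxid) {
            return received.zxid > current.zxid;
        }
        return received.id > current.id;
    }
}
```

This ordering makes the server with the most recent data win. The server id only breaks ties between equally up-to-date servers.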
Leader election
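The default algorithm, FastLeaderElection#lookForLeader, serves as the example here. This method starts a new round of Leader election. It is called whenever a QuorumPeer changes its state to LOOKING, and it sends notifications to all the other peers. The election algorithm itself is analyzed separately. When the election finishes, each server is in one of the states OBSERVING, FOLLOWING or LEADING, corresponding to the roles Observer, Follower and Leader. An Observer differs from a Follower in that it does not take part in any voting.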
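Which state a peer lands in follows directly from the election result. A peer that elected itself leads. Every other peer becomes a learner: a Follower if it is a voting participant, or an Observer otherwise. As far as I know, FastLeaderElection makes this a one-line decision, and the enums and method below are a hypothetical restatement of it.

```java
// Hypothetical restatement of how the election outcome maps to a peer state.
final class PeerStateSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    enum LearnerType { PARTICIPANT, OBSERVER }

    static ServerState stateAfterElection(long myId, long electedLeaderId, LearnerType type) {
        if (electedLeaderId == myId) {
            return ServerState.LEADING;
        }
        // Participants vote and follow; observers only replicate and never vote.
        return type == LearnerType.PARTICIPANT ? ServerState.FOLLOWING : ServerState.OBSERVING;
    }
}
```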
Role interaction
After the election, the Leader and the Followers must synchronize their data. In later message processing, the Followers forward transaction requests to the Leader as Request objects.
Follower
When a node's state is FOLLOWING, its role is set to Follower, which means a Follower is created and started:
    setFollower(makeFollower(logFactory));
    follower.followLeader();
The Follower#followLeader method:
    void followLeader() throws InterruptedException {
        ...
        QuorumServer leaderServer = findLeader();
        try {
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            // the leader's epoch is lower than the epoch this follower already accepted: error
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("");
                throw new IOException("Error: Epoch of leader is lower");
            }
            syncWithLeader(newEpochZxid);
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        }
        ...
    }
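The epoch check above works because a zxid packs two numbers into one 64-bit long. The high 32 bits hold the epoch, and the low 32 bits hold a counter within that epoch. ZooKeeper's ZxidUtils provides getEpochFromZxid and makeZxid for this. The class below is a standalone sketch of the same bit arithmetic, not a copy of that class.

```java
// Standalone sketch of the zxid bit layout: [ epoch (high 32 bits) | counter (low 32 bits) ].
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }
}
```

For example, makeZxid(3, 42) is 0x30000002a. The ACK a Follower sends after synchronization carries makeZxid(newEpoch, 0), the first zxid of the new epoch.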
Steps:
- Find the current leader by looking up the server that the current vote points to:
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
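- Connect to the leader found in the previous step, retrying on failure:
    sock = new Socket();
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            break;
        } catch (IOException e) {
            if (tries == 4) throw e; // give up after the fifth attempt
            sock = new Socket();     // a socket whose connect failed cannot be reused
            sock.setSoTimeout(self.tickTime * self.initLimit);
        }
        Thread.sleep(1000);
    }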
- Register with the leader. In this step the Follower reports its accepted epoch to the Leader, together with its own latest transaction id (zxid) and epoch, and it accepts the Leader's epoch.
- Synchronize data. The Leader has received the Follower's latest zxid in the previous step. It compares that zxid with its own and decides how to synchronize. In Learner#syncWithLeader, the Follower reads the packet in which the Leader announces the synchronization mode:
    readPacket(qp);
    if (qp.getType() == Leader.DIFF) {
        // differential sync
        snapshotNeeded = false;
    } else if (qp.getType() == Leader.SNAP) {
        // full sync
        zk.getZKDatabase().clear();
        zk.getZKDatabase().deserializeSnapshot(leaderIs);
        String signature = leaderIs.readString("signature");
        if (!signature.equals("BenWasHere")) {
            LOG.error("Missing signature. Got " + signature);
            throw new IOException("Missing signature");
        }
        zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else if (qp.getType() == Leader.TRUNC) {
        // truncate the log
        boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
        if (!truncated) {
            System.exit(13);
        }
        zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else {
        System.exit(13);
    }
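  The Follower then handles its local log files and local database according to the sync type:
  - DIFF: differential sync, in which the Leader sends only the proposals the Follower is missing
  - SNAP: full sync, in which the Follower clears its database and loads the Leader's snapshot
  - TRUNC: log truncation, in which the Follower truncates its transaction log back to the zxid given by the Leader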
  The Leader then starts sending the data to be synchronized:
    // data synchronization ends once an UPTODATE packet is received
    outerLoop:
    while (self.isRunning()) {
        readPacket(qp);
        switch (qp.getType()) {
        // proposal
        case Leader.PROPOSAL:
            PacketInFlight pif = new PacketInFlight();
            pif.hdr = new TxnHeader();
            pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
            if (pif.hdr.getZxid() != lastQueued + 1) {
                // out-of-order txn: warning omitted
            }
            lastQueued = pif.hdr.getZxid();
            packetsNotCommitted.add(pif);
            break;
        // commit
        case Leader.COMMIT:
            if (!writeToTxnLog) {
                pif = packetsNotCommitted.peekFirst();
                if (pif.hdr.getZxid() != qp.getZxid()) {
                    LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                } else {
                    zk.processTxn(pif.hdr, pif.rec);
                    packetsNotCommitted.remove();
                }
            } else {
                packetsCommitted.add(qp.getZxid());
            }
            break;
        // only observers receive this packet type; it is treated as a PROPOSAL plus a COMMIT
        case Leader.INFORM:
            PacketInFlight packet = new PacketInFlight();
            packet.hdr = new TxnHeader();
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            // Log warning message if txn comes out-of-order
            if (packet.hdr.getZxid() != lastQueued + 1) {
                // warning omitted
            }
            lastQueued = packet.hdr.getZxid();
            if (!writeToTxnLog) {
                // Apply to db directly if we haven't taken the snapshot
                zk.processTxn(packet.hdr, packet.rec);
            } else {
                packetsNotCommitted.add(packet);
                packetsCommitted.add(qp.getZxid());
            }
            break;
        // synchronization complete
        case Leader.UPTODATE:
            if (isPreZAB1_0) {
                zk.takeSnapshot();
                self.setCurrentEpoch(newEpoch);
            }
            self.cnxnFactory.setZooKeeperServer(zk);
            break outerLoop;
        case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
            File updating = new File(self.getTxnFactory().getSnapDir(),
                    QuorumPeer.UPDATING_EPOCH_FILENAME);
            if (!updating.exists() && !updating.createNewFile()) {
                throw new IOException("Failed to create " + updating.toString());
            }
            if (snapshotNeeded) {
                zk.takeSnapshot();
            }
            self.setCurrentEpoch(newEpoch);
            if (!updating.delete()) {
                throw new IOException("Failed to delete " + updating.toString());
            }
            // from now on, data must also be written to the transaction log
            writeToTxnLog = true;
            isPreZAB1_0 = false;
            writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
            break;
        }
    }
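After the synchronization completes:
- Send the acknowledgement:
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
- Start accepting client requests. zk is a different server class depending on the node's role, either FollowerZooKeeperServer or ObserverZooKeeperServer:
    zk.startup();
  The in-memory database must also make up the difference between the snapshot and the log. This means applying the proposals that arrived during synchronization (packetsNotCommitted and packetsCommitted).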
- Keep communicating with the Leader to stay synchronized:
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }
  Follower#processPacket examines the packet received in qp and dispatches on its type:
    protected void processPacket(QuorumPacket qp) throws IOException {
        switch (qp.getType()) {
        // heartbeat
        case Leader.PING:
            ping(qp);
            break;
        // transaction proposal
        case Leader.PROPOSAL:
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                // out-of-order txn: warning omitted
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break;
        // commit a transaction
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        // tells the Learner that the Sync operation has completed
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            LOG.error("Invalid packet type: {} received by Observer", qp.getType());
        }
    }
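The Leader's choice among DIFF, TRUNC and SNAP, which the Follower reacts to in syncWithLeader, can be sketched from the zxids involved. This is a simplified model. It assumes the Leader keeps an in-memory window of recently committed proposals bounded by minCommittedLog and maxCommittedLog, the names used by 3.4-era LearnerHandler as far as I recall. It deliberately leaves out the case where a Follower's history diverges inside that window, which needs a TRUNC followed by a DIFF. SyncModeSketch and choose are hypothetical names.

```java
// Simplified, hypothetical model of how a leader could pick the sync mode.
final class SyncModeSketch {
    enum SyncMode { DIFF, TRUNC, SNAP }

    static SyncMode choose(long peerLastZxid, long leaderLastZxid,
                           long minCommittedLog, long maxCommittedLog,
                           boolean haveCommittedLog) {
        if (peerLastZxid == leaderLastZxid) {
            return SyncMode.DIFF;  // already in sync: an empty diff
        }
        if (!haveCommittedLog) {
            return SyncMode.SNAP;  // nothing cached to replay
        }
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC; // peer is ahead of the leader: roll it back
        }
        if (peerLastZxid >= minCommittedLog) {
            return SyncMode.DIFF;  // replay only the missing proposals
        }
        return SyncMode.SNAP;      // too far behind the window: send a full snapshot
    }
}
```

A DIFF is preferred whenever possible because replaying a few proposals is far cheaper than transferring a whole snapshot.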
After that, the Follower keeps communicating with the Leader to vote on transactions.
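In this broadcast phase, a Follower logs and queues each PROPOSAL. Every COMMIT must then match the oldest queued proposal, because the Leader commits in zxid order. As far as I know, FollowerZooKeeperServer#commit treats a mismatch as fatal and only warns about a commit for a transaction it never saw. The sketch models only that queue discipline. PendingTxnSketch is a hypothetical class that tracks zxids instead of full requests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a follower's pending-transaction queue during broadcast.
final class PendingTxnSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();
    final List<Long> committed = new ArrayList<>();

    // PROPOSAL: the txn is logged and queued (the ACK to the leader is not modeled).
    void logRequest(long zxid) {
        pendingTxns.add(zxid);
    }

    // COMMIT: must match the oldest pending txn, since proposals commit in zxid order.
    void commit(long zxid) {
        Long first = pendingTxns.peek();
        if (first == null) {
            return; // commit for a txn never seen: only a warning in the real code
        }
        if (first != zxid) {
            throw new IllegalStateException("out-of-order commit 0x" + Long.toHexString(zxid)
                    + ", next pending 0x" + Long.toHexString(first));
        }
        committed.add(pendingTxns.remove());
    }
}
```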
At this point the Follower starts serving clients.
Leader
The Leader is set up much like the Follower:
    setLeader(makeLeader(logFactory));
    leader.lead();
The QuorumPeer#makeLeader method:
    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        return new Leader(this, new LeaderZooKeeperServer(logFactory,
                this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
Inside the Leader, requests are handled by LeaderZooKeeperServer.
The main flow of Leader#lead:
- Load the in-memory database:
    zk.loadData();
- Create and start LearnerCnxAcceptor, the thread that waits for connection requests from new followers:
    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();
  In Leader.LearnerCnxAcceptor#run:
    Socket s = ss.accept();
    // start with the initLimit, once the ack is processed
    // in LearnerHandler switch to the syncLimit
    s.setSoTimeout(self.tickTime * self.initLimit);
    s.setTcpNoDelay(nodelay);
    BufferedInputStream is = new BufferedInputStream(s.getInputStream());
    // create one thread per Learner to handle voting and data synchronization
    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
    fh.start();
- Wait for the Learners to respond with ACKs:
    readyToStart = true;
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
    synchronized (this) {
        lastProposed = zk.getZxid();
    }
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
    waitForEpochAck(self.getId(), leaderStateSummary);
    self.setCurrentEpoch(epoch);
    waitForNewLeaderAck(self.getId(), zk.getZxid());
  With this preparation done, the Leader only needs replies from a majority of the Learners before it can serve clients. Each LearnerHandler also calls waitForEpochAck and waitForNewLeaderAck, and these calls wake the Leader up once enough Learners have answered.
- Start serving clients:
    startZkServer();
- Heartbeats: keep the connections with the Learners alive
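The majority condition in the ACK step is a plain majority check. With QuorumMaj-style verification, a set of server ids is a quorum when it holds strictly more than half of the voting members, so 2 of 3 or 3 of 5. The same check gates getEpochToPropose. The Leader and each LearnerHandler report an accepted epoch there, and everyone waits until a quorum that includes the Leader has checked in. The sketch models that barrier in isolation. The class name, constructor and timeout handling are assumptions, and the real Leader also records the chosen epoch as its acceptedEpoch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the leader's epoch barrier; not the real Leader class.
final class EpochBarrierSketch {
    private final long leaderId;
    private final int votingMembers;
    private final long timeoutMs; // e.g. tickTime * initLimit
    private final Set<Long> connecting = new HashSet<>();
    private long epoch = -1;
    private boolean waitingForNewEpoch = true;

    EpochBarrierSketch(long leaderId, int votingMembers, long timeoutMs) {
        this.leaderId = leaderId;
        this.votingMembers = votingMembers;
        this.timeoutMs = timeoutMs;
    }

    // Majority rule: strictly more than half of the voting members.
    boolean containsQuorum(Set<Long> ids) {
        return ids.size() > votingMembers / 2;
    }

    // Called by the leader and by each learner handler with that peer's acceptedEpoch.
    synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
        if (!waitingForNewEpoch) {
            return epoch; // already decided
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // newer than anything a participant has accepted
        }
        connecting.add(sid);
        if (connecting.contains(leaderId) && containsQuorum(connecting)) {
            waitingForNewEpoch = false;
            notifyAll(); // release every caller blocked below
        } else {
            long end = System.currentTimeMillis() + timeoutMs;
            long now = System.currentTimeMillis();
            while (waitingForNewEpoch && now < end) {
                wait(end - now);
                now = System.currentTimeMillis();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
```

Whichever caller completes the quorum, every participant leaves the barrier with the same epoch, one higher than the largest accepted epoch reported.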
At this point the ZooKeeper cluster has finished starting up, and the whole cluster begins serving clients.
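The heartbeat step keeps running after startup. As far as I recall, the Leader's loop wakes up every tickTime/2 and pings each LearnerHandler. It steps down if the Learners that have acknowledged within syncLimit ticks, together with the Leader itself, no longer form a quorum. HeartbeatSketch is a deterministic, hypothetical model of that rule. Ticks advance by calling tick() instead of sleeping. The real loop's every-other-iteration tick skipping and the pings themselves are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, deterministic model of the leader's liveness rule.
final class HeartbeatSketch {
    private final long leaderId;
    private final int votingMembers;
    private final int syncLimit;
    private int currentTick;
    // sid -> last tick at which that learner still counts as synced
    private final Map<Long, Integer> ackDeadline = new HashMap<>();

    HeartbeatSketch(long leaderId, int votingMembers, int syncLimit) {
        this.leaderId = leaderId;
        this.votingMembers = votingMembers;
        this.syncLimit = syncLimit;
    }

    // A learner answered a ping or acked a proposal: extend its deadline.
    void onAck(long sid) {
        ackDeadline.put(sid, currentTick + syncLimit);
    }

    // Advances one tick; returns false when the leader should step down.
    boolean tick() {
        currentTick++;
        Set<Long> synced = new HashSet<>();
        synced.add(leaderId); // the leader always counts itself
        for (Map.Entry<Long, Integer> e : ackDeadline.entrySet()) {
            if (currentTick <= e.getValue()) {
                synced.add(e.getKey());
            }
        }
        return synced.size() > votingMembers / 2;
    }
}
```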