Contents:
- Data synchronization and initialization (after leader election)
- Request handling by role (leader, follower, observer)
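1. Data synchronization and initialization
Once a leader has been elected, each of the other servers can serve clients only after its data is in sync with the leader.
Data synchronization between servers uses one of three modes:
- SNAP (snapshot: the whole data file is transferred)
- DIFF
- TRUNC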
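1.1 Basic flow
In step 3, the Follower returns AckEpoch to the Leader, carrying its current largest zxid. The Leader compares that zxid with its own minZxid and maxZxid.
The range [minZxid, maxZxid] is in fact a queue of recent transactions cached on the Leader side.
In step 6, the Leader sends NewLeader, which means the data sync is complete (the Leader has already sent the Follower everything it needs).
How the three modes differ:
- If the Follower's zxid is less than minZxid, the Follower is too far behind, so the Leader uses SNAP and the Follower receives the Leader's snapshot file.
- If the Follower's zxid lies within [minZxid, maxZxid], the Leader uses DIFF: it sends only the transactions after zxid, up to and including maxZxid, and the Follower persists them and applies them to memory.
- If the Follower's zxid is greater than maxZxid, the Leader uses TRUNC: the Follower deletes its transaction log entries beyond maxZxid.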
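The three-way choice above can be sketched in Java as follows. This is a simplified illustration, not ZooKeeper's code: the class SyncModeSketch, its methods, and the use of a NavigableMap (zxid to transaction) as the Leader's cached queue are assumptions, and the real Leader-side logic handles more edge cases, such as an empty cached queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Sketch only: SyncModeSketch is a hypothetical helper, not ZooKeeper API.
class SyncModeSketch {
    enum SyncMode { SNAP, DIFF, TRUNC }

    // Picks the sync mode from the Follower's last zxid and the Leader's
    // cached transaction range [minZxid, maxZxid].
    static SyncMode choose(long followerZxid, long minZxid, long maxZxid) {
        if (followerZxid < minZxid) {
            return SyncMode.SNAP;   // too far behind: ship the whole snapshot
        }
        if (followerZxid > maxZxid) {
            return SyncMode.TRUNC;  // ahead of the Leader: drop the extra txns
        }
        return SyncMode.DIFF;       // in range: ship only the missing txns
    }

    // DIFF payload: zxids strictly greater than the Follower's zxid.
    // The NavigableMap (zxid -> txn) models the Leader's cached queue.
    static List<Long> diffZxids(NavigableMap<Long, String> cachedTxns, long followerZxid) {
        return new ArrayList<>(cachedTxns.tailMap(followerZxid, false).keySet());
    }
}
```

Keeping the cached transactions sorted by zxid makes the DIFF payload a single tailMap view; a Follower already at maxZxid gets an empty DIFF.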
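1.2 Class overview
Learner class
Learner is the parent class of Follower and Observer. Its most important fields are leaderIs and leaderOs, the input and output streams of the connection to the Leader.
LearnerHandler class (extends ZooKeeperThread)
On the Leader side, one LearnerHandler is created per connected Learner to handle the QuorumPacket exchange with it.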
1.3 Details
In QuorumPeer, the code that runs after FastLeaderElection lives in the run method, which dispatches on the peer's current state:
try {
    /*
     * Main loop
     */
    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            LOG.info("LOOKING");
            if (Boolean.getBoolean("readonlymode.enabled")) {
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                // Create read-only server but don't start it immediately
                final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                        logFactory, this,
                        new ZooKeeperServer.BasicDataTreeBuilder(),
                        this.zkDb);
                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            // lower-bound grace period to 2 secs
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                            LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                        } catch (Exception e) {
                            LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    // If the thread is in the grace period, interrupt
                    // to come out of waiting.
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                try {
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
            }
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                observer.shutdown();
                setObserver(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case FOLLOWING:
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                follower.shutdown();
                setFollower(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case LEADING:
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                setPeerState(ServerState.LOOKING);
            }
            break;
        }
    }
} finally {
    LOG.warn("QuorumPeer main thread exited");
    try {
        MBeanRegistry.getInstance().unregisterAll();
    } catch (Exception e) {
        LOG.warn("Failed to unregister with JMX", e);
    }
    jmxQuorumBean = null;
    jmxLocalPeerBean = null;
}
} // end of QuorumPeer.run()
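getPeerState() returns the current server's state. In the FOLLOWING state, the peer creates a Follower and calls followLeader().
followLeader(): before serving clients, the Follower registers with the Leader.
Registration has three main steps:
- connectToLeader connects to the Leader.
- registerWithLeader registers with the Leader, exchanging sid, zxid, epoch, and similar information; the Leader uses this to choose the transaction sync mode.
- syncWithLeader synchronizes transaction data with the Leader, handling the SNAP/DIFF/TRUNC packets.
connectToLeader: opens a socket connection to the Leader. It is defined in Follower's parent class Learner and retries on failure, making up to 5 connection attempts. Once the connection succeeds, the Leader creates a LearnerHandler dedicated to exchanging QuorumPacket messages with this Follower.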
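The retry behaviour of connectToLeader can be sketched as a bounded loop. The ConnectAttempt interface and RetryConnectSketch class are hypothetical stand-ins for opening a socket to the Leader; the 5-attempt limit comes from the text above, while the pause between attempts is simply a parameter of this sketch.

```java
import java.io.IOException;

// Sketch of a bounded connect-retry loop; not ZooKeeper's implementation.
class RetryConnectSketch {
    // Hypothetical stand-in for "open a Socket to the Leader".
    interface ConnectAttempt {
        void connect() throws IOException;
    }

    // Tries up to maxTries times, sleeping between failures. Returns the
    // number of attempts used, or rethrows the last failure.
    static int connectWithRetry(ConnectAttempt attempt, int maxTries, long sleepMillis)
            throws IOException, InterruptedException {
        for (int tries = 1; ; tries++) {
            try {
                attempt.connect();
                return tries;
            } catch (IOException e) {
                if (tries >= maxTries) {
                    throw e;                // give up after the last attempt
                }
                Thread.sleep(sleepMillis);  // back off before retrying
            }
        }
    }
}
```

With maxTries set to 5, a Leader that becomes reachable on the third attempt is connected after two failures, while a Leader that never answers surfaces the fifth IOException to the caller.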
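registerWithLeader: first sends a FOLLOWERINFO packet telling the Leader who it is (the Follower's zxid and sid). It then waits for the Leader's LEADERINFO reply, reads the Leader's epoch and zxid, and updates its own epoch and zxid to match, treating the Leader's values as authoritative.
Finally, it sends an ACKEPOCH packet (the AckEpoch of step 3) to acknowledge the new epoch; this packet carries the Follower's latest zxid.
syncWithLeader: synchronizes data with the Leader, that is, replicates the Leader's transactions to the Follower.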
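registerWithLeader and the epoch check in followLeader both rely on the zxid layout: the high 32 bits hold the epoch and the low 32 bits hold a counter within that epoch, the same split that ZooKeeper's ZxidUtils.getEpochFromZxid uses. A minimal sketch, with the hypothetical class name ZxidSketch:

```java
// Sketch of the zxid layout: epoch in the high 32 bits, per-epoch
// counter in the low 32 bits. ZxidSketch is a hypothetical name.
class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Mirrors the safety check in followLeader: the new Leader must not
    // propose an epoch lower than the one this peer already accepted.
    static boolean acceptableLeaderEpoch(long newEpochZxid, long acceptedEpoch) {
        return epochOf(newEpochZxid) >= acceptedEpoch;
    }
}
```

Because the epoch sits in the high bits, any zxid from a newer epoch compares greater than every zxid from an older one, which is why plain long comparisons work for the sync decisions above.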
3.1
First, the Follower reads the first sync packet. The main code is:
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
readPacket(qp);
// committed packets
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
// packets not yet committed
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
    // DIFF mode: no snapshot is needed
    if (qp.getType() == Leader.DIFF) {
        LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
        snapshotNeeded = false;
    }
    else if (qp.getType() == Leader.SNAP) {
        LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
        // The leader is going to dump the database
        // clear our own database and read
        // Clear local data: minZxid and maxZxid reset to 0 and a new DataTree is built
        zk.getZKDatabase().clear();
        zk.getZKDatabase().deserializeSnapshot(leaderIs);
        String signature = leaderIs.readString("signature");
        if (!signature.equals("BenWasHere")) {
            LOG.error("Missing signature. Got " + signature);
            throw new IOException("Missing signature");
        }
        // Record the latest processed zxid
        zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else if (qp.getType() == Leader.TRUNC) {
        //we need to truncate the log to the lastzxid of the leader
        LOG.warn("Truncating log to get in sync with the leader 0x"
                + Long.toHexString(qp.getZxid()));
        boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
        if (!truncated) {
            // not able to truncate the log
            LOG.error("Not able to truncate the log "
                    + Long.toHexString(qp.getZxid()));
            System.exit(13);
        }
        zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    }
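<1> SNAP: snapshot mode. The Leader sends its entire database to the Follower.
<2> TRUNC: truncate mode. The Follower has more data than the Leader, so to keep the cluster consistent the Follower's extra data is deleted.
<3> DIFF: diff mode. The Follower has fewer transactions than the Leader and needs to catch up; the Leader turns the missing transactions into PROPOSAL and COMMIT packets for the Follower to execute.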
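A toy model of how each mode changes a Follower's history, assuming that history is just a TreeMap from zxid to a transaction string. FollowerLogSketch is hypothetical: the real code works on ZKDatabase (clear, deserializeSnapshot, truncateLog) as in the excerpt above, and a real snapshot is a serialized DataTree rather than a map of transactions.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of a Follower's history (zxid -> txn); hypothetical names.
class FollowerLogSketch {
    final TreeMap<Long, String> log = new TreeMap<>();

    // SNAP: discard local state and adopt the Leader's full image.
    void applySnap(Map<Long, String> leaderImage) {
        log.clear();
        log.putAll(leaderImage);
    }

    // TRUNC: drop every entry newer than the Leader's zxid.
    // tailMap returns a live view, so clearing it edits the log itself.
    void applyTrunc(long leaderZxid) {
        log.tailMap(leaderZxid, false).clear();
    }

    // DIFF: add the missing transactions sent by the Leader.
    void applyDiff(Map<Long, String> missing) {
        log.putAll(missing);
    }

    long lastZxid() {
        return log.isEmpty() ? 0L : log.lastKey();
    }
}
```

After TRUNC the Follower's last zxid equals the Leader's, which is what the excerpt records with setlastProcessedZxid.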
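3.2
Processing the subsequent messages (QuorumPackets)
These include PROPOSAL, COMMIT, NEWLEADER, and so on. A PROPOSAL here is a write request the Leader sends while synchronization is still in progress; it is buffered in packetsNotCommitted.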
    outerLoop:
    while (self.isRunning()) {
        readPacket(qp);
        switch(qp.getType()) {
        case Leader.PROPOSAL:
            PacketInFlight pif = new PacketInFlight();
            pif.hdr = new TxnHeader();
            pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
            if (pif.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(pif.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = pif.hdr.getZxid();
            packetsNotCommitted.add(pif);
            break;
        case Leader.COMMIT:
            if (!writeToTxnLog) {
                pif = packetsNotCommitted.peekFirst();
                if (pif.hdr.getZxid() != qp.getZxid()) {
                    LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                } else {
                    zk.processTxn(pif.hdr, pif.rec);
                    packetsNotCommitted.remove();
                }
            } else {
                packetsCommitted.add(qp.getZxid());
            }
            break;
        case Leader.INFORM:
            /*
             * Only observers get this type of packet. We treat this
             * as receiving PROPOSAL and COMMIT.
             */
            PacketInFlight packet = new PacketInFlight();
            packet.hdr = new TxnHeader();
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            // Log warning message if txn comes out-of-order
            if (packet.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(packet.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = packet.hdr.getZxid();
            if (!writeToTxnLog) {
                // Apply to db directly if we haven't taken the snapshot
                zk.processTxn(packet.hdr, packet.rec);
            } else {
                packetsNotCommitted.add(packet);
                packetsCommitted.add(qp.getZxid());
            }
            break;
        case Leader.UPTODATE:
            if (isPreZAB1_0) {
                zk.takeSnapshot();
                self.setCurrentEpoch(newEpoch);
            }
            self.cnxnFactory.setZooKeeperServer(zk);
            break outerLoop;
        case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
            File updating = new File(self.getTxnFactory().getSnapDir(),
                    QuorumPeer.UPDATING_EPOCH_FILENAME);
            if (!updating.exists() && !updating.createNewFile()) {
                throw new IOException("Failed to create " +
                        updating.toString());
            }
            if (snapshotNeeded) {
                zk.takeSnapshot();
            }
            self.setCurrentEpoch(newEpoch);
            if (!updating.delete()) {
                throw new IOException("Failed to delete " +
                        updating.toString());
            }
            writeToTxnLog = true;
            isPreZAB1_0 = false;
            writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
            break;
        }
    }
}
Next, the Leader sends a NEWLEADER packet, and the Follower replies with an ACK.
Finally, the Leader sends an UPTODATE packet to signal that synchronization is complete; the Follower then starts serving, breaks out of the loop, and finishes the registration process.
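The packet ordering of the sync loop in 3.2 can be replayed with a toy model. SyncLoopSketch and its Packet type are hypothetical; the sketch keeps only the control flow of the excerpt (PROPOSALs queued, COMMITs applied directly before NEWLEADER and only recorded afterwards, NEWLEADER answered with an ACK, UPTODATE ending the loop) and omits snapshotting, epoch updates, INFORM, and what the real code does with the buffered packets after UPTODATE.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy replay of the syncWithLeader loop; not ZooKeeper code.
class SyncLoopSketch {
    enum Type { PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

    static final class Packet {
        final Type type;
        final long zxid;
        Packet(Type type, long zxid) { this.type = type; this.zxid = zxid; }
    }

    final Deque<Long> notCommitted = new ArrayDeque<>(); // like packetsNotCommitted
    final List<Long> committed = new ArrayList<>();       // like packetsCommitted
    final List<Long> applied = new ArrayList<>();         // txns applied in memory
    final List<String> sent = new ArrayList<>();          // packets sent to the Leader
    boolean writeToTxnLog = false;
    boolean serving = false;

    void run(List<Packet> fromLeader) {
        for (Packet qp : fromLeader) {
            switch (qp.type) {
                case PROPOSAL:
                    notCommitted.addLast(qp.zxid);
                    break;
                case COMMIT:
                    if (!writeToTxnLog) {
                        // Before NEWLEADER: apply the matching head directly.
                        Long head = notCommitted.peekFirst();
                        if (head != null && head == qp.zxid) {
                            applied.add(notCommitted.removeFirst());
                        }
                    } else {
                        committed.add(qp.zxid); // after NEWLEADER: just record it
                    }
                    break;
                case NEWLEADER:
                    writeToTxnLog = true;   // snapshot and epoch update omitted
                    sent.add("ACK");
                    break;
                case UPTODATE:
                    serving = true;         // start serving clients
                    return;                 // like "break outerLoop"
            }
        }
    }
}
```

Feeding it PROPOSAL 1, COMMIT 1, PROPOSAL 2, NEWLEADER, COMMIT 2, UPTODATE applies zxid 1 directly, only records the commit of zxid 2, sends one ACK, and stops reading at UPTODATE.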
3.3 Follower main flow
Follower is a subclass of Learner; its entry method is followLeader.
// Find the Leader
QuorumServer leaderServer = findLeader();
try {
    // Connect to the Leader (up to 5 attempts)
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    // Register with the Leader as a Follower
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
    //check to see if the leader zxid is lower than ours
    //this should never happen but is just a safety check
    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
    if (newEpoch < self.getAcceptedEpoch()) {
        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
        throw new IOException("Error: Epoch of leader is lower");
    }
    // Synchronize data with the Leader
    syncWithLeader(newEpochZxid);
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }
At startup, the Follower first synchronizes data with the Leader and then starts FollowerZooKeeperServer. While FollowerZooKeeperServer runs, the while loop waits for QuorumPackets and passes each one to processPacket.
processPacket handles the QuorumPackets sent by the Leader. The two most important types are PROPOSAL and COMMIT; there are others, such as PING and COMMITANDACTIVATE.
A PROPOSAL is a write transaction the Leader is about to execute; a COMMIT is the command to commit it. The Follower makes a PROPOSAL's contents take effect only after receiving the matching COMMIT.
The same write transaction is executed once on the Leader and once on each Follower, which keeps the cluster's data consistent.
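The two-phase rule on the Follower (log and ACK a PROPOSAL, make it visible on COMMIT, strictly in order) can be sketched as follows. FollowerTwoPhaseSketch is hypothetical and mirrors FollowerZooKeeperServer's logRequest/commit only at the level of ordering; persisting a transaction is modeled as queueing it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Ordering model of a Follower's PROPOSAL/COMMIT handling; hypothetical.
class FollowerTwoPhaseSketch {
    private final Deque<Long> pending = new ArrayDeque<>(); // logged, not committed
    final List<Long> acksSent = new ArrayList<>();          // ACKs sent to the Leader
    final List<Long> visible = new ArrayList<>();           // committed, applied txns

    // PROPOSAL: persist the txn (modeled as queueing it) and ACK it.
    void onProposal(long zxid) {
        pending.addLast(zxid);
        acksSent.add(zxid);
    }

    // COMMIT: only the oldest pending proposal may be committed.
    boolean onCommit(long zxid) {
        Long oldest = pending.peekFirst();
        if (oldest == null || oldest != zxid) {
            return false;  // out of order: rejected in this sketch
        }
        visible.add(pending.removeFirst());
        return true;
    }
}
```

A queue fits here because, as described above, PROPOSALs are applied strictly in order, so a COMMIT can only ever refer to the oldest outstanding proposal.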
case Leader.PROPOSAL:
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
    if (hdr.getZxid() != lastQueued + 1) {
        LOG.warn("Got zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " expected 0x"
                + Long.toHexString(lastQueued + 1));
    }
    lastQueued = hdr.getZxid();
    fzk.logRequest(hdr, txn);
    break;
case Leader.COMMIT:
    fzk.commit(qp.getZxid());
    break;
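On a PROPOSAL, the Follower calls FollowerZooKeeperServer's logRequest method; on a COMMIT, it calls FollowerZooKeeperServer's commit method.
PROPOSAL packet
A write request packet the Leader sends to every Follower in the cluster.
When the Leader performs a write, it must tell the Learners in the cluster to perform it as well, so that the cluster's data stays consistent. PROPOSALs are applied strictly in order, which is one of ZooKeeper's core design ideas.
COMMIT packet
Once the Leader considers a Proposal persisted by a majority of the quorum (the Leader's own ACK counts) and ready to execute, it sends a COMMIT packet telling the Followers they may now commit that Proposal; the write is ultimately executed by FinalRequestProcessor. This mechanism ensures that every write is applied by a majority of the cluster.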
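The Leader-side majority rule can be sketched as an ACK counter. QuorumAckSketch is hypothetical: it counts distinct server ids per zxid among the voting servers (Leader plus Followers, Observers excluded) and, as a simplification, treats each zxid independently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Majority-ACK tracker for proposals; a sketch, not Leader's real code.
class QuorumAckSketch {
    private final int votingServers;
    private final Map<Long, Set<Long>> acks = new HashMap<>();
    private final Set<Long> committed = new HashSet<>();

    QuorumAckSketch(int votingServers) {
        this.votingServers = votingServers;
    }

    // Records an ACK from server sid for zxid. Returns true exactly once,
    // when the number of distinct ACKers first exceeds half the voters.
    boolean ack(long zxid, long sid) {
        if (committed.contains(zxid)) {
            return false;  // already committed; late ACKs are ignored
        }
        Set<Long> voters = acks.computeIfAbsent(zxid, z -> new HashSet<>());
        voters.add(sid);   // a Set makes duplicate ACKs count once
        if (voters.size() > votingServers / 2) {
            committed.add(zxid);
            acks.remove(zxid);
            return true;   // time to send COMMIT for this zxid
        }
        return false;
    }
}
```

In a 5-server ensemble the third distinct ACK (for example the Leader plus two Followers) triggers the COMMIT; the remaining two Followers still receive it and catch up.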
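3.4 Observer main flow
Observer works much like Follower; the main difference is that it does not vote: it takes no part in leader election and does not acknowledge proposals.
The Observer's entry method is observeLeader. When QuorumPeer's state is OBSERVING, it creates an Observer and calls observeLeader.
Like Follower's followLeader, observeLeader first registers with the Leader, synchronizes transactions, and then enters the QuorumPacket loop, handling each packet with processPacket.
Observer's processPacket is much simpler than Follower's; its main job is to handle INFORM packets, which carry the Leader's write commands.
The message handled here is INFORM: when the Leader broadcasts a write transaction, it sends Followers a PROPOSAL and waits for their acknowledgment, whereas it sends Observers an INFORM message and expects no ACK from them.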
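The Observer's INFORM path can be sketched as below. ObserverInformSketch is hypothetical; its out-of-order counter mirrors the warning in the INFORM branch of the syncWithLeader excerpt, and the empty return value stands for the fact that no ACK goes back to the Leader.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of an Observer applying INFORM packets.
class ObserverInformSketch {
    final List<Long> applied = new ArrayList<>();
    int outOfOrderWarnings = 0;
    private long lastQueued;

    ObserverInformSketch(long lastZxid) {
        this.lastQueued = lastZxid;
    }

    // INFORM carries an already-committed txn: apply it immediately and
    // return the replies sent to the Leader (always none).
    List<String> onInform(long zxid) {
        if (zxid != lastQueued + 1) {
            outOfOrderWarnings++;  // the excerpt only logs a warning here
        }
        lastQueued = zxid;
        applied.add(zxid);
        return Collections.emptyList();
    }
}
```

Compared with the Follower sketch, there is no pending queue: an INFORM is effectively a PROPOSAL and its COMMIT in one packet.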