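Overview
The previous section analyzed how transaction requests are processed in standalone mode, where the responsibility chain PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor is fairly easy to follow. This section analyzes transaction request processing in cluster mode, which involves the implementation of the ZAB protocol, the communication between leader and follower, and the way a Request flows through the individual Processors.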
Flow analysis
0. Setting up the responsibility chain
- The chains are set up in FollowerZooKeeperServer.setupRequestProcessors and LeaderZooKeeperServer.setupRequestProcessors.
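As a rough picture of what the two setupRequestProcessors methods wire together, the following toy model lists the processors in the order a request visits them. The processor names and their order are the ones discussed in this article; the class ChainSketch and its helper methods are hypothetical and are not part of ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ChainSketch, Node, chain and walk are hypothetical names.
class ChainSketch {
    /** A named link in a chain; next is null for the last processor. */
    static final class Node {
        final String name;
        final Node next;

        Node(String name, Node next) {
            this.name = name;
            this.next = next;
        }
    }

    /** Builds a chain from the given names, first name first. */
    static Node chain(String... names) {
        Node next = null;
        for (int i = names.length - 1; i >= 0; i--) {
            next = new Node(names[i], next);
        }
        return next;
    }

    /** Returns the processor names in the order a request visits them. */
    static List<String> walk(Node first) {
        List<String> order = new ArrayList<>();
        for (Node n = first; n != null; n = n.next) {
            order.add(n.name);
        }
        return order;
    }

    static Node leaderMain() {
        return chain("PrepRequestProcessor", "ProposalRequestProcessor",
                "CommitProcessor", "ToBeAppliedRequestProcessor", "FinalRequestProcessor");
    }

    // Side branch entered from ProposalRequestProcessor for transaction requests.
    static Node leaderSync() {
        return chain("SyncRequestProcessor", "AckRequestProcessor");
    }

    static Node followerMain() {
        return chain("FollowerRequestProcessor", "CommitProcessor", "FinalRequestProcessor");
    }

    // Side branch entered from FollowerZooKeeperServer.logRequest for proposals.
    static Node followerSync() {
        return chain("SyncRequestProcessor", "SendAckRequestProcessor");
    }

    public static void main(String[] args) {
        System.out.println("leader:   " + walk(leaderMain()) + " + " + walk(leaderSync()));
        System.out.println("follower: " + walk(followerMain()) + " + " + walk(followerSync()));
    }
}
```

Each server has a main chain plus a separate logging branch that ends in an ACK processor, which is why the step numbers later in the analysis fork.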
The analysis below follows the flow chart (the step numbers below correspond to the numbers in the chart).
1. PrepRequestProcessor.run
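The entry point is the same as in standalone mode: the server listens on port 2181, and PrepRequestProcessor validates the request (for example path, version, and ACL) before passing it on to ProposalRequestProcessor. See "Zookeeper (5): server-side standalone mode, transaction request processing".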
2. ProposalRequestProcessor.processRequest
public void processRequest(Request request) throws RequestProcessorException {
if(request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
// CommitProcessor
nextProcessor.processRequest(request);
// Transaction request: hdr is not set in the Request constructor; it is assigned in PrepRequestProcessor.pRequest2Txn when a transaction request is processed
if (request.hdr != null) {
// We need to sync and get consensus on any transactions
try {
// Broadcast the proposal to the followers
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// Write the txn / snapshot log, then pass on to AckRequestProcessor
syncProcessor.processRequest(request);
}
}
}
- nextProcessor.processRequest(request): the Request is passed on to CommitProcessor.
- request.hdr != null: hdr is not set in the Request constructor; it is assigned in PrepRequestProcessor.pRequest2Txn when a transaction request is processed, so a non-null hdr means this is a transaction request.
- zks.getLeader().propose(request): builds a PROPOSAL and broadcasts it to the followers.
- syncProcessor.processRequest(request): writes the txn / snapshot log, then passes on to AckRequestProcessor.
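The fan-out described above can be sketched as a small routing function. This is a simplified model, assuming the only input that matters is whether hdr was set; the class ProposalFanOutSketch and the method route are hypothetical, and the returned strings are just labels for the three calls in processRequest.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ProposalRequestProcessor.processRequest for non-sync requests.
// ProposalFanOutSketch and route are hypothetical names, not ZooKeeper APIs.
class ProposalFanOutSketch {
    /**
     * Returns the steps a request goes through, in call order.
     * hasTxnHeader stands for "request.hdr != null".
     */
    static List<String> route(boolean hasTxnHeader) {
        List<String> steps = new ArrayList<>();
        // Every request is handed to the CommitProcessor first.
        steps.add("CommitProcessor.processRequest");
        if (hasTxnHeader) {
            // Only transaction requests are proposed to followers and logged locally.
            steps.add("Leader.propose");
            steps.add("SyncRequestProcessor.processRequest");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println("read request:        " + route(false));
        System.out.println("transaction request: " + route(true));
    }
}
```

Note the order: the request is queued in CommitProcessor before the proposal goes out, so that by the time the commit arrives there is already a pending request to match it against.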
3.1. CommitProcessor.run
public void run() {
try {
Request nextPending = null;
while (!finished) {
// toProcess is initially empty
int len = toProcess.size();
for (int i = 0; i < len; i++) {
// ToBeApplied
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
synchronized (this) {
if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() == 0) {
// Block while committedRequests is empty and there is nothing new to pick up from queuedRequests
wait();
continue;
}
// First check and see if the commit came in for the pending request
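// Initially queuedRequests is non-empty, nextPending is null, and committedRequests is non-empty
// After one pass: nextPending = the Request taken from queuedRequests; r = the Request taken from committedRequests
// On a follower that is applying a commit from the leader, queuedRequests stays empty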
if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() > 0) {
Request r = committedRequests.remove();
if (nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid) {
// we want to send our version of the request.the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
// Use nextPending because it already has cnxn set
toProcess.add(nextPending);
nextPending = null;
} else {
// this request came from someone else so just send the commit packet
toProcess.add(r);
}
}
}
if (nextPending != null) {
continue;
}
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
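- nextProcessor.processRequest(toProcess.get(i)): toProcess holds the Requests that must be passed to the next Processor; when it is non-empty, its entries are passed one by one to ToBeAppliedRequestProcessor.
- committedRequests: requests that have been committed, i.e. requests for which the leader has received ACKs from a quorum and has broadcast COMMIT. When it is empty the thread wait()s. At step 5.1, a proposal that has been ACKed by more than half is moved from outstandingProposals into committedRequests, after which execution continues from here.
- queuedRequests: the queue of requests passed in from the previous Processor (ProposalRequestProcessor). Note that when the leader sends COMMIT to a follower, queuedRequests is empty on the follower when it reaches this point.
- nextPending: initially null; after the blocked thread is woken up, it is taken from queuedRequests.
- nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid: the committed request is the one this server is itself waiting on. In the flow of this section that means the leader is handling it, not a follower applying a commit from the leader. In this case nextPending is put into toProcess and passed to the next Processor, because nextPending already has cnxn set (the client connection the response is written to). On the follower, the Request from committedRequests is put into toProcess directly.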
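The matching rule in the last bullet can be isolated into a few lines. The following is a minimal sketch, assuming a request is identified by the pair (sessionId, cxid); the class CommitMatchSketch, the type Req and the method select are hypothetical names, and cnxn is modeled as a plain Object.

```java
// Simplified model of the matching step in CommitProcessor.run.
// CommitMatchSketch, Req and select are hypothetical names.
class CommitMatchSketch {
    static final class Req {
        final long sessionId;
        final int cxid;
        final Object cnxn; // non-null only for a request received from a local client
        long zxid;

        Req(long sessionId, int cxid, Object cnxn, long zxid) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.cnxn = cnxn;
            this.zxid = zxid;
        }
    }

    /**
     * Picks the request to forward once "committed" has arrived.
     * If it matches the locally pending request, the local copy is forwarded
     * (it carries cnxn) after taking over the committed zxid; otherwise the
     * committed request itself is forwarded.
     */
    static Req select(Req nextPending, Req committed) {
        if (nextPending != null
                && nextPending.sessionId == committed.sessionId
                && nextPending.cxid == committed.cxid) {
            nextPending.zxid = committed.zxid;
            return nextPending;
        }
        return committed;
    }

    public static void main(String[] args) {
        Req local = new Req(7L, 3, new Object(), -1L);
        Req committed = new Req(7L, 3, null, 0x100000001L);
        Req chosen = select(local, committed);
        System.out.println("forwarded local copy: " + (chosen == local));
    }
}
```

The real code also copies hdr and txn onto nextPending; only zxid is copied here to keep the model short.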
3.2. Leader.propose (broadcast the proposal)
public Proposal propose(Request request) throws XidRolloverException {
// The low 32 bits of the zxid have reached their maximum; force a re-election so that the epoch in the high 32 bits is incremented
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
request.hdr.serialize(boa, "hdr");
if (request.txn != null) {
request.txn.serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.warn("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
baos.toByteArray(), null);
// Proposal > QuorumPacket > Request
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
lastProposed = p.packet.getZxid();
// Put the proposal into outstandingProposals
outstandingProposals.put(lastProposed, p);
// Iterate over forwardingFollowers and broadcast the proposal
sendPacket(pp);
}
return p;
}
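- (request.zxid & 0xffffffffL) == 0xffffffffL: the low 32 bits of the zxid have reached their maximum, so a re-election is needed to increment the epoch in the high 32 bits.
- outstandingProposals.put(lastProposed, p): the serialized Request is wrapped in a Proposal and put into outstandingProposals.
- sendPacket(pp): iterates over forwardingFollowers and puts the PROPOSAL packet into the queuedPackets of each follower (it is sent in LearnerHandler.sendPackets, analyzed in the previous section).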
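The bit tests on the zxid are easier to read with the layout spelled out: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter. The helper below is a minimal sketch of that layout; the class ZxidSketch and its method names are hypothetical and only mirror the masks used in the code above.

```java
// Sketch of the zxid layout: epoch in the high 32 bits, counter in the low 32 bits.
// ZxidSketch and its methods are hypothetical helpers for illustration.
class ZxidSketch {
    static final long COUNTER_MASK = 0xffffffffL;

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & COUNTER_MASK);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & COUNTER_MASK;
    }

    /** Same test as in Leader.propose: the counter cannot grow within this epoch. */
    static boolean counterExhausted(long zxid) {
        return (zxid & COUNTER_MASK) == COUNTER_MASK;
    }

    /** Same test as in Leader.processAck and logRequest: counter 0 marks the start of an epoch. */
    static boolean isEpochStart(long zxid) {
        return (zxid & COUNTER_MASK) == 0;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 1);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
    }
}
```

With this layout, comparing two zxids as plain longs orders them first by epoch and then by counter, which is what checks such as lastCommitted >= zxid rely on.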
4-1. Follower.followLeader (receive the proposal)
while (this.isRunning()) {
// After startup, block waiting for packets from the leader
readPacket(qp);
processPacket(qp);
}
This was analyzed in the previous section.
4-2. FollowerZooKeeperServer.logRequest
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
hdr.getType(), null, null);
request.hdr = hdr;
request.txn = txn;
request.zxid = hdr.getZxid();
if ((request.zxid & 0xffffffffL) != 0) {
// Put into pendingTxns for use at commit time
pendingTxns.add(request);
}
// Submit to SyncRequestProcessor -> SendAckRequestProcessor
syncProcessor.processRequest(request);
}
- pendingTxns.add(request): the request is put into pendingTxns so that it can later be passed to CommitProcessor (5-2 in the chart).
- syncProcessor.processRequest(request): on the Follower the request is passed to SyncRequestProcessor.
4-3. SyncRequestProcessor.run (write the log)
See "Zookeeper (5): server-side standalone mode, transaction request processing".
4-4. SendAckRequestProcessor.processRequest (reply with ACK)
public void processRequest(Request si) {
if(si.type != OpCode.sync){
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null, null);
try {
// Reply to the leader with an ACK
learner.writePacket(qp, false);
}
......
}
}
- learner.writePacket(qp, false): once the transaction log has been written, an ACK packet is sent back to the leader through leaderOs.
4-5. LearnerHandler.run (receive ACK)
case Leader.ACK:
......
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
if ((zxid & 0xffffffffL) == 0) {
// The low 32 bits are all 0: this is the ACK for NEWLEADER, ignore it
return;
}
if (outstandingProposals.size() == 0) {
return;
}
if (lastCommitted >= zxid) {
// Ignore a zxid that has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
return;
}
// Add the sid of the acking server to this proposal's ackSet, used for the quorum check
p.ackSet.add(sid);
// Check whether this proposal has been ACKed by a quorum
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
// Normally zxids should be committed in order
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
// TODO: the proposal is added to toBeApplied (a ConcurrentLinkedQueue)
toBeApplied.add(p);
}
// Broadcast COMMIT to the followers
commit(zxid);
// Broadcast INFORM to the observers
inform(p);
// Put the committed Request into committedRequests, to be handled by CommitProcessor
zk.commitProcessor.commit(p.request);
// A LearnerSyncRequest is put into pendingSyncs
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
}
- (zxid & 0xffffffffL) == 0: the low 32 bits are all 0, i.e. this is the ACK for NEWLEADER and is ignored.
- lastCommitted >= zxid: proposals that have already been committed are ignored.
- self.getQuorumVerifier().containsQuorum(p.ackSet): checks whether the proposal has been ACKed by more than half.
- toBeApplied.add(p): toBeApplied is passed in when ToBeAppliedRequestProcessor is constructed; after COMMIT it is checked there and the entry is removed.
- commit(zxid): broadcasts COMMIT to the followers by iterating over forwardingFollowers and putting the COMMIT packet into the queuedPackets of each follower (sent in LearnerHandler.sendPackets, analyzed in the previous section). This corresponds to 5-1 below.
- inform(p): broadcasts INFORM to the observers by iterating over observingLearners and putting the INFORM packet into the queuedPackets of each observer (sent in LearnerHandler.sendPackets, analyzed in the previous section).
- zk.commitProcessor.commit(p.request): puts the committed Request into committedRequests and wakes up CommitProcessor.run (i.e. 5.1).
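The quorum check on ackSet boils down to "strictly more than half of the voting members". The following is a simplified model under the assumption of a plain majority quorum; the class MajoritySketch is hypothetical and stands in for one proposal's ackSet together with the verifier.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified majority check for a single proposal.
// MajoritySketch is a hypothetical class, assuming a plain majority quorum.
class MajoritySketch {
    private final int votingMembers;
    private final Set<Long> ackSet = new HashSet<>();

    MajoritySketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /**
     * Records an ACK from server sid and reports whether the proposal
     * now has ACKs from strictly more than half of the voting members.
     * A repeated ACK from the same sid does not count twice.
     */
    boolean ack(long sid) {
        ackSet.add(sid);
        return ackSet.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        MajoritySketch proposal = new MajoritySketch(3);
        System.out.println("after leader's own ACK: " + proposal.ack(1L));
        System.out.println("after one follower ACK: " + proposal.ack(2L));
    }
}
```

In a three-server ensemble the leader's own ACK plus one follower ACK is already enough, so the commit does not wait for the slowest follower.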
5.1. CommitProcessor.run (COMMIT has been sent)
committedRequests.size is no longer 0, so the wait ends; this was analyzed in 3.1 above.
5-1. Follower.followLeader (receive COMMIT)
while (this.isRunning()) {
// After startup, block waiting for packets from the leader
readPacket(qp);
processPacket(qp);
}
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
commitProcessor.commit(request);
}
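- firstElementZxid != zxid: the zxid of the COMMIT differs from the head of pendingTxns, i.e. the commit order does not match the propose order, so the process exits immediately.
- commitProcessor.commit(request): submits the request to commitProcessor (5-2).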
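The pairing of logRequest and commit on the follower is a strict FIFO check, which can be modeled in a few lines. This is a minimal sketch, assuming requests are represented by their zxid only; the class PendingTxnsSketch is hypothetical, and it throws an exception where the real code logs a warning (empty queue) or calls System.exit(12) (order mismatch).

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the follower's pendingTxns queue.
// PendingTxnsSketch is a hypothetical class; requests are reduced to their zxid.
class PendingTxnsSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();

    /** Called when a PROPOSAL arrives: remember it until its COMMIT shows up. */
    void logRequest(long zxid) {
        pendingTxns.add(zxid);
    }

    /**
     * Called when a COMMIT arrives. The committed zxid must be the oldest
     * pending proposal; anything else means commits and proposals are out of order.
     */
    long commit(long zxid) {
        Long head = pendingTxns.peek();
        if (head == null) {
            throw new IllegalStateException("commit without a pending txn");
        }
        if (head != zxid) {
            throw new IllegalStateException("commit 0x" + Long.toHexString(zxid)
                    + " but next pending txn is 0x" + Long.toHexString(head));
        }
        return pendingTxns.remove();
    }

    int pending() {
        return pendingTxns.size();
    }

    public static void main(String[] args) {
        PendingTxnsSketch follower = new PendingTxnsSketch();
        follower.logRequest(0x100000001L);
        follower.logRequest(0x100000002L);
        System.out.println("committed 0x" + Long.toHexString(follower.commit(0x100000001L)));
        System.out.println("still pending: " + follower.pending());
    }
}
```

Because the leader sends PROPOSAL and COMMIT packets for each follower through one ordered queue, a mismatch here signals a broken invariant rather than a recoverable condition, which is consistent with the real code exiting.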
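5-2. CommitProcessor.run (Follower executes COMMIT)
The logic is the same as CommitProcessor.run on the Leader, which was analyzed in 3.1 above.
5-3. FinalRequestProcessor.processRequest (executed on the Follower)
The overall logic was analyzed for standalone mode. It consists mainly of applying the transaction in memory and building the response, but in cluster mode a request synchronized from the Leader does not need a response.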
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
if (request.cnxn == null) {
return;
}
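- Request.isQuorum: for transaction requests, committedLog is maintained (it is used for data synchronization after a leader election). (But why does a Follower also need to maintain committedLog here? A likely reason is that any Follower may become the Leader after the next election and would then need committedLog to synchronize its learners.)
- request.cnxn == null: as analyzed in 3.1, a null cnxn means the Request was passed over from the Leader, so this server is a Follower; no Response needs to be built and the method returns directly.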
This completes the Follower flow.
Now back to 4.1.
4.1. SyncRequestProcessor.run (Leader writes the log)
Same as 4-3; again see "Zookeeper (5): server-side standalone mode, transaction request processing".
4.2+4.3. AckRequestProcessor.processRequest (the Leader's own ACK)
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if(self != null)
leader.processAck(self.getId(), request.zxid, null);
else
LOG.error("Null QuorumPeer");
}
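- Handles the ACK for the Leader's own request by calling Leader.processAck, which amounts to the Leader receiving its own ACK; the processing is the same as in 4-5.
Stepping back once more, consider 5.2.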
5.2. ToBeAppliedRequestProcessor.processRequest
public void processRequest(Request request) throws RequestProcessorException {
// request.addRQRec(">tobe");
// Does nothing itself; passes straight on to FinalRequestProcessor (which updates the DataTree and builds the client response)
next.processRequest(request);
Proposal p = toBeApplied.peek();
// Remove from toBeApplied (it was added in Leader.processAck)
if (p != null && p.request != null && p.request.zxid == request.zxid) {
toBeApplied.remove();
}
}
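A few points about toBeApplied are worth adding here:
1. In Leader.processAck, once ACKs from more than half have been received, the current Proposal is put into the Leader's toBeApplied member.
2. When LearnerHandler.run synchronizes data to a Follower after leader election, the Leader has not stopped processing requests before NEWLEADER is sent. The requests that fall between committedLog and NEWLEADER are exactly the ones put into toBeApplied in the previous step. During synchronization, the startForwarding method below iterates over toBeApplied and builds PROPOSAL and COMMIT packets to send to the Follower.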
synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) {
// Queue up any outstanding requests enabling the receipt of new requests
if (lastProposed > lastSeenZxid) {
for (Proposal p : toBeApplied) {
if (p.packet.getZxid() <= lastSeenZxid) {
continue;
}
handler.queuePacket(p.packet);
// Since the proposal has been committed we need to send the commit message also
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet
.getZxid(), null, null);
handler.queuePacket(qp);
}
// Only participant need to get outstanding proposals
if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
Collections.sort(zxids);
for (Long zxid: zxids) {
if (zxid <= lastSeenZxid) {
continue;
}
handler.queuePacket(outstandingProposals.get(zxid).packet);
}
}
}
if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
addForwardingFollower(handler);
} else {
addObserverLearnerHandler(handler);
}
return lastProposed;
}
3. When LeaderZooKeeperServer.setupRequestProcessors builds the Processor chain, it passes the Leader's toBeApplied into ToBeAppliedRequestProcessor as its toBeApplied.
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);
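4. ToBeAppliedRequestProcessor.processRequest removes a proposal from toBeApplied once the matching request, which has already been ACKed by a quorum and committed, has been handed to FinalRequestProcessor.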
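The four points above describe a small lifecycle: a proposal enters toBeApplied when it reaches quorum, is replayed to any learner that joins in the meantime, and leaves once it has been applied. The sketch below models that lifecycle with zxids only; the class ToBeAppliedSketch and its method names are hypothetical, and committedAfter corresponds only to the first loop of startForwarding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified lifecycle of the leader's toBeApplied queue.
// ToBeAppliedSketch is a hypothetical class; proposals are reduced to their zxid.
class ToBeAppliedSketch {
    private final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();

    /** Leader.processAck: a proposal with a quorum of ACKs is committed but not yet applied. */
    void onQuorumAck(long zxid) {
        toBeApplied.add(zxid);
    }

    /** ToBeAppliedRequestProcessor: drop the head once the request has been applied. */
    void onApplied(long zxid) {
        Long head = toBeApplied.peek();
        if (head != null && head == zxid) {
            toBeApplied.remove();
        }
    }

    /**
     * startForwarding (first loop): committed-but-unapplied proposals newer than
     * what the learner has seen; each would be sent as PROPOSAL followed by COMMIT.
     */
    List<Long> committedAfter(long lastSeenZxid) {
        List<Long> toSend = new ArrayList<>();
        for (Long zxid : toBeApplied) {
            if (zxid > lastSeenZxid) {
                toSend.add(zxid);
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        ToBeAppliedSketch leader = new ToBeAppliedSketch();
        leader.onQuorumAck(1L);
        leader.onQuorumAck(2L);
        leader.onApplied(1L);
        System.out.println("replayed to a learner that has seen nothing: " + leader.committedAfter(0L));
    }
}
```

The queue therefore only ever holds the short window of proposals that are committed on the leader but not yet visible in its in-memory database.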
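5.3. FinalRequestProcessor.processRequest (executed on the Leader)
The flow is the same as in standalone mode: after memory is updated, the response is built and returned to the client (5.4). See "Zookeeper (5): server-side standalone mode, transaction request processing".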
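Summary
This section only analyzed the flow in which the Leader receives the request directly.
1. When a Follower receives a transaction request, FollowerRequestProcessor.run builds a REQUEST packet and sends it to the Leader (Learner.request). When the Leader's LearnerHandler.run receives the REQUEST, it builds a Request and processes it starting from the Leader's firstProcessor (ZooKeeperServer.submitRequest); the rest of the flow is the same as in this section.
2. An Observer handles requests in the same way as a Follower (non-transaction requests are handled locally, transaction requests are forwarded to the Leader). The difference is that an Observer takes no part in voting on transactions or in elections: for committed transactions it only receives INFORM messages from the leader and updates its local store. Its main purpose is to improve read performance, for example across data centers.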