1. ZooKeeper session creation and timeout mechanism: overview
First, why write this post at all? The reason is simple: ZooKeeper's session creation and timeout mechanism is rather unusual:
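1. All SessionImpl objects of a ZooKeeper ensemble live on the Leader; a Follower only stores each sessionId and its timeout in a HashMap.
2. On the Leader, each LearnerHandler periodically sends a ping packet to its Follower/Observer. On receiving it, the Follower/Observer sends back the sessionIds whose timeouts need checking, so all checking is done centrally by the Leader.
3. The Leader uses the SessionTrackerImpl thread to detect expired sessions. Sessions are kept in a HashMap keyed by expiration time (each key is a bucket); the thread periodically takes the bucket that is due and expires every session in it. A session that is still active gets touched before then, which moves it into a later bucket (see touchSession).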
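The bucket idea in point 3 can be sketched as follows. This is a simplified, single-threaded model with invented names (ExpiryLoopSketch, tick), not ZooKeeper's SessionTrackerImpl: it keeps roughly the shape of the tracker's run loop (take the bucket at nextExpirationTime, expire it, advance by expirationInterval) but drops the waiting, locking and the actual expire callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of bucket-based session expiry (not ZooKeeper's API).
class ExpiryLoopSketch {
    final long expirationInterval;
    long nextExpirationTime;
    // bucket: expiration time (a multiple of expirationInterval) -> sessionIds
    final Map<Long, Set<Long>> sessionSets = new HashMap<>();

    ExpiryLoopSketch(long now, long expirationInterval) {
        this.expirationInterval = expirationInterval;
        this.nextExpirationTime = roundToInterval(now);
    }

    // Round up to the next multiple of the interval.
    long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    void addSession(long sessionId, long now, int timeout) {
        long expireTime = roundToInterval(now + timeout);
        sessionSets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
    }

    // One pass of the checker: expire every bucket whose time is <= now.
    List<Long> tick(long now) {
        List<Long> expired = new ArrayList<>();
        while (nextExpirationTime <= now) {
            Set<Long> set = sessionSets.remove(nextExpirationTime);
            if (set != null) {
                expired.addAll(set); // the real tracker would call expire() here
            }
            nextExpirationTime += expirationInterval;
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpiryLoopSketch t = new ExpiryLoopSketch(0, 2000);
        t.addSession(1L, 0, 4000);   // bucket 6000
        t.addSession(2L, 500, 4000); // 4500 also rounds up to bucket 6000
        t.addSession(3L, 0, 9000);   // bucket 10000
        System.out.println(t.tick(5999)); // []
        System.out.println(t.tick(6000)); // sessions 1 and 2
    }
}
```

Because expiration times are rounded to whole intervals, the checker only looks at one bucket per interval instead of scanning every session.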
Now straight to the code (we take the Follower's point of view).
1. A client connects to a Follower
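When a client connects to a Follower, the connect request is handled by ZooKeeperServer.processConnectRequest (ZooKeeper's own NIO/Netty networking code is not covered here). For a new session this calls createSession, which submits the Request through submitRequest; FollowerZooKeeperServer inherits both methods from ZooKeeperServer. At this point it is worth describing the Follower's RequestProcessor chain: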
/**
* The Follower's RequestProcessor chains (there are two)
* Chain 1
* FollowerRequestProcessor: passes each Request on to the next RequestProcessor; for transactional operations it also has the Follower forward the Request to the leader (zks.getFollower().request())
* CommitProcessor: decides when a Request may be committed. It mainly uses two queues: queuedRequests holds Requests waiting for a quorum of ACKs, and committedRequests holds Requests that a quorum has already ACKed
* FinalRequestProcessor: up to this point the Request has only been persisted to the txnLog by SyncRequestProcessor; this processor actually applies the change to the ZKDatabase (on a Follower, the Request is always synced to disk via FollowerZooKeeperServer.logRequest before it reaches FinalRequestProcessor)
*
* Chain 2
* SyncRequestProcessor: persists the Request to the txnLog, including rolling the txnLog and generating snapshot files
* SendAckRequestProcessor: sends the ACK for the Request to the Leader (its Leader-side counterpart is AckRequestProcessor, through which the leader ACKs the proposals it made itself)
*
* 1. FollowerRequestProcessor --> CommitProcessor --> FinalRequestProcessor
* 2. SyncRequestProcessor --> SendAckRequestProcessor
*/
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true);
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
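To make the wiring concrete, here is a toy model of a chain of processors, each doing its own step and then forwarding to the next. The interface and class names (ToyProcessor, ChainSketch) are invented for illustration and are not ZooKeeper classes; each stage just records its name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for RequestProcessor (hypothetical, not ZooKeeper's interface).
interface ToyProcessor {
    void process(String request);
}

class ChainSketch {
    static final List<String> trace = new ArrayList<>();

    // Builds a stage that records its name and then forwards to next (if any).
    static ToyProcessor stage(String name, ToyProcessor next) {
        return request -> {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        };
    }

    public static void main(String[] args) {
        // Chain 1: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
        ToyProcessor fin = stage("Final", null);
        ToyProcessor commit = stage("Commit", fin);
        ToyProcessor first = stage("Follower", commit);
        // Chain 2: SyncRequestProcessor -> SendAckRequestProcessor
        ToyProcessor sync = stage("Sync", stage("SendAck", null));

        first.process("getData");
        sync.process("txn-1");
        System.out.println(trace);
        // [Follower:getData, Commit:getData, Final:getData, Sync:txn-1, SendAck:txn-1]
    }
}
```

The two chains are independent: on a Follower, chain 2 is fed by logRequest when a PROPOSAL arrives, not by chain 1.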
The Leader's RequestProcessor chain
/**
* The Leader's RequestProcessor chains
*
* Chain 1
* PrepRequestProcessor: builds the transaction (TxnHeader and txn record) for a Request
* ProposalRequestProcessor: sends the Proposal to every Follower, and also to the Leader itself (the Leader's own copy goes straight to syncProcessor.processRequest(request) inside ProposalRequestProcessor)
* CommitProcessor: commits the Proposals that a quorum of the ensemble has ACKed (the commit itself is triggered in Leader.processAck -> zk.commitProcessor.commit(p.request))
* ToBeAppliedRequestProcessor: the processor just before FinalRequestProcessor. The confusing part is toBeApplied: it holds the Proposals that a quorum has ACKed, and a Request is removed from toBeApplied only after FinalRequestProcessor has processed it
* FinalRequestProcessor: up to this point the Request has only been persisted to the txnLog by SyncRequestProcessor; this processor actually applies the change to the ZKDatabase
*
* Chain 2
* On the leader, SyncRequestProcessor and AckRequestProcessor are actually created inside ProposalRequestProcessor
* SyncRequestProcessor: persists the Request to the txnLog, including rolling the txnLog and generating snapshot files
* AckRequestProcessor: ACKs the Request on the Leader's behalf, i.e. the leader ACKs the proposal it made itself
*
* PrepRequestProcessor --> ProposalRequestProcessor --> CommitProcessor --> ToBeAppliedRequestProcessor --> FinalRequestProcessor
*                              \
*                               SyncRequestProcessor --> AckRequestProcessor (this branch is built inside ProposalRequestProcessor)
*/
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
// commit processor: waits for the vote (a quorum of ACKs)
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false);
commitProcessor.start();
// proposal processor: starts the vote
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
// preprocessor
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
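The fork in the Leader's chain happens inside ProposalRequestProcessor. A minimal model of that fork, with invented names (LeaderForkSketch and its string log) and with the rest of chain 1 collapsed into a single "commit-queue" step: every request is queued for CommitProcessor, and only transactional ones are also proposed and sent down the sync/ack branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the fork inside ProposalRequestProcessor.
class LeaderForkSketch {
    final List<String> log = new ArrayList<>();

    // Chain 2: SyncRequestProcessor -> AckRequestProcessor
    // (in reality the ACK is only sent after the txn log has been flushed)
    final Consumer<String> syncThenAck = r -> {
        log.add("sync:" + r);
        log.add("ack:" + r);
    };
    // Chain 1 continues: CommitProcessor -> ToBeApplied -> Final (collapsed here)
    final Consumer<String> commitBranch = r -> log.add("commit-queue:" + r);

    // Same order of steps as ProposalRequestProcessor.processRequest.
    void proposalProcessor(String request, boolean isTxn) {
        commitBranch.accept(request);      // always queued in CommitProcessor
        if (isTxn) {
            log.add("propose:" + request); // send PROPOSAL to the followers
            syncThenAck.accept(request);   // local persist + self-ACK
        }
    }

    public static void main(String[] args) {
        LeaderForkSketch s = new LeaderForkSketch();
        s.proposalProcessor("setData", true);
        s.proposalProcessor("getData", false);
        System.out.println(s.log);
        // [commit-queue:setData, propose:setData, sync:setData, ack:setData, commit-queue:getData]
    }
}
```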
The overall flowchart:
[Figure: overall flow of session creation and timeout handling (image not included)]
The whole flow involves several stages; we analyze them one by one below:
2. FollowerZooKeeperServer createSession
// create a session
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
// 1. allocate a sessionId; on the Leader (SessionTrackerImpl) this also creates a SessionImpl
//    and registers it in sessionsById, sessionsWithTimeout and sessionSets
long sessionId = sessionTracker.createSession(timeout);
// 2. fill passwd with random bytes seeded from the sessionId
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
// 3. submit the Request to the RequestProcessor chain
submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
// 4. return this session's sessionId
return sessionId;
}
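Step 2 has a useful consequence: because Random is seeded with sessionId ^ superSecret, the password never needs to be stored, since the same bytes can be regenerated from the sessionId alone when a client reconnects. A minimal sketch; the constant 0xB3415C00L is, as far as I know, the value ZooKeeper 3.x uses, and the 16-byte length is likewise an assumption. Treat both as illustrative.

```java
import java.util.Arrays;
import java.util.Random;

// Sketch of deterministic session-password derivation (constants are assumptions).
class SessionPasswd {
    static final long SUPER_SECRET = 0xB3415C00L; // assumed value
    static final int PASSWD_LEN = 16;              // assumed length

    static byte[] generatePasswd(long sessionId) {
        byte[] passwd = new byte[PASSWD_LEN];
        // Same seed -> same byte sequence, so nothing has to be stored.
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // Re-derive and compare instead of looking the password up.
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        long sessionId = 0x15a2b3c4d5e6f70L;
        byte[] p = generatePasswd(sessionId);
        System.out.println(checkPasswd(sessionId, p));     // true
        System.out.println(checkPasswd(sessionId + 1, p)); // false
    }
}
```

Note that this is an identity check, not strong security: anyone who knows the seed scheme and the sessionId can compute the password.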
3. FollowerRequestProcessor handles the request
// 1. (before this switch, nextProcessor.processRequest(request) has already queued the request in CommitProcessor)
switch (request.type) {
case OpCode.sync: // 2. sync: remember it in pendingSyncs and forward it to the Leader
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create: // 3. create/delete/setData/setACL, createSession/closeSession and multi:
case OpCode.delete: //    the Follower hands all of these to the Leader
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
zks.getFollower().request(request); // 4. forward every transactional request to the Leader
break;
}
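The effect of this switch can be summarised as a routing function. This is a sketch: op codes are plain strings here instead of ZooKeeper's OpCode ints, and FollowerRouting/forwardsToLeader are hypothetical names. Every request is queued locally in CommitProcessor first; only the listed types are additionally shipped to the Leader.

```java
import java.util.Set;

// Sketch of the Follower-side routing decision (hypothetical helper, requires Java 9+).
class FollowerRouting {
    // Types that FollowerRequestProcessor also sends to the Leader.
    static final Set<String> FORWARDED = Set.of(
        "sync", "create", "delete", "setData", "setACL",
        "createSession", "closeSession", "multi");

    static boolean forwardsToLeader(String opType) {
        return FORWARDED.contains(opType);
    }

    public static void main(String[] args) {
        for (String op : new String[] {"getData", "createSession", "exists", "setData"}) {
            // every request is queued locally in CommitProcessor first
            System.out.println(op + " -> CommitProcessor"
                + (forwardsToLeader(op) ? " + Leader" : ""));
        }
    }
}
```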
The Follower sends the Request to the Leader
/**
* send a request packet to the leader
*
* @param request
* the request from the client
* @throws IOException
*/
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); // 1. serialize the packet to be sent to the Leader
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
// 2. wrap the payload in a REQUEST packet
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
writePacket(qp, true); // 3. send the transactional request to the Leader
}
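The REQUEST payload is simply sessionId (8 bytes), cxid (4 bytes) and type (4 bytes), followed by the serialized request body. The sketch below writes that layout the way request() does and reads it back the way LearnerHandler does in section 5 (ByteBuffer.wrap, getLong, getInt, getInt, slice). RequestPayload is a hypothetical name, the values are made up, and QuorumPacket itself is not modeled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Round trip of the Leader.REQUEST payload layout (illustrative values).
class RequestPayload {
    static byte[] encode(long sessionId, int cxid, int type, byte[] body) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream oa = new DataOutputStream(baos)) {
            oa.writeLong(sessionId); // 8 bytes, big-endian
            oa.writeInt(cxid);       // 4 bytes
            oa.writeInt(type);       // 4 bytes
            if (body != null) {
                oa.write(body);      // the rest: the serialized request record
            }
        } catch (IOException ex) {
            // cannot happen for an in-memory stream, but DataOutputStream declares it
            throw new UncheckedIOException(ex);
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = encode(0x100L, 7, 1, new byte[] {9, 8, 7});
        // ByteBuffer is big-endian by default, matching DataOutputStream.
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        ByteBuffer rest = bb.slice(); // the body, starting at offset 16
        System.out.println(sessionId + " " + cxid + " " + type + " " + rest.remaining());
        // prints "256 7 1 3"
    }
}
```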
After FollowerRequestProcessor has handled the Request, it goes two ways: the Follower forwards it to the Leader, and it is also queued in CommitProcessor.
4. CommitProcessor handles the request
@Override
public void run() {
try {
Request nextPending = null;
while (!finished) { // while loop
int len = toProcess.size();
for (int i = 0; i < len; i++) {
Request request = toProcess.get(i);
LOG.info("request:"+ request); // 1. on a Follower, nextProcessor is FinalRequestProcessor
nextProcessor.processRequest(request); // 2. on the Leader, nextProcessor is ToBeAppliedRequestProcessor, which receives the requests a quorum has ACKed
}
toProcess.clear();
synchronized (this) {
if ((queuedRequests.size() == 0 || nextPending != null) // 3. nothing is ready to commit: wait() until a commit arrives
&& committedRequests.size() == 0) {
wait();
continue;
}
// First check and see if the commit came in for the pending
// request
if ((queuedRequests.size() == 0 || nextPending != null) // 4. once the Leader has a quorum of ACKs it sends COMMIT; the Follower puts the Request into committedRequests and it is handled here
&& committedRequests.size() > 0) {
Request r = committedRequests.remove();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
if (nextPending != null // 5. compare nextPending with the request taken from committedRequests
&& nextPending.sessionId == r.sessionId // 6. nextPending came from queuedRequests; if they match, copy the header, txn and zxid from the committed copy
&& nextPending.cxid == r.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
toProcess.add(nextPending); // 7. add to toProcess; nextProcessor handles it on the next loop iteration
nextPending = null;
} else { // 8. no matching nextPending: the commit is for a request submitted elsewhere (e.g. through another server), so add it to toProcess directly
// this request came from someone else so just
// send the commit packet
toProcess.add(r);
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request; // 9. a transactional request submitted through this server becomes nextPending and waits for its commit
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default: // 10. non-transactional requests (e.g. getData) go straight into toProcess and on to the next RequestProcessor
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
CommitProcessor keeps several special queues:
/**
* Requests that we are holding until the commit comes in.
*/
// Requests waiting for ACK confirmation
LinkedList<Request> queuedRequests = new LinkedList<Request>();
/**
* Requests that have been committed.
*/
// Requests whose Proposal a quorum has ACKed: either committed by the Leader itself, or received by a Follower as a COMMIT message from the Leader
LinkedList<Request> committedRequests = new LinkedList<Request>();
// Requests waiting to be processed by nextProcessor; filled from committedRequests and queuedRequests
ArrayList<Request> toProcess = new ArrayList<Request>();
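The interplay of the three queues can be reduced to a single-threaded model. This is a sketch, not the real CommitProcessor: there is no threading or wait(), Req is a hypothetical request type, and `write` stands for the transactional op codes. It keeps the real loop's key rule: a local write is held as nextPending until a committed request with the same sessionId and cxid arrives, and later requests queue up behind it.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Simplified, single-threaded model of CommitProcessor's queues (hypothetical types).
class CommitModel {
    static final class Req {
        final long sessionId;
        final int cxid;
        final boolean write;
        long zxid = -1; // copied from the committed version
        Req(long sessionId, int cxid, boolean write) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.write = write;
        }
        @Override public String toString() { return sessionId + ":" + cxid; }
    }

    final LinkedList<Req> queuedRequests = new LinkedList<>();
    final LinkedList<Req> committedRequests = new LinkedList<>();
    final List<Req> toProcess = new ArrayList<>();
    Req nextPending;

    // One pass of the loop body, without the wait().
    void step() {
        if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
            Req r = committedRequests.remove();
            if (nextPending != null && nextPending.sessionId == r.sessionId
                    && nextPending.cxid == r.cxid) {
                nextPending.zxid = r.zxid; // keep our own copy, update its zxid
                toProcess.add(nextPending);
                nextPending = null;
            } else {
                toProcess.add(r);          // committed request from elsewhere
            }
        }
        if (nextPending != null) {
            return;                        // still waiting for our commit
        }
        while (nextPending == null && !queuedRequests.isEmpty()) {
            Req q = queuedRequests.remove();
            if (q.write) {
                nextPending = q;           // hold until its commit arrives
            } else {
                toProcess.add(q);          // reads pass straight through
            }
        }
    }

    public static void main(String[] args) {
        CommitModel cp = new CommitModel();
        cp.queuedRequests.add(new Req(1, 1, false));
        cp.queuedRequests.add(new Req(1, 2, true));
        cp.queuedRequests.add(new Req(1, 3, false));
        cp.step();
        System.out.println(cp.toProcess + " pending=" + cp.nextPending); // [1:1] pending=1:2
        Req commit = new Req(1, 2, true);
        commit.zxid = 5;
        cp.committedRequests.add(commit);
        cp.step();
        System.out.println(cp.toProcess); // [1:1, 1:2, 1:3]
    }
}
```

The read 1:3 is held back behind the pending write 1:2, which is what preserves each session's request order.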
5. LearnerHandler handles the Request
On the Leader, LearnerHandler processes the incoming packets in its while loop:
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet"); // 47. keep reading packets from the stream coming from the Follower
LOG.info("qp:" + qp);
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
LOG.info("tickOfNextAckDeadline :" + tickOfNextAckDeadline);
ByteBuffer bb;
long sessionId;
int cxid;
int type;
LOG.info("qp.getType() : " + qp);
switch (qp.getType()) {
case Leader.ACK: // 48. ACK from the Follower: check whether the vote on the corresponding proposal is complete (this includes the ACK a Follower sends after processing UPTODATE)
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
LOG.info("syncLimitCheck.updateAck(qp.getZxid()):" + qp.getZxid());
syncLimitCheck.updateAck(qp.getZxid());
LOG.info("this.sid:" + this.sid + ", qp.getZxid():" + qp.getZxid() + ", sock.getLocalSocketAddress():" + sock.getLocalSocketAddress());
// 49. hand the ACK to Leader.processAck, which counts it toward the proposal's quorum
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING: // 50. PING packet: refresh the sessions' expiration times
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
LOG.info("leader.zk.touch: sess" + sess + ", to:"+to);
leader.zk.touch(sess, to);
}
break;
case Leader.REVALIDATE: // 51. check whether the session is still alive
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
LOG.info("id:"+id + ", to:" + to);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
boolean valid = leader.zk.touch(id, to);
LOG.info("id:" + id + ", to:" + to + ", valid:" + valid);
if (valid) {
try {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
}
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp); // 52. send the reply back to the follower
break;
case Leader.REQUEST: // 53. REQUEST packet: the follower forwards transactional requests to the leader
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice(); // 54. the remaining bytes are the request body
Request si;
LOG.info(" sessionId:" + sessionId + ", cxid:" + cxid + ", type:" + type);
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
LOG.info("si:" + si);
leader.zk.submitRequest(si); // 55. hand the request to the Leader's RequestProcessor chain
break;
default:
}
}
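The PING branch treats the packet body as a flat sequence of (long sessionId, int timeout) pairs, 12 bytes each, and reads until the stream is exhausted. A standalone sketch of that decoding; PingDecode is a hypothetical name, and leader.zk.touch is replaced by collecting the pairs into a map.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Decoding a Leader.PING body as LearnerHandler does (touch() replaced by a map).
class PingDecode {
    static Map<Long, Integer> decode(byte[] data) {
        Map<Long, Integer> touches = new LinkedHashMap<>();
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            while (dis.available() > 0) {
                long sess = dis.readLong();
                int to = dis.readInt();
                touches.put(sess, to); // the Leader would call zk.touch(sess, to)
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return touches;
    }

    public static void main(String[] args) {
        // Build a body with two entries; ByteBuffer is big-endian like DataOutputStream.
        ByteBuffer body = ByteBuffer.allocate(24);
        body.putLong(1L).putInt(30000).putLong(2L).putInt(10000);
        System.out.println(decode(body.array())); // {1=30000, 2=10000}
    }
}
```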
LearnerHandler calls leader.zk.submitRequest(Request request) to push the request into the Leader's RequestProcessor chain.
6. PrepRequestProcessor handles the request
case OpCode.createSession: // create a session
request.request.rewind();
int to = request.request.getInt();
request.txn = new CreateSessionTxn(to); // build the transaction body; the transaction header was set up earlier
request.request.rewind();
zks.sessionTracker.addSession(request.sessionId, to); // register the Follower's session with the Leader's SessionTrackerImpl (sessionsWithTimeout and sessionsById)
zks.setOwner(request.sessionId, request.getOwner());
break;
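The effect is to register the session in the Leader's session tracker (sessionsById, sessionsWithTimeout and its expiry bucket) and to record which server owns it.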
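Why the two rewind() calls matter: in createSession the timeout is written into a 4-byte ByteBuffer with putInt, which leaves the position at 4, and the buffer is submitted without being flipped. Reading it therefore requires a rewind first, and rewinding again afterwards leaves it readable for later consumers. A standalone illustration using only java.nio.ByteBuffer (TimeoutBuffer is a hypothetical name):

```java
import java.nio.ByteBuffer;

// Position handling around the createSession timeout buffer.
class TimeoutBuffer {
    static int readTimeout(ByteBuffer request) {
        request.rewind();          // position back to 0 (putInt left it at 4)
        int to = request.getInt();
        request.rewind();          // leave it readable for the next consumer
        return to;
    }

    public static void main(String[] args) {
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(30000);                     // as in createSession: position is now 4
        System.out.println(to.remaining());   // 0: a getInt() here would throw
        System.out.println(readTimeout(to));  // 30000
        System.out.println(to.position());    // 0
    }
}
```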
7. ProposalRequestProcessor handles the request
nextProcessor.processRequest(request); // 1. nextProcessor here is CommitProcessor
if (request.hdr != null) { // 2. a transactional request
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request); // 3. the Leader puts the Request to a vote (Proposal) by sending it to the Followers
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request); // 4. hand the request to syncProcessor to be written to disk
}
The steps are:
1. Queue the request in CommitProcessor.queuedRequests.
2. Send a Leader.PROPOSAL to every Follower via zks.getLeader().propose(request).
3. Let the local syncProcessor persist the request; the local AckRequestProcessor then ACKs it via Leader.processAck. The request waits in CommitProcessor until a quorum of ACKs has been collected.
8. Follower.processPacket handles the proposal
Next, the Follower handles the Proposal sent by the Leader:
case Leader.PROPOSAL: // 1. a Proposal from the Leader: the Follower takes part in the vote
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); // 2. deserialize the txn header and body
if (hdr.getZxid() != lastQueued + 1) { // 3. a gap in the zxids means the Follower may have missed a Proposal
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
fzk.logRequest(hdr, txn); // 4. hand the txn to FollowerZooKeeperServer.logRequest
fzk.logRequest adds the Request to pendingTxns and submits it to syncProcessor; once it is on disk, SendAckRequestProcessor sends the ACK for that Proposal to the Leader.
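Both the gap check above and the (zxid & 0xffffffffL) == 0 test in processAck below rely on the zxid layout: the high 32 bits hold the leader epoch and the low 32 bits a per-epoch counter. A small sketch of that arithmetic; ZxidMath and its method names are hypothetical helpers.

```java
// zxid helpers: high 32 bits = epoch, low 32 bits = counter (hypothetical names).
class ZxidMath {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // The Follower expects proposals with consecutive zxids;
    // returns how many zxids were skipped (0 = none).
    static long gap(long lastQueued, long received) {
        return received - (lastQueued + 1);
    }

    public static void main(String[] args) {
        long z = makeZxid(3, 5);
        System.out.println(Long.toHexString(z));                   // 300000005
        System.out.println(epochOf(z) + " " + counterOf(z));       // 3 5
        System.out.println(counterOf(makeZxid(4, 0)) == 0);        // true: first zxid of an epoch
        System.out.println(gap(z, z + 1) + " " + gap(z, z + 3));   // 0 2
    }
}
```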
9. Leader.processAck handles ACKs from Followers
/**
* Reference:
* http://blog.csdn.net/vinowan/article/details/22196707
*
* Keep a count of acks that are received by the leader for a particular
* proposal
*
* @param zxid the zxid of the proposal sent out
*/
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
LOG.info("sid:" + sid + ", zxid:" + zxid + ", followerAddr:" + followerAddr);
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}",
Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
LOG.info("(zxid & 0xffffffffL) == 0 :" + ((zxid & 0xffffffffL) == 0));
if ((zxid & 0xffffffffL) == 0) { // 1. the low 32 bits (the counter) of the zxid are all 0
/*
* We no longer process NEWLEADER ack by this method. However,
* the learner sends ack back to the leader after it gets UPTODATE
* so we just ignore the message.
*/
return;
}
LOG.info("outstandingProposals :" + outstandingProposals);
if (outstandingProposals.size() == 0) { // 2. no outstanding Proposal is waiting for ACKs
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
}
return;
}
LOG.info("lastCommitted :" + lastCommitted + ", zxid:" + zxid);
if (lastCommitted >= zxid) { // 3. lastCommitted >= zxid: the proposal for this zxid has already been committed
if (LOG.isDebugEnabled()) {
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid); // 4. look up the Proposal for this zxid in the "ballot box" outstandingProposals
LOG.info("p:" + p);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
return;
}
LOG.info("p:" + p + ", sid:" + sid);
p.ackSet.add(sid); // 5. add the follower's sid (myid) to the ACK set
if (LOG.isDebugEnabled()) {
LOG.info("Count for zxid: 0x{} is {}", Long.toHexString(zxid), p.ackSet.size());
}
LOG.info("self.getQuorumVerifier().containsQuorum(p.ackSet):" + self.getQuorumVerifier().containsQuorum(p.ackSet));
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ // 6. a quorum has ACKed: commit, which hands the request to the leader's CommitProcessor
LOG.info("zxid:" + zxid + ", lastCommitted:" + lastCommitted);
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
LOG.info("outstandingProposals:" + outstandingProposals);
outstandingProposals.remove(zxid); // 7. remove the now-committable Proposal from outstandingProposals
if (p.request != null) {
toBeApplied.add(p); // 8. add it to toBeApplied, the queue shared by Leader and ToBeAppliedRequestProcessor; after CommitProcessor the request reaches ToBeAppliedRequestProcessor
LOG.info("toBeApplied:" + toBeApplied); // 9. the matching removal happens in ToBeAppliedRequestProcessor, after FinalRequestProcessor has processed the request
}
if (p.request == null) {
LOG.warn("Going to commmit null request for proposal: {}", p);
}
commit(zxid); // 10. send COMMIT to the Followers: the Proposal for this zxid can now be committed
inform(p); // 11. send INFORM to the Observers with the same news
zk.commitProcessor.commit(p.request); // 12. commit the proposal locally through the Leader's own commitProcessor
// 13. a hidden detail: a Proposal may already be committed on some Followers while the Leader loses its connection to the rest of the ensemble before committing it locally
LOG.info("pendingSyncs :" + pendingSyncs);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
}
When processing an ACK, once a quorum of the ensemble has ACKed, the Leader sends COMMIT to the Followers and INFORM to the Observers, then calls zk.commitProcessor.commit(p.request) to put the request into its own CommitProcessor.committedRequests. From there the request passes through ToBeAppliedRequestProcessor, which first hands it to FinalRequestProcessor and then removes it from toBeApplied.
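The counting in processAck can be sketched with plain collections. Assumptions: a simple majority quorum over a fixed ensemble size (what ZooKeeper's QuorumMaj checks for a plain, non-weighted configuration), and committing is reduced to recording the zxid. AckCounter and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of majority ACK counting, modeled on Leader.processAck.
class AckCounter {
    final int ensembleSize;
    final Map<Long, Set<Long>> outstandingProposals = new HashMap<>(); // zxid -> ackSet
    long lastCommitted;
    final List<Long> committed = new ArrayList<>();

    AckCounter(int ensembleSize, long lastCommitted) {
        this.ensembleSize = ensembleSize;
        this.lastCommitted = lastCommitted;
    }

    void propose(long zxid) {
        outstandingProposals.put(zxid, new HashSet<>());
    }

    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > ensembleSize / 2; // strict majority
    }

    void processAck(long sid, long zxid) {
        if (lastCommitted >= zxid) return;           // already committed
        Set<Long> ackSet = outstandingProposals.get(zxid);
        if (ackSet == null) return;                  // unknown proposal
        ackSet.add(sid);                             // a set: duplicate ACKs count once
        if (containsQuorum(ackSet)) {
            outstandingProposals.remove(zxid);
            lastCommitted = zxid;                    // stands in for commit(zxid)
            committed.add(zxid);
        }
    }

    public static void main(String[] args) {
        AckCounter leader = new AckCounter(5, 0);
        leader.propose(1);
        leader.processAck(1, 1);               // the leader's own ACK
        leader.processAck(2, 1);
        leader.processAck(2, 1);               // duplicate, still 2 of 5
        System.out.println(leader.committed);  // []
        leader.processAck(3, 1);
        System.out.println(leader.committed);  // [1]
        leader.processAck(4, 1);               // late ACK, ignored
        System.out.println(leader.committed);  // [1]
    }
}
```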
10. FollowerZooKeeperServer.commit(long zxid) commits the Proposal
/**
* When a COMMIT message is received, eventually this method is called,
* which matches up the zxid from the COMMIT with (hopefully) the head of
* the pendingTxns queue and hands it to the commitProcessor to commit.
* @param zxid - must correspond to the head of pendingTxns if it exists
*/
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid; // 1. see http://blog.csdn.net/fei33423/article/details/53749138
if (firstElementZxid != zxid) { // 2. the classic problem: the Leader commits 3 Proposals (commit 1, commit 2, commit 3), but the follower receives commit 3 right after commit 1
LOG.error("Committing zxid 0x" + Long.toHexString(zxid) // 3. this error is then logged and the process exits
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
commitProcessor.commit(request); // 4. put the request into commitProcessor.committedRequests
}
Finally, the Follower's FinalRequestProcessor applies the request and sends the response to the client.
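The ordering rule in commit(long zxid) can be modeled with a FIFO queue: every logged proposal is appended to pendingTxns, and every COMMIT must match the head. In this hypothetical sketch a mismatch throws an IllegalStateException where the real method calls System.exit(12), and an empty queue returns false where the real method logs a warning and returns.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// FIFO matching of COMMITs against logged proposals (hypothetical model).
class PendingTxns {
    private final Deque<Long> pendingTxns = new ArrayDeque<>();

    // The Follower logged a PROPOSAL (cf. FollowerZooKeeperServer.logRequest).
    void logRequest(long zxid) {
        pendingTxns.addLast(zxid);
    }

    // The Follower received a COMMIT; true if it was handed on for commit.
    boolean commit(long zxid) {
        Long head = pendingTxns.peekFirst();
        if (head == null) {
            return false; // real code: warns "Committing ... without seeing txn" and returns
        }
        if (head != zxid) {
            // real code: LOG.error(...) then System.exit(12)
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x" + Long.toHexString(head));
        }
        pendingTxns.removeFirst(); // real code passes this Request to commitProcessor.commit()
        return true;
    }

    public static void main(String[] args) {
        PendingTxns f = new PendingTxns();
        f.logRequest(1);
        f.logRequest(2);
        f.logRequest(3);
        System.out.println(f.commit(1)); // true
        try {
            f.commit(3); // COMMIT 2 was never seen
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Committing zxid 0x3 but next pending txn 0x2
        }
    }
}
```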
11. Session timeout mechanism: LearnerHandler.ping()
On the Leader, a while loop (in Leader.lead()) iterates over the LearnerHandlers and has each one send a ping to its Follower/Observer:
/**
* ping calls from the leader to the peers
*/
// The Leader sends a ping request to the Follower.
// Before pinging the Learner it checks syncLimitCheck; if the Learner has fallen too far behind, the connection is closed instead.
public void ping() {
long id;
if (syncLimitCheck.check(System.nanoTime())) {
synchronized(leader) {
id = leader.lastProposed;
}
QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
queuePacket(ping);
} else {
LOG.warn("Closing connection to peer due to transaction timeout.");
shutdown();
}
}
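syncLimitCheck decides between pinging and shutting down. The real bookkeeping lives in LearnerHandler's SyncLimitCheck class; the sketch below is a simplified, hypothetical version that only tracks un-ACKed proposals and compares the age of the oldest one against tickTime * syncLimit. It also assumes that an ACK for a zxid implies all earlier proposals were ACKed, since a learner logs proposals in order.

```java
import java.util.TreeMap;

// Simplified, hypothetical take on LearnerHandler's sync-limit check.
class SyncLimitSketch {
    private final long limitMs;                                  // tickTime * syncLimit
    private final TreeMap<Long, Long> unacked = new TreeMap<>(); // zxid -> send time (ns)

    SyncLimitSketch(int tickTimeMs, int syncLimit) {
        this.limitMs = (long) tickTimeMs * syncLimit;
    }

    void proposalSent(long zxid, long nanoTime) {
        unacked.put(zxid, nanoTime);
    }

    // Assumption: an ACK for zxid also covers every earlier proposal.
    void ackReceived(long zxid) {
        unacked.headMap(zxid, true).clear();
    }

    // true = learner is keeping up, safe to ping; false = close the connection.
    boolean check(long nowNanos) {
        if (unacked.isEmpty()) {
            return true;
        }
        long oldestSent = unacked.firstEntry().getValue();
        long delayMs = (nowNanos - oldestSent) / 1_000_000;
        return delayMs < limitMs;
    }

    public static void main(String[] args) {
        SyncLimitSketch s = new SyncLimitSketch(2000, 5);   // limit: 10000 ms
        s.proposalSent(1, 0L);
        System.out.println(s.check(9_999_000_000L));  // true: 9999 ms old
        System.out.println(s.check(10_000_000_000L)); // false: 10000 ms old
        s.ackReceived(1);
        System.out.println(s.check(10_000_000_000L)); // true: nothing outstanding
    }
}
```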
12. The Follower handles the Leader's ping
On receiving the Leader's ping, the Follower sends its sessionIds and timeouts back to the Leader, which performs the timeout check.
// The Follower sends its sessionIds and timeouts to the Leader, which touches them to check whether the sessions have expired
protected void ping(QuorumPacket qp) throws IOException {
// Send back the ping with our session data
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
HashMap<Long, Integer> touchTable = zk // 1. the Follower/Observer touchTable (sessionId -> sessionTimeout), sent to the Leader for timeout checking
.getTouchSnapshot();
for (Entry<Long, Integer> entry : touchTable.entrySet()) {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
qp.setData(bos.toByteArray()); // 2. convert to a byte array and set it as the packet data
writePacket(qp, true); // 3. send the packet
}
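On the Follower side, getTouchSnapshot hands over the sessions touched since the previous ping and starts a fresh table, so each ping reports only recent activity. In ZooKeeper 3.4 this swap is, as far as I know, done by LearnerSessionTracker.snapshot(); the class below is a hypothetical model of the swap plus the encoding done in ping().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the Follower's touch table and ping payload.
class FollowerTouchTable {
    private HashMap<Long, Integer> touchTable = new HashMap<>();

    // Called whenever a client request or heartbeat arrives for a session.
    synchronized void touch(long sessionId, int timeout) {
        touchTable.put(sessionId, timeout);
    }

    // Swap in a fresh table and return the old one.
    synchronized HashMap<Long, Integer> snapshot() {
        HashMap<Long, Integer> old = touchTable;
        touchTable = new HashMap<>();
        return old;
    }

    // Encode as consecutive (long sessionId, int timeout) pairs, like ping().
    byte[] pingPayload() {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            for (Map.Entry<Long, Integer> entry : snapshot().entrySet()) {
                dos.writeLong(entry.getKey());
                dos.writeInt(entry.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        FollowerTouchTable t = new FollowerTouchTable();
        t.touch(1L, 30000);
        t.touch(2L, 10000);
        t.touch(1L, 30000);                          // same session, still one entry
        System.out.println(t.pingPayload().length);  // 24 = 2 entries * 12 bytes
        System.out.println(t.pingPayload().length);  // 0: nothing touched since
    }
}
```

A session with no activity between two pings is simply not reported, so the Leader stops renewing it and it eventually falls out of its bucket.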
13. The Leader handles the sessionIds and timeouts sent by the Follower
When the Leader receives the sessionIds and timeouts from a Follower, it calls SessionTrackerImpl.touchSession(long sessionId, int timeout) to validate and renew each session:
// update the session's expiration time
synchronized public boolean touchSession(long sessionId, int timeout) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.CLIENT_PING_TRACE_MASK,
"SessionTrackerImpl --- Touch session: 0x"
+ Long.toHexString(sessionId) + " with timeout " + timeout);
SessionImpl s = sessionsById.get(sessionId); // 1. look up the session in sessionsById, a sessionId -> SessionImpl map
// Return false, if the session doesn't exists or marked as closing
if (s == null || s.isClosing()) {
return false;
} // 2. compute the new expiration time, rounded up to a multiple of expirationInterval
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
if (s.tickTime >= expireTime) {
// Nothing needs to be done
return true;
}
SessionSet set = sessionSets.get(s.tickTime); // 3. a SessionSet is the bucket for one expiration time; a thread checks that SessionSet when the time arrives
if (set != null) {
set.sessions.remove(s);
}
s.tickTime = expireTime; // 4. the following steps file the session into sessionSets under its new tickTime
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
sessionSets.put(expireTime, set);
}
set.sessions.add(s); // 5. put the SessionImpl into its SessionSet
return true;
}
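To see why rounding to expirationInterval keeps touchSession cheap, here is the same bucket move with plain collections. TouchSketch is hypothetical; tickTime means the session's rounded expiration time, as in SessionTrackerImpl, and the closing state is not modeled. Touches that round to the same bucket hit the early return, so a busy session rarely costs any map updates.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of touchSession's bucket move (isClosing not modeled).
class TouchSketch {
    final long expirationInterval;
    final Map<Long, Long> tickTimeOf = new HashMap<>();       // sessionId -> bucket
    final Map<Long, Set<Long>> sessionSets = new HashMap<>(); // bucket -> sessionIds

    TouchSketch(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // New sessions start with tickTime 0 and are then touched into a bucket.
    void addSession(long sessionId, int timeout, long now) {
        tickTimeOf.put(sessionId, 0L);
        touchSession(sessionId, timeout, now);
    }

    // false for unknown sessions, true otherwise.
    boolean touchSession(long sessionId, int timeout, long now) {
        Long tickTime = tickTimeOf.get(sessionId);
        if (tickTime == null) return false;
        long expireTime = roundToInterval(now + timeout);
        if (tickTime >= expireTime) return true;   // already in a late-enough bucket
        Set<Long> old = sessionSets.get(tickTime);
        if (old != null) old.remove(sessionId);
        tickTimeOf.put(sessionId, expireTime);
        sessionSets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
        return true;
    }

    public static void main(String[] args) {
        TouchSketch t = new TouchSketch(2000);
        t.addSession(1L, 4000, 0);        // bucket 6000
        t.touchSession(1L, 4000, 1500);   // 5500 rounds up to 6000: early return
        t.touchSession(1L, 4000, 2500);   // 6500 rounds up to 8000: moved
        System.out.println(t.tickTimeOf); // {1=8000}
    }
}
```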
Summary
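ZooKeeper's session mechanism suits deployments where relatively few clients connect to the servers (maxClientCnxns, which limits concurrent connections from a single client IP, defaults to 60; beyond that the server closes the socket). With millions of connections, keeping every session on the Leader and checking timeouts with a single thread could become a performance bottleneck, but ZooKeeper's design is still a good source of ideas. Once you understand how sessions are created, create/setData/delete are easy to follow.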