ZooKeeper 源碼分析 session建立及超時(shí)機(jī)制 (基于3.4.6)

1. ZooKeeper session建立及超時(shí)機(jī)制 概述

首先說一下, 為什么要寫下這篇, 原因也很簡單, 因?yàn)閟ession的建立及超時(shí)機(jī)制特別

1. ZooKeeper 集群的所有 sessionImpl 都在 Leader端, 而Follower端只將 sessionId 與 timeout 存儲到 HashMap里面
2. 在 Leader 端 每個(gè) LearnerHandler 會定期的向Follower/Observer發(fā)送給ping 包, Follower/Observer在接受到之后, 將會將對應(yīng)的要檢查超時(shí)的 sessionId 發(fā)給 Leader, 統(tǒng)一讓Leader進(jìn)行檢查
3. Leader用SessionTrackerImpl線程來檢查Session是否超時(shí), 而 session 將放在一個(gè)以 expirationTime為Key的HashMap里面, 定時(shí)的獲取并檢查, 超時(shí)的話就進(jìn)行刪除, 不超時(shí)的話將session移到下一個(gè)將要超時(shí)的 Bucket 里面(見touchSession)

接下來就直接上代碼(我們這里從Follower的角度出發(fā))

1. Client 連接 Follower

當(dāng)Client連接Follower時(shí), 會調(diào)用 FollowerZooKeeperServer.processPacket 來進(jìn)行處理(這里不涉及Zookeeper自己的NIO/NettyNIO處理部分), 最后會直接調(diào)用 LeaderZooKeeperServer.submitRequest方法將對應(yīng)的Request進(jìn)行提交, 到這里有必要說一下 Follower的RequestProcessor處理鏈

/**
 * Follower 的 RequestProcessor 處理鏈 (2條)
 * 第一條 鏈
 * FollowerRequestProcessor: 區(qū)分處理 Request, 將 Request 交由下個(gè) RequestProcessor, 而若涉及事務(wù)的操作, 則 交由 Follower 提交給 leader (zks.getFollower().request())
 * CommitProcessor: 這條鏈決定這著 Request 能否提交, 里面主要有兩條鏈 , queuedRequests : 存儲著 等待 ACK 過半確認(rèn)的 Request, committedRequests :存儲著 已經(jīng)經(jīng)過 ACK 過半確認(rèn)的 Request
 * FinalRequestProcessor: 前面的 Request 只是在經(jīng)過 SynRequestProcessor 持久化到 txnLog 里面, 而 這里做的就是真正的將數(shù)據(jù)改變到 ZKDataBase 里面(作為  Follower 一定會在 FollowerZooKeeperServer.logRequest 進(jìn)行同步Request 數(shù)據(jù)到磁盤里面后再到 FinalRequestProcessor)
 *
 * 第二條 鏈
 * SynRequestProcessor: 主要是將 Request 持久化到 TxnLog 里面, 其中涉及到 TxnLog 的滾動, 及 Snapshot 文件的生成
 * AckRequestProcessor: 主要完成 針對 Request 的 ACK 回復(fù), 對 在Leader中就是完成 leader 自己提交 Request, 自己回復(fù) ACK
 *
 * 1. FollowerRequestProcessor --> CommitProcessor --> FinalRequestProcessor
 * 2. SyncRequestProcessor --> SendAckRequestProcessor
 */
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true);
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));
    syncProcessor.start();
}

Leader 的RequestProcessor處理鏈

/**
 * Leader 的 RequestProcessor 處理鏈
 *
 * 第一條 RequestProcessor 鏈
 * PreRequestProcessor : 創(chuàng)建和修改 TxnRequest
 * ProposalRequestProcessor : 提交  Proposal 給各個(gè) Follower 包括 Leader自己 (Leader自己是在 ProposalRequestProcessor 里面通過 syncProcessor.processRequest(request) 直接提交 Proposal)
 * CommitProcessor : 將 經(jīng)過集群中的過半 Proposal 提交(提交的操作直接在 Leader.processAck -> zk.commitProcessor.commit(p.request))
 * ToBeAppliedRequestProcessor: 這個(gè)處理鏈其實(shí)是 Request 處理時(shí)經(jīng)過的最后一個(gè) RequestProcessor, 其中最令人困惑的是 toBeApplied, 而 toBeApplied 中其實(shí)維護(hù)的是 集群中經(jīng)過 過半 ACK 同意的 proposal, 只有經(jīng)過 FinalRequestProcessor 處理過的 Request 才會在 toBeApplied 中進(jìn)行刪除
 * FinalRequestProcessor: 前面的 Request 只是在經(jīng)過 SynRequestProcessor 持久化到 txnLog 里面, 而 這里做的就是真正的將數(shù)據(jù)改變到 ZKDataBase 里面
 *
 * 第二條 RequestProcessor 鏈
 * 在 leader 中, SynRequestProcessor, AckRequestProcessor 的創(chuàng)建其實(shí)是在 ProposalRequestProcessor 中完成的
 * SynRequestProcessor: 主要是將 Request 持久化到 TxnLog 里面, 其中涉及到 TxnLog 的滾動, 及 Snapshot 文件的生成
 * AckRequestProcessor: 主要完成 針對 Request 的 ACK 回復(fù), 對 在Leader中就是完成 leader 自己提交 Request, 自己回復(fù) ACK
 *
 * PrepRequestProcessor --> ProposalRequestProcessor --> CommitProcessor --> ToBeAppliedRequestProcessor --> FinalRequestProcessor
 *                                                    \
 *                                                     SynRequestProcessor --> AckRequestProcessor (這條分支是在 ProposalRequestProcessor 里面進(jìn)行構(gòu)建的)
 */
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader().toBeApplied);
    // 投票確認(rèn)處理器
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false);
    commitProcessor.start();
    // 投票發(fā)起處理器
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    // 預(yù)處理器
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

下面先來張總體的流程圖:

session檢查.png

上面這張圖片有點(diǎn)大, 建議在 百度云 里面進(jìn)行下載預(yù)覽, 接下來我們會一步一步進(jìn)行下去PS: 吐槽一下簡書的圖片系統(tǒng), 圖片一旦大了就預(yù)覽出問題(不清晰)

整個(gè)流程涉及好幾個(gè)過程, 下面一一分析:

2. FollowerZooKeeperServer createSession
// 創(chuàng)建 session
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    long sessionId = sessionTracker.createSession(timeout);     // 1. 創(chuàng)建 會話 Session, 生成 SessionImpl 放入對應(yīng)的 sessionsById, sessionsWithTimeout, sessionSets 里面, 返回 sessionid
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);                                        // 2. 生成一個(gè)隨機(jī)的 byte[] passwd
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);                               // 3. 提交 Request 到RequestProcessor 處理鏈
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;                                           // 4. 返回此回話對應(yīng)的 sessionId
}
3. FinalRequestProcessor 處理請求
switch (request.type) {
case OpCode.sync:                                // 2. 處理同步數(shù)據(jù)
    zks.pendingSyncs.add(request);
    zks.getFollower().request(request);
    break;
case OpCode.create:                              // 3. 從這里 看出 path 創(chuàng)建/刪除/設(shè)置數(shù)據(jù)/設(shè)置訪問權(quán)限/創(chuàng)建,關(guān)閉session, 多個(gè)操作 -> 都 是 Follower 交給 leader 進(jìn)行處理
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
    zks.getFollower().request(request);          // 4. 將事務(wù)類的請求都交給 Leader 處理
    break;
}

follower提交 Request 到Leader

/**
 * send a request packet to the leader
 *
 * @param request
 *                the request from the client
 * @throws IOException
 */
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();       // 1. 將要發(fā)送給 Leader 的數(shù)據(jù)包序列化
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte b[] = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();                                                     // 2. 封裝請求數(shù)據(jù)包
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);

    writePacket(qp, true);                                         // 3. 將 事務(wù)請求 request 發(fā)送給 Leader
}

FinalRequestProcessor處理了Request后一方面是follower提交給Leader, 另一方面是提交給 CommitProcessor

4. CommitProcessor 處理請求
@Override
public void run() {
    try {
        Request nextPending = null;            
        while (!finished) {                                            // while loop
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                Request request = toProcess.get(i);
                LOG.info("request:"+ request);                         // 1. Follower 里面就是 丟給 FinalRequestProcessor 處理
                nextProcessor.processRequest(request);                 // 2. 將 ack 過半的 Request 丟給 ToBeAppliedRequestProcessor 來進(jìn)行處理 (Leader 中是這樣處理)
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null)// 3. 如果沒有 Commit 的請求, 則進(jìn)行wait, 直到 commit 請求的到來
                        && committedRequests.size() == 0) {
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                if ((queuedRequests.size() == 0 || nextPending != null)// 4. 當(dāng) Leader 通過了 過半ACK確認(rèn)后, 則會將這個(gè) Request 丟給 Follower 來處理, Follower 會直接將 Request 丟到 committedRequests 里面, 進(jìn)而處理
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    if (nextPending != null                            // 5. 這里其實(shí)就是比較 nextPending 與 committedRequests 中的 request 請求
                            && nextPending.sessionId == r.sessionId    // 6. 而 nextPending 又是從 queuedRequests 里面拿出來的, 若相同, 則直接用 committedRequests 里面的 消息頭, 消息體, zxid
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);                    // 7. 將 請求 直接加入 toProcess, 直到下次 loop 被 nextProcessor 處理
                        nextPending = null;
                    } else {                                           // 8. Leader 直接 調(diào)用 commit 方法提交的 請求, 直接加入 toProcess, 直到下次 loop 被 nextProcessor 處理 (這個(gè) IF 判斷中是 Leader 中處理的)
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r);
                    }
                }
            }

            // We haven't matched the pending requests, so go back to
            // waiting
            if (nextPending != null) {
                continue;
            }

            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;                          // 9. 若請求是事務(wù)請求, 則將 follower 自己提交的 request 賦值給 nextPending
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:                                            // 10.這里直接加入到 隊(duì)列 toProcess 中的其實(shí)是 非 事務(wù)的請求 (比如getData), 丟到 toProcess 里面的請求會丟到下個(gè) RequestProcessor
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}

在CommitProcessor 里面有幾個(gè)特別的隊(duì)列

/**
 * Requests that we are holding until the commit comes in.
 */
// 等待 ACK 確認(rèn)的 Request
LinkedList<Request> queuedRequests = new LinkedList<Request>();

/**
 * Requests that have been committed.
 */
// 已經(jīng) Proposal ACK 過半確認(rèn)過的 Request, 一般的要么是 Leader 自己 commit, 要么就是 Follower 接收到 Leader 的 commit 消息
LinkedList<Request> committedRequests = new LinkedList<Request>();

// 等待被 nextProcessor 處理的隊(duì)列, 其里面的數(shù)據(jù)是從 committedRequests, queuedRequests 里面獲取來的
ArrayList<Request> toProcess = new ArrayList<Request>();

5. LearnerHandler 處理Request請求

此時(shí)LearnerHandler在while loop里面處理對應(yīng)的Request請求

while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");                         // 47. 這里其實(shí)就是不斷的從數(shù)據(jù)流(來源于 Follower 的) 讀取數(shù)據(jù)
    LOG.info("qp:" + qp);

    long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
    if (qp.getType() == Leader.PING) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
    }
    tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
    LOG.info("tickOfNextAckDeadline :" + tickOfNextAckDeadline);

    ByteBuffer bb;
    long sessionId;
    int cxid;
    int type;

    LOG.info("qp.getType() : " + qp);
    switch (qp.getType()) {
    case Leader.ACK:                                   // 48. 處理 Follower 回復(fù)給 Leader 的ACK 包看看之前的投票是否結(jié)束 ( 這里是 Follower 在處理 UPTODATE 后恢復(fù) ACK)
        if (this.learnerType == LearnerType.OBSERVER) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received ACK from Observer  " + this.sid);
            }
        }
        LOG.info("syncLimitCheck.updateAck(qp.getZxid()):"  + qp.getZxid());
        syncLimitCheck.updateAck(qp.getZxid());
        LOG.info("this.sid:" + this.sid + ", qp.getZxid():" + qp.getZxid() + ", sock.getLocalSocketAddress():" + sock.getLocalSocketAddress());
                                                      // 49. ack 包處理成功, 如果 follower 數(shù)據(jù)同步成功, 則將它添加到 NEWLEADER 這個(gè)投票的結(jié)果中
        leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());        
        break;
    case Leader.PING:                                 // 50. ping 數(shù)據(jù)包, 更新 session 的超時(shí)時(shí)間
        // Process the touches
        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
        DataInputStream dis = new DataInputStream(bis);
        while (dis.available() > 0) {
            long sess = dis.readLong();
            int to = dis.readInt();
            LOG.info("leader.zk.touch: sess" + sess + ", to:"+to);
            leader.zk.touch(sess, to);
        }
        break;
    case Leader.REVALIDATE:                          // 51. 檢查 session 是否還存活
        bis = new ByteArrayInputStream(qp.getData());
        dis = new DataInputStream(bis);
        long id = dis.readLong();
        int to = dis.readInt();
        LOG.info("id:"+id + ", to:" + to);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeLong(id);
        boolean valid = leader.zk.touch(id, to);
        LOG.info("id:" + id + ", to:" + to + ", valid:" + valid);
        if (valid) {
            try {
                //set the session owner
                // as the follower that
                // owns the session
                leader.zk.setOwner(id, this);
            } catch (SessionExpiredException e) {
                LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
            }
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.SESSION_TRACE_MASK,
                                     "Session 0x" + Long.toHexString(id)
                                     + " is valid: "+ valid);
        }
        dos.writeBoolean(valid);
        qp.setData(bos.toByteArray());
        queuedPackets.add(qp);                         // 52. 將數(shù)據(jù)包返回給對應(yīng)的 follower
        break;
    case Leader.REQUEST:                               // 53. REQUEST 數(shù)據(jù)包, follower 會將事務(wù)請求轉(zhuǎn)發(fā)給 leader 進(jìn)行處理
        bb = ByteBuffer.wrap(qp.getData());
        sessionId = bb.getLong();
        cxid = bb.getInt();
        type = bb.getInt();
        bb = bb.slice();                               // 54. 讀取事務(wù)信息
        Request si;
        LOG.info(" sessionId:" + sessionId + ", cxid:" + cxid + ", type:" + type);
        if(type == OpCode.sync){
            si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
        } else {
            si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
        }
        si.setOwner(this);
        LOG.info("si:" + si);
        leader.zk.submitRequest(si);                   // 55. 將事務(wù)請求的信息交由 Leader 的 RequestProcessor 處理
        break;
    default:
    }
}

LearnerHandler調(diào)用Leader.zk.submitRequest(Request request) 到RequestProcessor處理鏈里面;

6. PrepRequestProcessor 處理請求
case OpCode.createSession:                                  // 創(chuàng)建 session
    request.request.rewind();
    int to = request.request.getInt();
    request.txn = new CreateSessionTxn(to);                 // 組裝事務(wù)體, 事務(wù)頭在最前面已經(jīng)弄好了
    request.request.rewind();
    zks.sessionTracker.addSession(request.sessionId, to);   // 調(diào)用 sessionTracker.addSession() 將follower里的session加入到Leader的sessionsWithTimeout里面
    zks.setOwner(request.sessionId, request.getOwner());
    break;

這里的操作就是將session加入到Leader的sessionsById里面

7. ProposalRequestProcessor 處理請求
    nextProcessor.processRequest(request);         // 1. 這里的 nextProcessor 其實(shí)就是 CommitProcessor
if (request.hdr != null) {                         // 2. 若是 事務(wù)請求
    // We need to sync and get consensus on any transactions
    try {
        zks.getLeader().propose(request);          // 3. Leader 進(jìn)行 Request 的投票 (Proposal) 將 request 發(fā)送給 Follower
    } catch (XidRolloverException e) {
        throw new RequestProcessorException(e.getMessage(), e);
    }
    syncProcessor.processRequest(request);         // 4. 將 request 交給 syncProcessor 進(jìn)行落磁盤
}

這里就這幾步:

1. 提交請求到CommitProcessor.queuedRequests里面
2. 通過zks.getLeader().propose(request) 向各個(gè)Follower提交 Leader.PROPOSAL
3. 本機(jī)的 syncProcessor處理請求(持久化, 接下來就是本機(jī)的 AckRequestProcessor回復(fù)ack給 Leader.processAck 阻塞這里, ACK過半了就不會阻塞)
8. Follower.processPacket 處理請求

接著就是 Follower處理Leader提出的Proposal

case Leader.PROPOSAL:                                             // 1. 處理 Leader 發(fā)來的 Proposal 包, 投票處理
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);// 2. 反序列化出 Request
    if (hdr.getZxid() != lastQueued + 1) {                        // 3. 這里說明什么呢, 說明 Follower 可能少掉了 Proposal
        LOG.warn("Got zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " expected 0x"
                + Long.toHexString(lastQueued + 1));
    }
    lastQueued = hdr.getZxid();
    fzk.logRequest(hdr, txn);                                     // 4. 將 Request 交給 FollowerZooKeeperServer 來進(jìn)行處理

fzk.logRequest 提交Request到syncProcessor里面, 而后就是通過SendAckRequestProcessor向Leader發(fā)送剛才Proposal對應(yīng)的ack

9. Leader.processAck 處理Follower發(fā)來的ack
/**
 * 參考資料
 * http://blog.csdn.net/vinowan/article/details/22196707
 *
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 * 
 * @param zxid the zxid of the proposal sent out
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    LOG.info("sid:" + sid + ", zxid:" + zxid + ", followerAddr:" + followerAddr);
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    LOG.info("(zxid & 0xffffffffL) == 0 :" + ((zxid & 0xffffffffL) == 0));
    if ((zxid & 0xffffffffL) == 0) {                              // 1. zxid 全是 0
        /*
         * We no longer process NEWLEADER ack by this method. However,
         * the learner sends ack back to the leader after it gets UPTODATE
         * so we just ignore the message.
         */
        return;
    }

    LOG.info("outstandingProposals :" + outstandingProposals);
    if (outstandingProposals.size() == 0) {                       // 2. 沒有要回應(yīng) ack 的 Proposal 存在
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    LOG.info("lastCommitted :" + lastCommitted + ", zxid:" + zxid);
    if (lastCommitted >= zxid) {                                  // 3. Leader 端處理的 lastCommited >= zxid, 說明 zxid 對應(yīng)的 proposal 已經(jīng)處理過了
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);                  // 4. 從投票箱 outstandingProposals 獲取 zxid 對應(yīng)的 Proposal
    LOG.info("p:" + p);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
        return;
    }
    LOG.info("p:" + p + ", sid:" + sid);

    p.ackSet.add(sid);                                            // 5. 將 follower 的 myid 加入結(jié)果列表
    if (LOG.isDebugEnabled()) {
        LOG.info("Count for zxid: 0x{} is {}", Long.toHexString(zxid), p.ackSet.size());
    }

    LOG.info("self.getQuorumVerifier().containsQuorum(p.ackSet):" + self.getQuorumVerifier().containsQuorum(p.ackSet));
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)){       // 6. 判斷是否票數(shù)夠了, 則啟動  leader 的 CommitProcessor 來進(jìn)行處理

        LOG.info("zxid:" + zxid + ", lastCommitted:" + lastCommitted);
        if (zxid != lastCommitted+1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }

        LOG.info("outstandingProposals:" + outstandingProposals);
        outstandingProposals.remove(zxid);                        // 7. 從 outstandingProposals 里面刪除那個(gè)可以提交的 Proposal
        if (p.request != null) {
            toBeApplied.add(p);                                   // 8. 加入到 toBeApplied 隊(duì)列里面, 這里的 toBeApplied 是 ToBeAppliedRequestProcessor, Leader 共用的隊(duì)列, 在經(jīng)過 CommitProcessor 處理過后, 就到 ToBeAppliedRequestProcessor 里面進(jìn)行處理
            LOG.info("toBeApplied:" + toBeApplied);               // 9. toBeApplied 對應(yīng)的刪除操作在 ToBeAppliedRequestProcessor 里面, 在進(jìn)行刪除時(shí), 其實(shí)已經(jīng)經(jīng)過 FinalRequestProcessor 處理過的
        }

        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        commit(zxid);                                             // 10. 向 集群中的 Followers 發(fā)送 commit 消息, 來通知大家, zxid 對應(yīng)的 Proposal 可以 commit 了
        inform(p);                                                // 11. 向 集群中的 Observers 發(fā)送 commit 消息, 來通知大家, zxid 對應(yīng)的 Proposal 可以 commit 了
        zk.commitProcessor.commit(p.request);                     // 12. 自己進(jìn)行 proposal 的提交 (直接調(diào)用 commitProcessor 進(jìn)行提交 )
                                                                  // 13. 其實(shí)這里隱藏一個(gè)細(xì)節(jié), 就是有可能 有些 Proposal 在 Follower 上進(jìn)行了 commit, 而 Leader 上還沒來得及提交, 就有可能與集群間的其他節(jié)點(diǎn)斷開連接
        LOG.info("pendingSyncs :" + pendingSyncs);
        if(pendingSyncs.containsKey(zxid)){
            for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}

這里處理ACK時(shí), 若已經(jīng)收到集群中過半的ack則就可以向集群中的其他節(jié)點(diǎn)發(fā)送commit, 或inform其他Observer節(jié)點(diǎn), 然后 zk.commitProcessor.commit(p.request) 提交request到Leader的commitProcessor.committedRequests里面, 最后就是 先在FinalRequestProcessor處理一下, 再在ToBeAppliedRequestProcessor.toBeApplied刪除request

10. FollowerZooKeeperServer.commit(long zxid) 提交Proposal
/**
 * When a COMMIT message is received, eventually this method is called, 
 * which matches up the zxid from the COMMIT with (hopefully) the head of
 * the pendingTxns queue and hands it to the commitProcessor to commit.
 * @param zxid - must correspond to the head of pendingTxns if it exists
 */
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;         // 1. http://blog.csdn.net/fei33423/article/details/53749138
    if (firstElementZxid != zxid) {                             // 2. 這里就有經(jīng)典問題, 在 Leader 端提交了 3 個(gè) Proposal 的信息(comit 1, comit 2, comit 3), 但 follower 在接收到 comit 1 后就接收到 comit 3
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid) // 3. 則就會打印這里的日志, 并且進(jìn)行退出
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);                            // 4. 提交到 commitProcessor.committedRequests 里面
}

而后就是Follower.FinalRequestProcessor進(jìn)行最終的響應(yīng)客戶端處理

11. Session超時(shí)機(jī)制 Leader.ping()

在Leader上有個(gè)while loop會遍歷 LearnerHandler 然后發(fā)送 ping請求給 Follower/Observer

/**
 * ping calls from the leader to the peers
 */
// 這里其實(shí)是 Leader 向 Follower 發(fā)送 ping 請求
// 在向 Learner 發(fā)送ping消息之前, 首先會通過 syncLimitCheck 來檢查
public void ping() {
    long id;
    if (syncLimitCheck.check(System.nanoTime())) {
        synchronized(leader) {
            id = leader.lastProposed;
        }
        QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
        queuePacket(ping);
    } else {
        LOG.warn("Closing connection to peer due to transaction timeout.");
        shutdown();
    }
}
12. Follower處理Leader發(fā)來的ping請求

Follower在接到Leader的ping請求后會將sessionId及timeout發(fā)送給Leader, 進(jìn)行超時(shí)機(jī)制檢查

// Follower 將自己的 sessionId 及超時(shí)時(shí)間發(fā)送給 Leader, 讓 Leader 進(jìn)行 touch 操作, 校驗(yàn)是否 session 超時(shí)
protected void ping(QuorumPacket qp) throws IOException {
    // Send back the ping with our session data
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    HashMap<Long, Integer> touchTable = zk   // 1. 獲取 Follower/Observer 的 touchTable(sessionId <-> sessionTimeout) 發(fā)給 Leader 進(jìn)行session超時(shí)的檢測
            .getTouchSnapshot();
    for (Entry<Long, Integer> entry : touchTable.entrySet()) {
        dos.writeLong(entry.getKey());
        dos.writeInt(entry.getValue());
    }
    qp.setData(bos.toByteArray());          // 2. 轉(zhuǎn)化成字節(jié)數(shù)組, 進(jìn)行數(shù)據(jù)的寫入
    writePacket(qp, true);                  // 3. 發(fā)送數(shù)據(jù)包
}
13. Leader處理Follower發(fā)來的sessionId及timeout

Leader在接收到Follower發(fā)來的sessionId及timeout, 將會調(diào)用SessionTrackerImpl.touchSession(long sessionId, int timeout)來進(jìn)行校驗(yàn)

// 更新 session 的過期時(shí)間
synchronized public boolean touchSession(long sessionId, int timeout) {
    ZooTrace.logTraceMessage(LOG,
            ZooTrace.CLIENT_PING_TRACE_MASK,
            "SessionTrackerImpl --- Touch session: 0x"
                    + Long.toHexString(sessionId) + " with timeout " + timeout);

    SessionImpl s = sessionsById.get(sessionId);    // 1. 從 sessionsById 獲取 session, sessionsById 是一個(gè) SessionId <-> SessionImpl 的 map
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }                                               // 2. 計(jì)算過期時(shí)間
    long expireTime = roundToInterval(System.currentTimeMillis() + timeout); 
    if (s.tickTime >= expireTime) {
        // Nothing needs to be done
        return true;
    }
    SessionSet set = sessionSets.get(s.tickTime);   // 3. 這里的 SessionSet 就是一個(gè) timeout 對應(yīng)額 Bucket, 將有一個(gè)線程, 在超時(shí)時(shí)間點(diǎn)檢查這個(gè) SessionSet
    if (set != null) {
        set.sessions.remove(s);
    }
    s.tickTime = expireTime;                        // 4. 下面的步驟就是將 session 以 tickTime 為單位放入 sessionSets 中
    set = sessionSets.get(s.tickTime);
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    set.sessions.add(s);                            // 5. 將 SessionImpl 放入對應(yīng)的 SessionSets 里面
    return true;
}
總結(jié)

zookeeper的session機(jī)制只適用于有少量client連接Server的場景(zookeeper的默認(rèn)maxClientCnxns 是60, 超過的話就會socket主動關(guān)閉), 當(dāng)有百萬連接時(shí), 用這種session集中, 用一條線程檢測超時(shí)的機(jī)制可能在性能上出現(xiàn)問題, 當(dāng)zookeeper還是給出了一種很好的思考方向! 在理解了session創(chuàng)建機(jī)制后, 對應(yīng)的create/setData/delete就很好理解了!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末仗阅,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子殿托,更是在濱河造成了極大的恐慌霹菊,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異旋廷,居然都是意外死亡鸠按,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門饶碘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來目尖,“玉大人,你說我怎么就攤上這事扎运∩” “怎么了?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵豪治,是天一觀的道長洞拨。 經(jīng)常有香客問我,道長负拟,這世上最難降的妖魔是什么烦衣? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮掩浙,結(jié)果婚禮上花吟,老公的妹妹穿的比我還像新娘。我一直安慰自己厨姚,他們只是感情好衅澈,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谬墙,像睡著了一般今布。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上芭梯,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天次乓,我揣著相機(jī)與錄音皆辽,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛轩勘,可吹牛的內(nèi)容都是我干的手蝎。 我是一名探鬼主播局蚀,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼彻桃,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了澎媒?” 一聲冷哼從身側(cè)響起搞乏,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎戒努,沒想到半個(gè)月后请敦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年侍筛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了萤皂。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,617評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡匣椰,死狀恐怖裆熙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情禽笑,我是刑警寧澤入录,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站佳镜,受9級特大地震影響僚稿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜邀杏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一贫奠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧望蜡,春花似錦、人聲如沸拷恨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽腕侄。三九已至小泉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冕杠,已是汗流浹背微姊。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留分预,地道東北人兢交。 一個(gè)月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像笼痹,于是被迫代替她去往敵國和親配喳。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容