[Zookeeper] Zookeeper 請求流程1 (Learner部分)

目錄:

  • 數(shù)據(jù)同步與初始化(選舉完leader之后)
  • 分角色業(yè)務(wù)處理分析(leader,follower,observer)

1.數(shù)據(jù)同步與初始化


選舉完leader之后矾踱,只有當(dāng)各個角色與leader保持?jǐn)?shù)據(jù)同步袭灯,才能對外提供服務(wù)昵观。

其中惠啄,服務(wù)器間數(shù)據(jù)同步過程

基本分為三種方式:

  1. SNAP方式(snapshot,同步整個文件)
  2. DIFF方式
  3. TRUNC方式

1.1 基本流程

其中第3步:Follower返回Leader的AckEpoch,會包含當(dāng)前的最大zxid席镀,Leader節(jié)點會將該zxid與其minZxid诱桂,maxZxid進(jìn)行比較。
這個 [minZxid,maxZxid] 實際上是在leader端緩存的一個事務(wù)隊列淑掌。

其中第6步:發(fā)送的NewLeader蒿讥,說明當(dāng)前數(shù)據(jù)已經(jīng)同步完(Leader已經(jīng)將該同步的內(nèi)容發(fā)送給Follower)

三種方式的區(qū)別:

  1. 如果Follower端的zxid小于minZxid,說明Leader與Follower之間數(shù)據(jù)差距非常大抛腕,直接采取Snap方式芋绸,F(xiàn)ollower就去接收Leader發(fā)送的snapshot文件

  2. 如果Follower端的zxid處于minZxid,maxZxid之間担敌,采取Diff方式侥钳,即Leader只要發(fā)送區(qū)間為[zxid,maxZxid]的事務(wù)即可,F(xiàn)ollower接收到這些事務(wù)柄错,進(jìn)行持久化并更新內(nèi)存

  3. 如果Follower端的zxid大于maxZxid舷夺,采取Trunc方式,F(xiàn)ollower則將大于maxZxid的事務(wù)日志刪除

1.2 類說明

  1. Learner類
    Learner包括Follower和Observer售貌,其中比較重要的leaderIs,leaderOs,表示是鏈接到Leader的輸入流给猾,輸出流

  2. LearnerHandler(繼承自ZooKeeperThread)

1.3 詳細(xì)說明

在QuorumPeer中,進(jìn)行FastLeaderElection之后颂跨,即在QuorumPeer的run方法中敢伸,

       try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                logFactory, this,
                                new ZooKeeperServer.BasicDataTreeBuilder(),
                                this.zkDb);
    
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );                        
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            try {
                MBeanRegistry.getInstance().unregisterAll();
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
        }
    }

通過getPeerState()方法,獲取當(dāng)前服務(wù)器的state恒削,如果是FOLLOWING狀態(tài)池颈,

followLeader()方法:Follower在提供服務(wù)給客戶端之間完成注冊到Leader的動作。
注冊分為以下3個主要步驟:

  1. 調(diào)用connectToLeader方法連接到Leader钓丰。
  2. 調(diào)用registerWithLeader方法注冊到Leader躯砰,交換各自的sid、zxid和Epoch等信息携丁,Leader以此決定事務(wù)同步的方式琢歇。
  3. 調(diào)用SyncWithLeader跟Leader進(jìn)行事務(wù)數(shù)據(jù)同步,處理SNAP/DIFF/TRUNC包。
  • connectToLeader:創(chuàng)建Socket連接到Leader李茫,該方法定義在Follower父類Learner中揭保,加了重試機(jī)制,最多可以嘗試5次連接魄宏。連接成功后秸侣,Leader會創(chuàng)建一個LearnerHandler專門處理與該Follower之間的QuorumPacket消息的傳遞。

  • registerWithLeader:首先會發(fā)送FOLLOWERINFO包給Leader宠互,告訴Leader自己的身份屬性(Follower的zxid味榛,sid)。然后等待Leader回復(fù)的LEADINFO包名秀,獲取Leader的Epoch和zxid值,并更新Follower的Epoch和zxid值藕溅,以Leader信息為準(zhǔn)匕得。
    最后,給Leader發(fā)送ACKINFO包巾表,告訴Leader這次Follower已經(jīng)與Leader的zxid同步了汁掠。

  • SyncWithLeader:與Leader同步數(shù)據(jù),即同步Leader的事務(wù)到Follower

3.1
首先讀取同步數(shù)據(jù)包集币,主要代碼如下:

QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        readPacket(qp);
        // 提交的packets
        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
        // 未提交的packets
        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
        synchronized (zk) {
            // Diff方式下,不需要進(jìn)行snapshot
            if (qp.getType() == Leader.DIFF) {
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                snapshotNeeded = false;
            }
            else if (qp.getType() == Leader.SNAP) {
                LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // clear our own database and read
                // 清空日志,minZxid和maxZxid都為0,,新構(gòu)建DataTree
                zk.getZKDatabase().clear();
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got " + signature);
                    throw new IOException("Missing signature");                   
                }
                // 設(shè)置最近的zxid
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            } else if (qp.getType() == Leader.TRUNC) {
                //we need to truncate the log to the lastzxid of the leader
                LOG.warn("Truncating log to get in sync with the leader 0x"
                        + Long.toHexString(qp.getZxid()));
                boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log "
                            + Long.toHexString(qp.getZxid()));
                    System.exit(13);
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            }

<1> SNAP:快照模式考阱,這種模式下Leader將整個完整數(shù)據(jù)庫傳給Follower
<2> TRUNC:截斷模式,這種模式表明Follower的數(shù)據(jù)比Leader還多鞠苟,為了維持一致性需要將Follower多余的數(shù)據(jù)刪除
<3> DIFF:差異模式乞榨,說明Follower比Leader的事務(wù)少,需要給Follower補(bǔ)足当娱,這時候Leader會將需要補(bǔ)充的事務(wù)生成PROPOSAL包和COMMIT包發(fā)給Follower執(zhí)行吃既。

3.2
處理后續(xù)消息(即QuorumPacket類型)
比如Proposal,Commit跨细,NewLeader等鹦倚,其中Proposal是指在同步期間收到的Leader發(fā)送的寫請求信息,緩存在packetsNotCommitted中冀惭。

            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    pif.hdr = new TxnHeader();
                    pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(pif.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();
                    packetsNotCommitted.add(pif);
                    break;
                case Leader.COMMIT:
                    if (!writeToTxnLog) {
                        pif = packetsNotCommitted.peekFirst();
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                        } else {
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else {
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                    /*
                     * Only observer get this type of packet. We treat this
                     * as receiving PROPOSAL and COMMMIT.
                     */
                    PacketInFlight packet = new PacketInFlight();
                    packet.hdr = new TxnHeader();
                    packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                    // Log warning message if txn comes out-of-order
                    if (packet.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn("Got zxid 0x"
                                + Long.toHexString(packet.hdr.getZxid())
                                + " expected 0x"
                                + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = packet.hdr.getZxid();
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.UPTODATE:
                    if (isPreZAB1_0) {
                        zk.takeSnapshot();
                        self.setCurrentEpoch(newEpoch);
                    }
                    self.cnxnFactory.setZooKeeperServer(zk);                
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                 
                    File updating = new File(self.getTxnFactory().getSnapDir(),
                                        QuorumPeer.UPDATING_EPOCH_FILENAME);
                    if (!updating.exists() && !updating.createNewFile()) {
                        throw new IOException("Failed to create " +
                                              updating.toString());
                    }
                    if (snapshotNeeded) {
                        zk.takeSnapshot();
                    }
                    self.setCurrentEpoch(newEpoch);
                    if (!updating.delete()) {
                        throw new IOException("Failed to delete " +
                                              updating.toString());
                    }
                    writeToTxnLog = true; 
                    isPreZAB1_0 = false;
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }

之后震叙,Leader會發(fā)送NEWLEAER包,F(xiàn)ollower收到NEWLEADER包后回復(fù)ACK給Leader散休,
最后媒楼,Leader發(fā)送UPTODATE包表示同步完成,F(xiàn)ollower這時啟動服務(wù)端并跳出本次循環(huán)戚丸,準(zhǔn)備結(jié)束整個注冊過程匣砖。

3.3 Follower主流程
Follower是Learner的子類,F(xiàn)ollower的啟動方法就是followLeader。

// 尋找Leader角色
            QuorumServer leaderServer = findLeader();            
            try {
                // 嘗試5次連接Leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // 建立Following連接,
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                // 進(jìn)行數(shù)據(jù)同步
               syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }

啟動時猴鲫,首先與Leader同步數(shù)據(jù)对人,然后啟動FollowerZooKeeperServer,在FollowerZooKeeperServer運行的同時拂共,額外啟動while循環(huán)等待QuorumPacket包牺弄,調(diào)用processPacket方法處理這些包。

processPacket處理QuorumPeer傳送的
QuorumPacket宜狐,最主要是處理兩種QuorumPacket:PROPOSAL和COMMIT势告。當(dāng)然還有PING、COMMITANDACTIVATE等包類型抚恒。

該方法在收到Leader發(fā)送過來的QuorumPacket時被調(diào)用咱台,主要是響應(yīng)PROPOSAL和COMMIT兩種類型的消息。PROPOSAL是Leader將要執(zhí)行的寫事務(wù)命令俭驮;COMMIT是提交命令回溺。Follower只有在收到COMMIT消息后才會讓PROPOSAL命令的內(nèi)容生效。

同一個寫事務(wù)命令會在Leader和多個Follower上都執(zhí)行一次混萝,保證集群數(shù)據(jù)的一致性遗遵。

        case Leader.PROPOSAL:            
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;

Follower收到PROPOSAL消息后調(diào)用FollowerZooKeeperServer的logRequest方法;收到COMMIT消息后調(diào)用FollowerZooKeeperServer的commit方法逸嘀。

  • PROPOSAL包
    Leader發(fā)送給集群中所有follower的寫請求包车要。
    Leader執(zhí)行寫操作時需要告之集群中的Learner,讓大家也執(zhí)行寫操作崭倘,保證集群數(shù)據(jù)的一致性翼岁。PROPOSAL是嚴(yán)格按照順序執(zhí)行的,這也是ZOOKEEPER的核心設(shè)計思想之一

  • COMMIT包
    當(dāng)Leader認(rèn)為一個Proposal已被大多數(shù)Follower持久化并等待執(zhí)行后會發(fā)送COMMIT包司光,通知各Follower可以提交執(zhí)行該Proposal了登澜,最后調(diào)用到FinalRequestProcessor執(zhí)行寫操作,通過這種機(jī)制保證寫操作能被大半數(shù)集群機(jī)器執(zhí)行

3.4 Observer主流程
Observer和Follower功能類似飘庄,主要的差別就是不參與選舉脑蠕。

Observer的入口方法是observerLeader。當(dāng)QuorumPeer的狀態(tài)是OBSERVING時會啟動Observer并調(diào)用observerLeader方法跪削。

observerLeader同F(xiàn)ollower的followLeader方法類似谴仙,首先注冊到Leader,事務(wù)同步后進(jìn)入QuorumPacket包循環(huán)處理過程碾盐,調(diào)用processPacket方法處理QuorumPacket晃跺。

processPacket比Follower要簡單許多,最主要是處理INFORM包來執(zhí)行Leader的寫請求命令毫玖。

這里處理的是INFORM消息掀虎,Leader群發(fā)寫事務(wù)時凌盯,給Follower發(fā)的是PROPOSAL并要等待Follower確認(rèn);而給Observer發(fā)的則是INFORM消息并且不需要Obverver回復(fù)ACK消息來確認(rèn)烹玉。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末驰怎,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子二打,更是在濱河造成了極大的恐慌县忌,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件继效,死亡現(xiàn)場離奇詭異症杏,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)瑞信,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門厉颤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人凡简,你說我怎么就攤上這事逼友。” “怎么了潘鲫?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵翁逞,是天一觀的道長肋杖。 經(jīng)常有香客問我溉仑,道長,這世上最難降的妖魔是什么状植? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任浊竟,我火速辦了婚禮,結(jié)果婚禮上津畸,老公的妹妹穿的比我還像新娘振定。我一直安慰自己,他們只是感情好肉拓,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布后频。 她就那樣靜靜地躺著,像睡著了一般暖途。 火紅的嫁衣襯著肌膚如雪卑惜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天驻售,我揣著相機(jī)與錄音露久,去河邊找鬼。 笑死欺栗,一個胖子當(dāng)著我的面吹牛毫痕,可吹牛的內(nèi)容都是我干的征峦。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼消请,長吁一口氣:“原來是場噩夢啊……” “哼栏笆!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起梯啤,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤竖伯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后因宇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體七婴,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年察滑,在試婚紗的時候發(fā)現(xiàn)自己被綠了打厘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡贺辰,死狀恐怖户盯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情饲化,我是刑警寧澤莽鸭,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站吃靠,受9級特大地震影響硫眨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜巢块,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一礁阁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧族奢,春花似錦姥闭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至廊敌,卻和暖如春铜跑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背庭敦。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工疼进, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人秧廉。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓伞广,卻偏偏與公主長得像拣帽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子嚼锄,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355