【Zookeeper】 Server的啟動流程

一:前言

當服務通過選舉算法進行選舉完之后,各個服務器就需要設置自己的角色橱野,并啟動相對應的服務(也就是服務的初始化)朽缴,之后就等待客戶端的請求,處理響應的請求水援。

二:流程分析

2.1密强、 LEADER

功能:接收客戶端的請求, 事務請求的提議者蜗元。

首先我們查看啟動代碼:

    // makeLeader(logFactory) 新建一個Leader實體或渤,打開Leader服務器的交換信息的接口,等待與Learner通信
    setLeader(makeLeader(logFactory));
    // Leader服務啟動的主方法
    leader.lead();

Leader服務啟動的主方法leader.lead();流程分析:

  void lead() throws IOException, InterruptedException {
       ...
            //1> 加載FileTxnSnapLog中的數(shù)據(jù)奕扣, 并把每一個事務數(shù)據(jù)封裝成一個Proposal,放入committedLog
            //   中薪鹦,并計算minCommittedLog, maxCommittedLog惯豆, 數(shù)據(jù)放入ZKDatabase
            //2> 處理事務數(shù)據(jù)時距芬,若事務Type為Session的數(shù)據(jù)涝开,響應的增刪到sessionsWithTimeouts中
            //3> 設置Leader的highestZxid
            //4> 通過ZKDatabase中的sessions與sessionsWithTimeouts進行比較,Kill失效的session框仔, 并將
            //     失效相關聯(lián)的臨時節(jié)點進行刪除
            zk.loadData();</br>

            // 開啟接收Leaner的連接線程舀武, 并把每一個Leaner的連接封裝成一個LearnerHandler實體, 添加到
            // Leader的learners列表中离斩,每一個LearnerHandler開啟一個線程處理響應的Leaner的信息
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();</br>
            
            readyToStart = true;
            // 獲取當前選舉的輪次, 同步等待法定人數(shù)的Leaner注冊身份(FOLLOWERINFO/OBSERVERINFO)
            // 到Leader 超時時間為  initLimit \* tickTime
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            // 根據(jù)當前的輪次银舱,初始化zxid
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));</br>
            ...</br>
            
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);</br>
            ...</br>
            //Leaner向Leaner發(fā)送LEADERINFO消息,并等待法定人數(shù)的Leaner的響應epoch的消息
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);
            try {
                //等待法定人數(shù)的Leaner初始化同步數(shù)據(jù)響應的消息跛梗,超時時間為 initLimit \* tickTime
                waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
            } catch (InterruptedException e) {
                ...
            }
            
            //開啟對客戶端的服務的主方法
            startZkServer();
            
            //是否有zxid的初始化設置
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }
            
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }
           
           //對每一個Leaner發(fā)起ping檢測消息寻馏, 檢測的時間間隔為tickTime / 2
           // 檢測learnerType = LearnerType.PARTICIPANT的人數(shù)是否少于法定的人數(shù),
           // 如果少于核偿,則   shutDown服務诚欠,進行新一輪的選舉
            boolean tickSkip = true;
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick++;
                }
                HashSet<Long> syncedSet = new HashSet<Long>();
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }

              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    return;
              } 
              tickSkip = !tickSkip;
            }
           //...
    }

下面我們隊上面代碼的兩個流程進行流程分析①: Leaner向Leader的注冊同步流程;②:Leader與客戶端服務的流程漾岳。

2.1.1、 Leaner向Leader的注冊同步流程

  • 1.尼荆、接收到Leaner的信息注冊消息 type = Leader.FOLLOWERINFO/OBSERVERINFO + acceptedEpoch + myid, 根據(jù)消息設置LeanerHandler的字段值左腔, this.sid = li.getServerid();this.version = li.getProtocolVersion();, 同步等待法定人數(shù)Leander的注冊消息得到newEpoch(如果每次接收到的lastAcceptedEpoch >= epoch捅儒, 則設置epoch = lastAcceptedEpoch+1;), 發(fā)送Leader信息(type:Leader.LEADERINFO, zxid: newEpoch + 0)給Leander
  • 2液样、同步等待Leaner接收到Leander的信息包之后返回的ACKEPOCH消息, 從ACKEPOCH消息獲取到Leaner的lastLoggerZxid(為了防止與Leader的lastLoggerZxid沖突巧还, 用peerLastZxid代替)鞭莽。處理同步數(shù)據(jù)邏輯:
  • i: 如果peerLastZxid == LastZxid,則packetToSend置為DIFF, zxidToSend置為peerLastZxid
  • ii: 如果proposals.size() != 0 (proposals為committedLog的列表)
    • a. 子條件maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid麸祷, 比較proposals中最接近且小于等于peerLastZxid的zxid, 如果小于撮抓, packetToSend置為TRUNC, zxidToSend置為此值摇锋; 否則packetToSend置為DIFF, zxidToSend置為maxCommittedLog丹拯, 然后把所有大于peerLastZxid 的propose 封裝成QuorumPacket (type: commit, zxid: propose.packet.getZxid())放入queuedPackets隊列中,
    • b.子條件peerLastZxid > maxCommittedLog, 則packetToSend置為TRUNC荸恕, zxidToSend置為maxCommittedLog
  • iii: peerLastZxid > maxCommittedLog乖酬, packetToSend置為TRUNC, zxidToSend置為 maxCommittedLog
  • iv: 否則packetToSend置為SNAP
  • 3融求、將toBeApplied(ToBeAppliedRequestProcessor未完成的請求)列表中的數(shù)據(jù)(type: Leader.COMMIT)添加到queuedPackets隊列中咬像, 如果handler.LearnerType() == LearnerType.PARTICIPANT(即Leander的角色為Follower), 則將outstandingProposals(ProposalRequestProcessor未完成的請求) 中的提議數(shù)據(jù)(type: Leader.PROPOSAL)添加到queuedPackets隊列中, 將Leaner 根據(jù)leanerType分別放入forwardingFollowers 和 observingLearners集合中
  • 4县昂、將新一輪的Leader的信息(type: NEWLEADER肮柜, zxid: newEpoch + 0)放入queuedPackets隊列中
  • 5、發(fā)送同步信息(type: packetToSend, zxidToSend: zxidToSend), 如果為SNAP消息倒彰, 則zxidToSend為Leader的lastLoggerZxid审洞, 且將ZKDatabase序列化為數(shù)據(jù)流發(fā)送給Leaner
  • 6、開啟線程待讳, 發(fā)送queuedPackets 隊列中包給Leander
  • 7芒澜、接收來自Leander同步的Ack消息, 同步等待法定人數(shù)Leaner的人數(shù)的回應
  • 8创淡、發(fā)送Leader.UPTODATE消息給客戶端痴晦, 表示可以使用數(shù)據(jù)了
    <b>注:</b>可以看到queuedPackets隊列中的數(shù)據(jù)的順序為:Leader.COMMIT —> Leader.PROPOSAL——> Leader.NEWLEADER —> Leader.UPTODATE

Leader異步等待法定的客戶端注冊同步:Leader的Lead()方法中:

// 異步等待Leaner信息的注冊
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// 異步等待Leaner接收到Leader的信息的EpochAck
waitForEpochAck(self.getId(), leaderStateSummary);
// 異步等待Leander對Leader的NEWLEADER消息的Ack
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);

2.1.2、 Leader與客戶端服務的流程

zk處理客戶端的請求都是通過Proccessor進行鏈路處理琳彩。對于LeaderZooKeeperServer對應的Proccessor的處理關系為: PrepRequestProcessor ——> ProposalRequestProcessor ——> CommitProcessor ——> Leader.ToBeAppliedRequestProcessor ——>FinalRequestProcessor誊酌, 同時ProposalRequestProcessor 將事務請求(request.hdr != null)交給同步刷盤處理器處理 ProposalRequestProcessor ——> SyncRequestProcessor ——> AckRequestProcessor。AckRequestProcessor處理器將刷盤成功的請求交給Leader作為一個提議露乏,作為Leader判斷提議成功的法定人數(shù)碧浊, 成立交給CommitProcessor

圖片.png

2.2、 FOLLOWER

功能:接收客戶端的請求施无, 事務請求提議的參與者,將事務請求轉發(fā)給Leader必孤。

首先查看啟動代碼

     // makeFollower(logFactory)新建一個Follwer實體猾骡,建立與Leader通信
    setFollower(makeFollower(logFactory));
    // follower啟動的主方法
    follower.followLeader();

Follower服務啟動的主方法follower.followLeader();流程分析:

void followLeader() throws InterruptedException {
       ...
        try {
            InetSocketAddress addr = findLeader();            
            try {
                //與Leader建立通信
                connectToLeader(addr);
                //將自己的信息注冊到Leader
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);</br>

                //初始化同步Leader的數(shù)據(jù)
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);</br>

                //接收Leader通信數(shù)據(jù),并做響應的業(yè)務處理
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
               ...
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }

2.2.1 Follwer注冊與同步流程分析

  • 1:發(fā)送自身信息包(type: Leader.FOLLOWERINFO, zxid: acceptedEpoch + 1)給Leader
  • 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1)敷搪, 判斷newEpoch 與 自身acceptedEpoch 的大行讼搿: 大于, 則將 自身acceptedEpoch設置為 newEpoch赡勘, 發(fā)送的epoch設置為舊的acceptedEpoch嫂便; 相等, 發(fā)送epoch設置為-1闸与; 否則異常毙替。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
  • 3: 接收來自Leader根據(jù)lastLoggedZxid發(fā)送過來的初始化同步信息: Leader.DIFF, 不做任何操作; Leader.SNAP践樱, 則將包中的數(shù)據(jù)序列化作為自己的ZKDatabase厂画; Leader.TRUNC, 則根據(jù)接收到的zxid來TRUNC當前ZKDatabase的數(shù)據(jù)拷邢, 設置zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
  • 4: 接收來自Leader的同步數(shù)據(jù): Leader.COMMIT——> Leader.PROPOSAL——> Leader.NEWLEADER ——> Leader.UPTODATE
    Leader.PROPOSAL: 提議的數(shù)據(jù)袱院, 添加到packetsNotCommitted 隊列中
    Leader.COMMIT: 需要提交的數(shù)據(jù),添加到packetsCommitted隊列
    Leader.NEWLEADER: 接收新的Leader領導的事務,保存快照數(shù)據(jù)忽洛, 設置self.setCurrentEpoch(newEpoch);, 回復AcK消息給Leader
    Leader.UPTODATE腻惠, 同步完成,設置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk))欲虚, 跳出同步流程
  • 5: 對packetsNotCommitted 中的數(shù)據(jù)調用fzk.logRequest(p.hdr, p.rec)(刷盤集灌, 返回提議AcK),對packetsCommitted中的數(shù)據(jù)調用fzk.commit(zxid);(提交操作)

2.2.1 Follwer與客戶端的服務流程

對于角色Follwer的處理客戶端的請求是通過下面的RequestProcessor進行處理: FollowerRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor; 同時將其轉發(fā)給Leader是并把請求苍在,Leader會進行提議請求绝页,將接收到的提議請求交給SyncRequestProcessor ——> SendAckRequestProcessor, SendAckRequestProcessor將ack消息發(fā)送給Leader,作為Leader判斷提議成功的法定人數(shù)寂恬,成立交給CommitProcessor 续誉。

圖片.png

2.3、 OBSERVER

功能:接收客戶端的請求初肉, 將事務請求轉發(fā)給Leader酷鸦。

首先查看啟動代碼

// makeObserver(logFactory)新建一個Observer實體,建立與Leader通信
setObserver(makeObserver(logFactory));
// observer啟動的主方法
observer.observeLeader();

Observer服務啟動的主方法observer.observeLeader();流程分析:

void observeLeader() throws InterruptedException {
     ...
            try {
                //建立與Leader的連接
                connectToLeader(addr);
                //將自身的信息注冊到Leader中牙咏, 返回Leader的lastLoggerZxid
                long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                //同步Leader的數(shù)據(jù)信息
                syncWithLeader(newLeaderZxid);
                 //接收Leader通信數(shù)據(jù)臼隔,并做響應的業(yè)務處理
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);                   
                }
            } catch (Exception e) {
               ...
            }
       ...
    }

3.3.1 Observer注冊與同步流程分析

  • 1:發(fā)送自身信息包(type: Leader.OBSERVERINFO, zxid: acceptedEpoch + 1)給Leader
  • 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1), 判斷newEpoch 與 自身acceptedEpoch 的大型: 大于摔握, 則將 自身acceptedEpoch設置為 newEpoch, 發(fā)送的epoch設置為舊的acceptedEpoch丁寄; 相等氨淌, 發(fā)送epoch設置為-1; 否則異常伊磺。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
  • 3: 接收來自Leader根據(jù)lastLoggedZxid發(fā)送過來的初始化同步信息: Leader.DIFF, 不做任何操作盛正; Leader.SNAP, 則將包中的數(shù)據(jù)序列化作為自己的ZKDatabase屑埋; Leader.TRUNC豪筝, 則根據(jù)接收到的zxid來TRUNC當前ZKDatabase的數(shù)據(jù), 設置zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
  • 4: 接收來自Leader的同步數(shù)據(jù): Leader.COMMIT——> Leader.NEWLEADER ——> Leader.UPTODATE
    Leader.COMMIT: 需要提交的數(shù)據(jù)摘能,添加到packetsCommitted隊列
    Leader.NEWLEADER: 接收新的Leader領導的事務续崖,保存快照數(shù)據(jù), 設置self.setCurrentEpoch(newEpoch);, 回復AcK消息給Leader
    Leader.UPTODATE团搞, 同步完成袜刷,設置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)), 跳出同步流程

3.3.2 Observer與客戶端的服務流程

對于Observer的處理客戶端的請求是通過下面的RequestProcessor進行處理:ObserverRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor ; 同時對于Leader的Leader.INFORM消息會同時交給SyncRequestProcessor(刷盤操作)跟CommitProcessor莺丑。

圖片.png
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末著蟹,一起剝皮案震驚了整個濱河市墩蔓,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌萧豆,老刑警劉巖奸披,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異涮雷,居然都是意外死亡阵面,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門洪鸭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來样刷,“玉大人,你說我怎么就攤上這事览爵≈帽牵” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵蜓竹,是天一觀的道長箕母。 經(jīng)常有香客問我,道長俱济,這世上最難降的妖魔是什么嘶是? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮蛛碌,結果婚禮上聂喇,老公的妹妹穿的比我還像新娘。我一直安慰自己蔚携,他們只是感情好希太,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著浮梢,像睡著了一般跛十。 火紅的嫁衣襯著肌膚如雪彤路。 梳的紋絲不亂的頭發(fā)上秕硝,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天,我揣著相機與錄音洲尊,去河邊找鬼远豺。 笑死,一個胖子當著我的面吹牛坞嘀,可吹牛的內容都是我干的躯护。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼丽涩,長吁一口氣:“原來是場噩夢啊……” “哼棺滞!你這毒婦竟也來了裁蚁?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤继准,失蹤者是張志新(化名)和其女友劉穎枉证,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體移必,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡室谚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了崔泵。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片秒赤。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖憎瘸,靈堂內的尸體忽然破棺而出入篮,到底是詐尸還是另有隱情,我是刑警寧澤含思,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布崎弃,位于F島的核電站,受9級特大地震影響含潘,放射性物質發(fā)生泄漏饲做。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一遏弱、第九天 我趴在偏房一處隱蔽的房頂上張望盆均。 院中可真熱鬧,春花似錦漱逸、人聲如沸泪姨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽肮砾。三九已至,卻和暖如春袋坑,著一層夾襖步出監(jiān)牢的瞬間仗处,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工枣宫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留婆誓,地道東北人。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓也颤,卻偏偏與公主長得像洋幻,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子翅娶,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內容