[Zookeeper] 選舉流程Fast Leader

選舉信息-選舉流程-選舉場(chǎng)景-源碼分析

1 選舉信息

<1> 服務(wù)器角色信息
在Zookeeper集群提供服務(wù)時(shí),集群中角色如下:

  • Leader:一個(gè)Zookeeper集群同一時(shí)間只有一個(gè)Leader。所有的寫操作必須要通過Leader完成废酷,再由Leader將寫操作廣播給其它服務(wù)器栅屏。
  • Follower:一個(gè)Zookeeper集群可以存在多個(gè)Follower逛艰。Follower可直接處理并且返回客戶端的讀請(qǐng)求蘑志,同時(shí)會(huì)將寫請(qǐng)求轉(zhuǎn)發(fā)給Leader處理金砍,并且負(fù)責(zé)在Leader處理寫請(qǐng)求時(shí)對(duì)請(qǐng)求進(jìn)行投票址否。另外Follower可以參與競(jìng)選Leader餐蔬。
  • Observer:Observer功能與Follower類似碎紊,但是沒有投票權(quán),也不會(huì)參與競(jìng)選Leader樊诺。

<2> 服務(wù)器狀態(tài)信息

  • Looking:尋找Leader狀態(tài)仗考。當(dāng)服務(wù)器處于該狀態(tài)時(shí),會(huì)認(rèn)為當(dāng)前集群中沒有Leader词爬,因此需要進(jìn)入Leader選舉流程
  • Following:跟隨者狀態(tài)秃嗜,表明當(dāng)前服務(wù)器角色是Follower
  • Leading:領(lǐng)導(dǎo)者狀態(tài),表明當(dāng)前服務(wù)器角色是Leader
  • Observing:觀察者狀態(tài)顿膨,表明當(dāng)前服務(wù)器角色是Observer

<3> 投票信息

  • leader:被選舉的Leader的sid
  • zxid:被選舉的Leader的事務(wù)id
  • sid:當(dāng)前服務(wù)器的sid
  • electionEpoch:當(dāng)前投票的輪次
  • peerEpoch:當(dāng)前服務(wù)器的Epoch

選票PK:
(1)選票中Epoch大的優(yōu)先級(jí)高锅锨;
(2)選票中Zxid的大的優(yōu)先級(jí)高;
(3)選票中Sid大的優(yōu)先級(jí)高恋沃;

選票終止條件:
以某一選票數(shù)占集群中參與競(jìng)選節(jié)點(diǎn)(除Observer外)數(shù)量的一半以上必搞,選舉結(jié)束;

2 選舉流程

選舉流程

3 選舉場(chǎng)景

<1> 票箱信息
票箱信息:保存選舉的服務(wù)器SID和被選舉的服務(wù)器SID囊咏,即(sid恕洲,leader);例如集群中節(jié)點(diǎn)為SID=1的服務(wù)器選舉節(jié)點(diǎn)為SID=3的服務(wù)器梅割,則票箱信息為(1,3)

<2> 選票信息
選票信息為(electionEpoch霜第,leader,zxid)炮捧,分別代表選舉的輪次庶诡、被選舉服務(wù)器的SID,被選舉服務(wù)器的zxid

<3> 初始啟動(dòng)選舉
(1)每個(gè)Server發(fā)出一個(gè)投票咆课,初始情況末誓,Server都會(huì)將自己作為L(zhǎng)eader服務(wù)器來進(jìn)行投票,比如Server1會(huì)發(fā)出(1,1,0)選票(表示epoch為1书蚪,選舉leader的sid為1喇澡,并且被選舉的服務(wù)器zxid為0),然后各自將這個(gè)投票發(fā)給集群中其他機(jī)器殊校,Server1的票箱信息為(1,1)(表示投票的服務(wù)器sid為1晴玖,選舉的leader的sid為1)
(2)接受來自各個(gè)服務(wù)器的投票。集群的每個(gè)服務(wù)器收到投票后为流,首先判斷該投票的有效性呕屎,如檢查是否是本輪投票,是否來自Looking狀態(tài)的服務(wù)器敬察。
(3)處理投票秀睛。針對(duì)每一個(gè)投票,服務(wù)器需要將別人的票和自己的票進(jìn)行PK莲祸,pk規(guī)則如上所示蹂安。
(4)統(tǒng)計(jì)投票椭迎。每次投票后,服務(wù)器都會(huì)統(tǒng)計(jì)投票信息田盈,判斷是否已經(jīng)有過半機(jī)器接受到相同的投票信息畜号。
(5)改變服務(wù)器狀態(tài)。一旦確定了Leader允瞧,每個(gè)服務(wù)器就會(huì)更新自己的狀態(tài)简软,如果是Follower,那么就變更為FOLLOWING瓷式,如果是Leader替饿,就變更為L(zhǎng)EADING。

<4> Follower重啟

<5> 運(yùn)行期間選舉 leader宕機(jī)
與上面相比贸典,會(huì)在開始添加一個(gè)步驟【變更狀態(tài)】视卢。
Leader掛后,余下的非Observer服務(wù)器都會(huì)講自己的服務(wù)器狀態(tài)變更為L(zhǎng)OOKING廊驼,然后開始進(jìn)入Leader選舉過程据过。

4 源碼分析

4.1 類圖關(guān)系

類圖關(guān)系

4.1 FastLeaderElection

<1> Notification

Notification表示收到的選舉投票信息(其他服務(wù)器發(fā)來的選舉投票信息),其包含了被選舉者的id妒挎、zxid绳锅、選舉周期等信息,其buildMsg方法將選舉信息封裝至ByteBuffer中再進(jìn)行發(fā)送酝掩。

<2> ToSend

ToSend表示發(fā)送給其他服務(wù)器的選舉投票信息鳞芙,也包含了被選舉者的id、zxid期虾、選舉周期等信息原朝。

<3> Messenger
其中 WorkerReceiver:

選票接收器,不斷地從QuorumCnxManager中獲取其他服務(wù)器發(fā)來的選舉消息镶苞,并將其轉(zhuǎn)換成一個(gè)選票喳坠,然后保存到recvqueue中

其中 WorkerSender:

選票發(fā)送器,其會(huì)不斷地從sendqueue中獲取待發(fā)送的選票茂蚓,并將其傳遞到底層QuorumCnxManager中壕鹉,其過程是將FastLeaderElection的ToSend轉(zhuǎn)化為QuorumCnxManager的Message

4.2 QuorumCnxManager

該類有四個(gè)內(nèi)部類:
SendWorker類,Message類聋涨,RecWorker類晾浴,Listener類
<1> SendWorder
這個(gè)類作為“發(fā)送者”,繼承ZooKeeperThread牍白,線程不斷地從發(fā)送隊(duì)列取出脊凰,發(fā)送給對(duì)應(yīng)sid的機(jī)器。

        Long sid; //目標(biāo)機(jī)器sid淹朋,不是當(dāng)前機(jī)器sid
        Socket sock;
        RecvWorker recvWorker; //該sid對(duì)應(yīng)的RecWorker
        volatile boolean running = true;
        DataOutputStream dout;

<2> Message

    static public class Message {
        Message(ByteBuffer buffer, long sid) {
            this.buffer = buffer;
            this.sid = sid;
        }

        ByteBuffer buffer;
        long sid;
    }

sid為消息來源方的sid笙各,buffer即指消息體

<3> RecvWorker

        Long sid;
        Socket sock;
        volatile boolean running = true;
        final DataInputStream din;
        final SendWorker sw;

4.3 FastLeaderElection中的lookForLeader()

    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            // 這是票箱的意思嗎
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                // 邏輯時(shí)鐘自增,每進(jìn)行一輪新的Leader選舉,都需要更新邏輯時(shí)鐘
                logicalclock.incrementAndGet();
                // 更新選票(初始化選票)
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            // 廣播選票
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */
            // 循環(huán)中(當(dāng)前服務(wù)器處于LOOKING)
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                // 從接受到的選票隊(duì)列中拿取一個(gè) Notification
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                // 接受到的選票隊(duì)列中無選票
                if(n == null){
                    // 如果發(fā)往各個(gè)服務(wù)器的消息隊(duì)列都為空
                    if(manager.haveDelivered()){
                        // 廣播選票
                        sendNotifications();
                    } else {
                        // 存在未發(fā)送的消息,遍歷與服務(wù)器進(jìn)行連接
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                // 選票接收隊(duì)列中存在選票并且該選票的發(fā)送者是有資格投票的
                else if(self.getVotingView().containsKey(n.sid)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                    // 判斷發(fā)送選票的服務(wù)器的狀態(tài)
                    switch (n.state) {
                    case LOOKING: // 處于尋找leader狀態(tài)
                        // If notification > current, replace and send messages out
                        // 發(fā)出選票的服務(wù)器epoch大于本服務(wù)器的邏輯時(shí)鐘
                        if (n.electionEpoch > logicalclock.get()) {
                            // 改變邏輯時(shí)鐘的值
                            logicalclock.set(n.electionEpoch);
                            // TODO 清空票箱
                            recvset.clear();
                            // 進(jìn)行選票pk
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            // 將獲勝的選票進(jìn)行廣播
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) { // 發(fā)出選票的服務(wù)器epoch小于本服務(wù)器的邏輯時(shí)鐘
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break; // 直接跳出循環(huán)了?础芍?杈抢? 這里是結(jié)束switch循環(huán),重新獲取一張選票
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, // 發(fā)出選票的服務(wù)器epoch等于本服務(wù)器的邏輯時(shí)鐘,進(jìn)行pk,如果獲勝則更新選票并廣播
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }
                        // 將選票放入票箱
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // 判斷選舉是否結(jié)束
                        // 票箱和當(dāng)前的leader選票進(jìn)行比較,看是否超過半數(shù)
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            // 將選票接收隊(duì)列中所有剩下的選票與選出的leader比較,如果獲勝仑性,則放入票箱惶楼,跳出while循環(huán)
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            // 選票接收隊(duì)列中已經(jīng)沒有選票了
                            if (n == null) {
                                // 最后勝出的選票是自己,更新狀態(tài)為leading,否則為following
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock.get(),
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    // 發(fā)送選票的服務(wù)器的狀態(tài)是Following和Leading
                    // 這種情況是某臺(tái)服務(wù)器重啟之后,已經(jīng)選舉出新Leader了
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        // 如果是同一輪投票
                        if(n.electionEpoch == logicalclock.get()){ //是否可以加入已有的集群
                            // 將選票放入到票箱中
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));
                           
                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        // 這是一條與當(dāng)前邏輯時(shí)鐘不符合的消息诊杆,那么說明在另一個(gè)選舉過程中已經(jīng)有了選舉結(jié)果歼捐,
                        // 于是將該選舉結(jié)果加入到outofelection集合中,再根據(jù)outofelection來判斷是否可以結(jié)束選舉,
                        // 如果可以也是保存邏輯時(shí)鐘晨汹,設(shè)置選舉狀態(tài)豹储,退出選舉過程。
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));
           
                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }
}

流程圖:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末淘这,一起剝皮案震驚了整個(gè)濱河市剥扣,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌铝穷,老刑警劉巖钠怯,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異曙聂,居然都是意外死亡晦炊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門宁脊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來断国,“玉大人,你說我怎么就攤上這事朦佩〔⑺迹” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵语稠,是天一觀的道長(zhǎng)宋彼。 經(jīng)常有香客問我,道長(zhǎng)仙畦,這世上最難降的妖魔是什么输涕? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮慨畸,結(jié)果婚禮上莱坎,老公的妹妹穿的比我還像新娘。我一直安慰自己寸士,他們只是感情好檐什,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布碴卧。 她就那樣靜靜地躺著,像睡著了一般乃正。 火紅的嫁衣襯著肌膚如雪住册。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天瓮具,我揣著相機(jī)與錄音荧飞,去河邊找鬼。 笑死名党,一個(gè)胖子當(dāng)著我的面吹牛叹阔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播传睹,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼耳幢,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了蒋歌?” 一聲冷哼從身側(cè)響起帅掘,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎堂油,沒想到半個(gè)月后修档,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡府框,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年吱窝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迫靖。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡院峡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出系宜,到底是詐尸還是另有隱情照激,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布盹牧,位于F島的核電站俩垃,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏汰寓。R本人自食惡果不足惜口柳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望有滑。 院中可真熱鬧跃闹,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至找默,卻和暖如春想帅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背啡莉。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留旨剥,地道東北人咧欣。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓瓮孙,卻偏偏與公主長(zhǎng)得像栏渺,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子骂维,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 【轉(zhuǎn)自】http://www.cnblogs.com/leesf456/p/6107600.html 一蚌父、前言 前...
    lxqfirst閱讀 835評(píng)論 0 0
  • 一哮兰、前言 前面學(xué)習(xí)了Zookeeper服務(wù)端的相關(guān)細(xì)節(jié),其中對(duì)于集群?jiǎn)?dòng)而言苟弛,很重要的一部分就是Leader選舉喝滞,...
    阿斯蒂芬2閱讀 17,626評(píng)論 4 19
  • zookeeper集群中往往需要在集群服務(wù)器中選舉出一個(gè)Leader,Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵...
    探索者_(dá)逗你玩兒閱讀 441評(píng)論 0 0
  • Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在膏秫。當(dāng)Zookeeper集群中的一臺(tái)服務(wù)器出現(xiàn)以下兩種情況之一時(shí)右遭,需...
    tracy_668閱讀 1,276評(píng)論 1 11
  • 若進(jìn)行Leader選舉,則至少需要兩臺(tái)機(jī)器缤削,這里選取3臺(tái)機(jī)器組成的服務(wù)器集群為例窘哈。在集群初始化階段,當(dāng)有一臺(tái)服務(wù)器...
    白紙糊閱讀 973評(píng)論 0 1