Zookeeper之Leader選舉

????Zookeeper是采用的zab協(xié)議進(jìn)行實現(xiàn)的,而不是完全Paxos實現(xiàn)的箫措。在主備系統(tǒng)架構(gòu)模式下腹备,采用zab協(xié)議來保證集群中所有副本的數(shù)據(jù)一致性。主系統(tǒng)接受處理所有的事務(wù)性請求斤蔓,然后將數(shù)據(jù)變更狀態(tài)以proposal提案的形式同步給所有的副本進(jìn)程植酥。所以在這個過程中,Leader機(jī)器顯得格外重要弦牡。

????Leader選舉就是在集群中選舉出一個主進(jìn)程友驮,用來接收處理所有客戶端的事務(wù)性請求。有個隱式條件就是集群中的服務(wù)器大于等于2臺才能開始Leader選舉驾锰。

Leader選舉時機(jī):
  • 服務(wù)器啟動時期的Leader選舉
  • 服務(wù)器運行期間的Leader選舉
選舉流程:
  • 發(fā)送當(dāng)前自己機(jī)器的選票信息給集群中的其它機(jī)器
  • 接收集群中其它機(jī)器發(fā)送過來的選票信息
  • 處理接收到的投票信息
  • 統(tǒng)計投票信息
  • 改變當(dāng)前服務(wù)器的狀態(tài)
選票PK規(guī)則:
  • 首先比對zxId
  • 再比對sid
Zookeeper中l(wèi)eader選舉的實現(xiàn)

????zookeeper中的leader選舉由FastLeaderElection具體實現(xiàn)卸留。其中有幾個重要的類:

  • Notification:代表收到的投票信息類
  • ToSend:發(fā)送給其它服務(wù)器的投票信息
  • WorkerReceiver和WorkerSender以及Messager
protected class Messenger {
        // 選票發(fā)送器
        WorkerSender ws;
        // 選票接收器
        WorkerReceiver wr;
    }
  • recvqueue:收票隊列
  • sendqueue:發(fā)送選票隊列

????WorkerReceiver和WorkerSender不停地從QuorumCnxManager中獲取收到的選票信息,以及向集群中所有其它looking機(jī)器發(fā)送選票信息椭豫。

FastLeaderElection繼承自Election艾猜,實現(xiàn)了其中的選舉leader的方法

public Vote lookForLeader() throws InterruptedException {

            try {
                //TODO 所有收到的選票集合
                Map<Long, Vote> recvset = new HashMap<Long, Vote>();

                synchronized (this) {
                    //TODO 邏輯時鐘++
                    logicalclock.incrementAndGet();
                    //TODO 更新選票 推選的leaderId、zxId 和 選舉周期
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                }

                //TODO 給集群中的其它服務(wù)器發(fā)送當(dāng)前服務(wù)器的投票信息
                sendNotifications();

                SyncedLearnerTracker voteSet;

                //當(dāng)前服務(wù)器是選舉狀態(tài)
                while ((self.getPeerState() == QuorumPeer.ServerState.LOOKING) && (!stop)) {
                    //TODO 從QuorumCnxManager中獲取收到的外部 投票信息
                    FastLeaderElection.Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                    //TODO 收到的外部投票信息為空
                    if (n == null) {
                        //TODO 當(dāng)前服務(wù)器選票信息是否發(fā)送完
                        if (manager.haveDelivered()) {
                            //TODO 發(fā)送完了繼續(xù)發(fā)送當(dāng)前服務(wù)器的選票信息給集群中的其它服務(wù)器
                            sendNotifications();
                        } else {
                            //TODO 沒有發(fā)送完就當(dāng)前服務(wù)器建立和其它服務(wù)器的鏈接信息
                            manager.connectAll();
                        }
                    } else if (validVoter(n.sid) && validVoter(n.leader)) {
                        //TODO 收到的選票中捻悯,投票者和被推選者都是 屬于投票集合中

                        //TODO 查看收到的選票的狀態(tài)
                        switch (n.state) {
                            case LOOKING:
                                if (getInitLastLoggedZxid() == -1) {
                                    LOG.debug("Ignoring notification as our zxid is -1");
                                    break;
                                }
                                if (n.zxid == -1) {
                                    LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                                    break;
                                }
                                // If notification > current, replace and send messages out
                                //TODO 如果收到的投票選舉周期大于當(dāng)前機(jī)器的時鐘周期
                                if (n.electionEpoch > logicalclock.get()) {
                                    //TODO 更新當(dāng)前機(jī)器的時鐘周期
                                    logicalclock.set(n.electionEpoch);
                                    //TODO 清空所有收到的投票信息
                                    recvset.clear();
                                    //TODO 如果收到的選票信息優(yōu)于當(dāng)前服務(wù)器選票信息匆赃,變更當(dāng)前服務(wù)器的投票信息
                                    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    } else {
                                        //TODO 更新選票為自己當(dāng)前服務(wù)器的選票信息
                                        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                    }
                                    //TODO 發(fā)送選票信息
                                    sendNotifications();
                                } else if (n.electionEpoch < logicalclock.get()) {
                                    //TODO 如果邏輯時鐘小于當(dāng)前邏輯時鐘,忽略
                                    LOG.debug(
                                            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                            Long.toHexString(n.electionEpoch),
                                            Long.toHexString(logicalclock.get()));
                                    break;
                                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    //TODO 選舉周期相同今缚,但是收到的選票更優(yōu)算柳,更新選票信息
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    //TODO 發(fā)送選票信息
                                    sendNotifications();
                                }

                                //TODO 收到的選票信息放入集合中
                                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                                //TODO 投票歸檔,查看是否已leader選舉完成
                                voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                                if (voteSet.hasAllQuorums()) {

                                    while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                            recvqueue.put(n);
                                            break;
                                        }
                                    }

                                    if (n == null) {
                                        setPeerState(proposedLeader, voteSet);
                                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                        leaveInstance(endVote);
                                        return endVote;
                                    }
                                }
                                break;
                            case OBSERVING:
                                LOG.debug("Notification from observer: {}", n.sid);
                                break;
                            case FOLLOWING:
                            case LEADING:

                                if (n.electionEpoch == logicalclock.get()) {
                                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                    voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                    if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                        setPeerState(n.leader, voteSet);
                                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                        leaveInstance(endVote);
                                        return endVote;
                                    }
                                }

                                outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                                if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                    synchronized (this) {
                                        logicalclock.set(n.electionEpoch);
                                        setPeerState(n.leader, voteSet);
                                    }
                                    Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                                break;
                            default:
                                LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                                break;
                        }
                    } else {
                        if (!validVoter(n.leader)) {
                            LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                        }
                        if (!validVoter(n.sid)) {
                            LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                        }
                    }
                }
                return null;
            } finally {
                try {
                    if (self.jmxLeaderElectionBean != null) {
                        MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to unregister with JMX", e);
                }
                self.jmxLeaderElectionBean = null;
                LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
            }
        }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末姓言,一起剝皮案震驚了整個濱河市瞬项,隨后出現(xiàn)的幾起案子蔗蹋,更是在濱河造成了極大的恐慌,老刑警劉巖囱淋,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件猪杭,死亡現(xiàn)場離奇詭異,居然都是意外死亡妥衣,警方通過查閱死者的電腦和手機(jī)皂吮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來税手,“玉大人蜂筹,你說我怎么就攤上這事÷梗” “怎么了艺挪?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長兵扬。 經(jīng)常有香客問我麻裳,道長,這世上最難降的妖魔是什么器钟? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任掂器,我火速辦了婚禮,結(jié)果婚禮上俱箱,老公的妹妹穿的比我還像新娘。我一直安慰自己灭必,他們只是感情好狞谱,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著禁漓,像睡著了一般跟衅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上播歼,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天伶跷,我揣著相機(jī)與錄音,去河邊找鬼秘狞。 笑死叭莫,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烁试。 我是一名探鬼主播雇初,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼减响!你這毒婦竟也來了靖诗?” 一聲冷哼從身側(cè)響起郭怪,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎刊橘,沒想到半個月后鄙才,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡促绵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年攒庵,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绞愚。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡叙甸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出位衩,到底是詐尸還是另有隱情裆蒸,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布糖驴,位于F島的核電站僚祷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏贮缕。R本人自食惡果不足惜辙谜,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望感昼。 院中可真熱鬧装哆,春花似錦、人聲如沸定嗓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宵溅。三九已至凌简,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間恃逻,已是汗流浹背雏搂。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留寇损,地道東北人凸郑。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像矛市,于是被迫代替她去往敵國和親线椰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當(dāng)Zookeeper集群中的一臺服務(wù)器出現(xiàn)以下兩種情況之一時憨愉,需...
    tracy_668閱讀 1,276評論 1 11
  • 【轉(zhuǎn)自】http://www.cnblogs.com/leesf456/p/6107600.html 一烦绳、前言 前...
    lxqfirst閱讀 835評論 0 0
  • 一径密、Leader選舉過程 Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當(dāng)Zookeeper集群中的一臺服務(wù)器...
    yannhuang閱讀 1,226評論 0 2
  • 一躺孝、前言 前面學(xué)習(xí)了Zookeeper服務(wù)端的相關(guān)細(xì)節(jié)享扔,其中對于集群啟動而言,很重要的一部分就是Leader選舉植袍,...
    數(shù)據(jù)萌新閱讀 1,274評論 0 0
  • 推薦指數(shù): 6.0 書籍主旨關(guān)鍵詞:特權(quán)惧眠、焦點、注意力于个、語言聯(lián)想氛魁、情景聯(lián)想 觀點: 1.統(tǒng)計學(xué)現(xiàn)在叫數(shù)據(jù)分析,社會...
    Jenaral閱讀 5,721評論 0 5