es源碼筆記-7.x 選主流程

Discovery模塊負責發(fā)現(xiàn)集群中的節(jié)點哮奇,以及選擇主節(jié)點。ES支持多種不同Discovery類型選擇睛约,內(nèi)置的實現(xiàn)有兩種:Zen Discovery和Coordinator鼎俘,其他的包括公有云平臺亞馬遜的EC2、谷歌的GCE等辩涝。

Zen Discovery和Coordinator

  • 1贸伐、Coordinator
    ES 7.x 重構(gòu)了一個新的集群協(xié)調(diào)層Coordinator,他實際上是 Raft 的實現(xiàn)怔揩,但并非嚴格按照 Raft 論文實現(xiàn)捉邢,而是做了一些調(diào)整。
    可以在配置文件中指定商膊,代碼如下:
    org.elasticsearch.discovery.DiscoveryModule.DiscoveryModule(....)
public DiscoveryModule(...) {
        if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
            discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
                settings, clusterSettings,
                transportService, namedWriteableRegistry, allocationService, masterService,
                () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
        } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
            discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
                clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService);
        } else {
            throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
        }
}

  • 2伏伐、Zen Discovery
    采用Bully算法

它假定所有節(jié)點都有一個唯一的ID,使用該ID對節(jié)點進行排序晕拆。任何時候的當前Leader都是參與集群的最高ID節(jié)點藐翎。該算法的優(yōu)點是易于實現(xiàn)。但是实幕,當擁有最大ID的節(jié)點處于不穩(wěn)定狀態(tài)的場景下會有問題吝镣。例如,Master負載過重而假死昆庇,集群擁有第二大ID的節(jié)點被選為新主末贾,這時原來的Master恢復,再次被選為新主整吆,然后又假死
ES 通過推遲選舉拱撵,直到當前的 Master 失效來解決上述問題辉川,只要當前主節(jié)點不掛掉,就不重新選主拴测。但是容易產(chǎn)生腦裂(雙主)员串,為此,再通過“法定得票人數(shù)過半”解決腦裂問題

  • 3昼扛、算法比較
    1寸齐、raft算法與Bully算法
相同點

1、多數(shù)派原則:必須得到超過半數(shù)的選票才能成為master抄谐。
選出的leader一定擁有最新已提交數(shù)據(jù):在raft中渺鹦,數(shù)據(jù)更新的節(jié)點不會給數(shù)據(jù)舊的節(jié)點投選票,而當選需要多數(shù)派的選票蛹含,則當選人一定有最新已提交數(shù)據(jù)毅厚。在es中,version大的節(jié)點排序優(yōu)先級高浦箱,同樣用于保證這一點吸耿。

不同點

正確性論證:raft是一個被論證過正確性的算法,而ES的算法是一個沒有經(jīng)過論證的算法酷窥,只能在實踐中發(fā)現(xiàn)問題咽安,做bug fix,這是我認為最大的不同蓬推。

是否有選舉周期term:raft引入了選舉周期的概念妆棒,每輪選舉term加1,保證了在同一個term下每個參與人只能投1票沸伏。ES在選舉時沒有term的概念糕珊,不能保證每輪每個節(jié)點只投一票。
選舉的傾向性:raft中只要一個節(jié)點擁有最新的已提交的數(shù)據(jù)毅糟,則有機會選舉成為master红选。在ES中,version相同時會按照NodeId排序姆另,總是NodeId小的人優(yōu)先級高喇肋。

2、Paxos算法
Paxos非常強大蜕青,尤其在什么時機苟蹈,以及如何進行選舉方面的靈活性比簡單的Bully算法有很大的優(yōu)勢,因為在現(xiàn)實生活中右核,存在比網(wǎng)絡連接異常更多的故障模式。但 Paxos 實現(xiàn)起來非常復雜

本篇只討論內(nèi)置的Zen Discovery

流程分析

整體流程可以概括為:選舉臨時Master渺绒,如果本節(jié)點當選贺喝,則等待確立Master菱鸥,如果其他節(jié)點當選,則嘗試加入集群躏鱼,然后啟動節(jié)點失效探測器氮采。

1、節(jié)點啟動

如果集群剛啟動則參與選主染苛,否則加入集群
org.elasticsearch.node.Node.start()

public Node start() throws NodeValidationException {
    discovery.start();           
    discovery.startInitialJoin();
}
2鹊漠、選舉臨時Master

選舉過程的實現(xiàn)位于 org.elasticsearch.discovery.zen.ZenDiscovery.findMaster(),該函數(shù)查找當前集群的活躍 Master,或者從候選者中選擇新的Master茶行。如果選主成功躯概,則返回選定的Master,否則返回空

private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();             
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }

        // nodes discovered during pinging
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }

        if (activeMasters.isEmpty()) {
            //判斷當前候選者人數(shù)是否達到法定人數(shù)
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.trace("not enough master nodes [{}]", masterCandidates);
                return null;
            }
        } else {
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

上面選擇臨時主節(jié)點非常簡單畔师,
首先需要判斷當前候選者人數(shù)是否達到法定人數(shù)娶靡,否則選主失敗。

  public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        if (candidates.isEmpty()) {
            return false;
        }
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        return candidates.size() >= minimumMasterNodes;
    }

取列表中的最小值看锉,比較函數(shù)通過compareNodes實現(xiàn),只是對節(jié)點 ID 進行排序

   public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }

 public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
            // list, so if c2 has a higher cluster state version, it needs to come first.
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            if (ret == 0) {
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }
3姿锭、確立Master或加入集群

選舉出的臨時Master有兩種情況:該臨時Master是本節(jié)點或非本節(jié)點。

  • 1伯铣、如果臨時Master是本節(jié)點:
    (1)等待足夠多的具備Master資格的節(jié)點加入本節(jié)點(投票達到法定人數(shù))呻此,以完成選舉。
if (clusterService.localNode().equals(masterNode)) {
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one           
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            joinThreadControl.markThreadAsDone(currentThread);                            
                            nodesFD.updateNodesAndPing(state); // start the odes FD
                        }
                        @Override
                        public void onFailure(Throwable t) {                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }

            );
        }

    private synchronized void checkPendingJoinsAndElectIfNeeded() {
        assert electionContext != null : "election check requested but no active context";
        final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
        //是否有足夠的選票
        if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {           
            }
        } else {           
            electionContext.closeAndBecomeMaster();
            electionContext = null; // clear this out so future joins won't be accumulated
        }
    }

(2)超時(默認為30秒腔寡,可配置)后還沒有滿足數(shù)量的join請求趾诗,則選舉失敗,需要進行新一輪選舉蹬蚁。

 public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
        final CountDownLatch done = new CountDownLatch(1);       
            try {
                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
                    // callback handles everything
                    return;
                }
            } catch (InterruptedException e) {
            }          
    }

超時后直接return恃泪,當非臨時節(jié)點加入集群不成功時,重新發(fā)起選主流程
org.elasticsearch.discovery.zen.ZenDiscovery.innerJoinCluster()

// send join request
            final boolean success = joinElectedMaster(masterNode);

            // finalize join through the cluster state update thread
            final DiscoveryNode finalMasterNode = masterNode;
            clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateUpdateTask() {
                @Override
                public boolean runOnlyOnMaster() {
                    return false;
                }
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    if (!success) {
                        // failed to join. Try again...
              //重新發(fā)起選主流程
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }

(3)成功后發(fā)布新的clusterState犀斋。
實現(xiàn)如下:

 public synchronized void closeAndBecomeMaster() {           
            Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
            final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

            tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
            tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);

            clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
        }

submitStateUpdateTask最終通過TaskBatcher# submitTasks來提交任務贝乎。執(zhí)行任務并發(fā)布集群狀態(tài)的總體過程在 MasterService#runTasks 方法中實現(xiàn)。

protected void runTasks(TaskInputs taskInputs) {       
                publish(clusterChangedEvent, taskOutputs, startTimeNS);         
    }
  • 2叽粹、如果其他節(jié)點被選為Master:
    (1)不再接受其他節(jié)點的join請求览效。
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext(masterNode + " elected");

(2)向Master發(fā)送加入請求,并等待回復虫几。超時時間默認為1分鐘(可配置)锤灿,如果遇到異常,則默認重試3次(可配置)辆脸。這個步驟在joinElectedMaster方法中實現(xiàn)但校。

    private boolean joinElectedMaster(DiscoveryNode masterNode) {
        try {
        
        int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
        while (true) {
            try {             
                membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
                return true;
            } catch (Exception e) {
                final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
                if (unwrap instanceof NotMasterException) {
                    if (++joinAttempt == this.joinRetryAttempts) {
                        logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                        return false;
                    } else {
                        logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                    }
                } else {                   
                    }
                    return false;
                }
            }

            try {
                Thread.sleep(this.joinRetryDelay.millis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

最終當選的Master會先發(fā)布集群狀態(tài),才確認客戶的join請求啡氢,因此状囱,joinElectedMaster返回代表收到了join請求的確認术裸,并且已經(jīng)收到了集群狀態(tài)。所以如果返回不成功亭枷,則重新發(fā)起選主流程
(3)檢查收到的集群狀態(tài)中的Master節(jié)點如果為空袭艺,或者當選的Master不是之前選擇的節(jié)點,則重新選舉叨粘。

       if (currentState.getNodes().getMasterNode() == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }

                    if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
                        return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                    }

                    // Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
                    // when the first cluster state arrives.
                    joinThreadControl.markThreadAsDone(currentThread);
                    return currentState;
總結(jié)

1猾编、es通過主從模式以及發(fā)現(xiàn)機制保證節(jié)點之間的負載均衡,但是es使用量的急劇增加暴露了很多問題升敲,例如答倡,Zen的minimum_master_nodes設置經(jīng)常配置錯誤,這會使群集更容易出現(xiàn)裂腦和丟失數(shù)據(jù)的風險
2冻晤、7.x以上版本Coordinator提供了安全的亞秒級的master選舉時間苇羡,而Zen可能要花幾秒鐘來選擇一個新的master
3、es的master掛了鼻弧,數(shù)據(jù)節(jié)點在這區(qū)間還能對外提供服務嗎设江?
參考
Elasticsearch分布式一致性原理剖析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市攘轩,隨后出現(xiàn)的幾起案子叉存,更是在濱河造成了極大的恐慌,老刑警劉巖度帮,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件歼捏,死亡現(xiàn)場離奇詭異,居然都是意外死亡笨篷,警方通過查閱死者的電腦和手機瞳秽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來率翅,“玉大人练俐,你說我怎么就攤上這事∶岢簦” “怎么了腺晾?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長辜贵。 經(jīng)常有香客問我悯蝉,道長,這世上最難降的妖魔是什么托慨? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任鼻由,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嗡靡。我一直安慰自己跺撼,他們只是感情好窟感,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布讨彼。 她就那樣靜靜地躺著,像睡著了一般柿祈。 火紅的嫁衣襯著肌膚如雪哈误。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天躏嚎,我揣著相機與錄音蜜自,去河邊找鬼。 笑死卢佣,一個胖子當著我的面吹牛重荠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播虚茶,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼戈鲁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嘹叫?” 一聲冷哼從身側(cè)響起婆殿,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎罩扇,沒想到半個月后婆芦,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡喂饥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年消约,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片员帮。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡或粮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出集侯,到底是詐尸還是另有隱情被啼,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布棠枉,位于F島的核電站浓体,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏辈讶。R本人自食惡果不足惜命浴,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧生闲,春花似錦媳溺、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至捉兴,卻和暖如春蝎困,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背倍啥。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工禾乘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人虽缕。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓始藕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親氮趋。 傳聞我的和親對象是個殘疾皇子伍派,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容