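The Discovery module is responsible for discovering the nodes in a cluster and for electing the master node. ES supports several Discovery types. Two are built in, Zen Discovery and Coordinator; others include implementations for public cloud platforms such as Amazon EC2 and Google GCE.
Zen Discovery and Coordinator
- 1. Coordinator
ES 7.x introduced a rewritten cluster coordination layer, Coordinator. It is essentially an implementation of Raft, but it does not follow the Raft paper strictly and makes some adjustments.
The discovery type is selected in the configuration file through the discovery.type setting; the corresponding code is: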
org.elasticsearch.discovery.DiscoveryModule.DiscoveryModule(....)
public DiscoveryModule(...) {
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
}
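- 2. Zen Discovery
Zen Discovery uses the Bully algorithm.
The Bully algorithm assumes that every node has a unique ID and orders the nodes by that ID; at any moment the current leader is the participating node with the highest ID. Its advantage is that it is easy to implement. However, it runs into trouble when the node with the highest ID is unstable. For example, the master becomes unresponsive under heavy load, so the node with the second-highest ID is elected as the new master; the original master then recovers and is elected again, then becomes unresponsive again, and so on.
ES solves this problem by deferring elections until the current master actually fails: as long as the current master is alive, no new master is elected. On its own this makes split brain (two masters) easy to produce, so ES additionally requires a quorum, i.e. more than half of the votes, to prevent split brain.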
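A minimal sketch of the idea, assuming integer node IDs and a known number of master-eligible nodes; the class and method names are hypothetical and this is not Elasticsearch code. It applies the classic Bully rule (highest ID wins) plus the two adjustments described above: keep a master that is still alive, and refuse to elect anyone without a majority.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: Bully election + "defer while the master is alive" + quorum check.
final class BullyElection {

    /** Majority of the master-eligible nodes: more than half. */
    static int quorum(int masterEligibleNodes) {
        return masterEligibleNodes / 2 + 1;
    }

    /**
     * @param currentMaster the master this node still sees, if any
     * @param visibleIds    master-eligible node IDs this node can reach, including itself
     * @param clusterSize   total number of master-eligible nodes
     * @return the master to follow, or empty if this partition has no majority
     */
    static Optional<Integer> elect(Optional<Integer> currentMaster,
                                   Collection<Integer> visibleIds, int clusterSize) {
        // Deferral: as long as the current master is alive and reachable, do not re-elect.
        if (currentMaster.isPresent() && visibleIds.contains(currentMaster.get())) {
            return currentMaster;
        }
        // Quorum: a minority partition must not elect its own master (split brain).
        if (visibleIds.size() < quorum(clusterSize)) {
            return Optional.empty();
        }
        // Classic Bully rule: the highest ID wins.
        return visibleIds.stream().max(Integer::compare);
    }

    public static void main(String[] args) {
        // 5 master-eligible nodes, network split into {1, 2} and {3, 4, 5}.
        System.out.println(elect(Optional.empty(), List.of(1, 2), 5));        // Optional.empty
        System.out.println(elect(Optional.empty(), List.of(3, 4, 5), 5));     // Optional[5]
        // Node 3 is still master and reachable, so node 5 does not take over.
        System.out.println(elect(Optional.of(3), List.of(1, 2, 3, 4, 5), 5)); // Optional[3]
    }
}
```

The deferral check runs before the ID comparison on purpose: it is what stops a recovered high-ID node from repeatedly taking mastership back.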
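- 3. Algorithm comparison
1. Raft vs. the Bully algorithm
Similarities
1. Majority rule: a node must obtain more than half of the votes to become master.
2. The elected leader is guaranteed to hold the latest committed data: in Raft, a node with newer data does not vote for a node with older data, and since winning requires a majority of votes, the winner must hold the latest committed data. In ES, nodes with a higher cluster state version are ranked first, which serves the same purpose.
Differences
1. Proof of correctness: Raft's correctness has been formally argued, while the ES algorithm has not been proven; problems can only be found in practice and fixed as bugs. In my view this is the biggest difference.
2. Election term: Raft introduces the concept of a term. Each election round increments the term, which guarantees that every participant casts at most one vote per term. ES elections have no notion of a term, so nothing guarantees that each node votes only once per round.
3. Election bias: in Raft, any node that holds the latest committed data has a chance to become master. In ES, when versions are equal, nodes are ordered by NodeId, and the node with the smaller NodeId always has higher priority.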
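For contrast, a minimal sketch of Raft's vote-granting rule as described in the Raft paper. The two Raft properties discussed above, one vote per term and votes only for candidates whose log is at least as up to date, are exactly the two checks in requestVote. Class and method names are hypothetical, and log replication, timers and persistence are omitted.

```java
// Sketch of the receiver side of Raft's RequestVote, for contrast with Zen.
final class RaftVoter {
    private long currentTerm = 0;
    private Integer votedFor = null;   // candidate voted for in currentTerm, or null
    private final long lastLogTerm;    // term of this node's last log entry
    private final long lastLogIndex;   // index of this node's last log entry

    RaftVoter(long lastLogTerm, long lastLogIndex) {
        this.lastLogTerm = lastLogTerm;
        this.lastLogIndex = lastLogIndex;
    }

    synchronized boolean requestVote(long term, int candidateId,
                                     long candLastLogTerm, long candLastLogIndex) {
        if (term < currentTerm) {
            return false;              // candidate belongs to an old term
        }
        if (term > currentTerm) {
            currentTerm = term;        // newer term: the previous vote no longer applies
            votedFor = null;
        }
        // "At least as up to date": compare the last log term, then the last log index.
        boolean logOk = candLastLogTerm > lastLogTerm
                || (candLastLogTerm == lastLogTerm && candLastLogIndex >= lastLogIndex);
        if (logOk && (votedFor == null || votedFor == candidateId)) {
            votedFor = candidateId;    // at most one candidate per term
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RaftVoter voter = new RaftVoter(2, 10);
        System.out.println(voter.requestVote(3, 1, 2, 10)); // true
        System.out.println(voter.requestVote(3, 2, 2, 12)); // false: already voted in term 3
        System.out.println(voter.requestVote(4, 3, 1, 99)); // false: older last log term
    }
}
```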
2、Paxos算法
Paxos非常強大蜕青,尤其在什么時機苟蹈,以及如何進行選舉方面的靈活性比簡單的Bully算法有很大的優(yōu)勢,因為在現(xiàn)實生活中右核,存在比網(wǎng)絡連接異常更多的故障模式。但 Paxos 實現(xiàn)起來非常復雜
本篇只討論內(nèi)置的Zen Discovery
流程分析
整體流程可以概括為:選舉臨時Master渺绒,如果本節(jié)點當選贺喝,則等待確立Master菱鸥,如果其他節(jié)點當選,則嘗試加入集群躏鱼,然后啟動節(jié)點失效探測器氮采。
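The flow can be sketched as a loop over election rounds. This is a simplified model, not the ZenDiscovery code: the Cluster interface and its methods are hypothetical stand-ins for findMaster, waiting for joins, joinElectedMaster and starting fault detection, and maxRounds exists only so the sketch terminates.

```java
// Simplified, hypothetical model of the Zen join flow; not the ZenDiscovery code.
final class ZenJoinLoop {

    interface Cluster {
        String localNodeId();
        String findMaster();                 // temporary master ID, or null without a quorum
        boolean waitForJoins(int required);  // true if elected before the timeout
        boolean joinMaster(String masterId); // true once the join is acked and state received
        void startFaultDetection(boolean localIsMaster);
    }

    /** Runs election rounds until a master is established; returns its ID, or null after maxRounds. */
    static String joinCluster(Cluster cluster, int minimumMasterNodes, int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            String master = cluster.findMaster();
            if (master == null) {
                continue;                    // no temporary master yet: ping again
            }
            boolean established;
            if (master.equals(cluster.localNodeId())) {
                // We count as one vote, so wait for minimumMasterNodes - 1 joins.
                established = cluster.waitForJoins(Math.max(0, minimumMasterNodes - 1));
            } else {
                established = cluster.joinMaster(master);
            }
            if (established) {
                cluster.startFaultDetection(master.equals(cluster.localNodeId()));
                return master;
            }
            // Otherwise start a new round, like markThreadAsDoneAndStartNew.
        }
        return null;
    }
}
```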
1、節(jié)點啟動
如果集群剛啟動則參與選主染苛,否則加入集群
org.elasticsearch.node.Node.start()
public Node start() throws NodeValidationException {
// ... (other startup steps elided)
discovery.start();
discovery.startInitialJoin();
// ...
return this;
}
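2. Electing the temporary master
The election is implemented in org.elasticsearch.discovery.zen.ZenDiscovery.findMaster(). This method looks for the currently active master of the cluster, or elects a new master from the candidates. It returns the chosen master if the election succeeds, or null otherwise.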
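private DiscoveryNode findMaster() {
logger.trace("starting to ping");
List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
final DiscoveryNode localNode = clusterService.localNode();
// (abridged) the local node's own PingResponse is added to fullPingResponses here, so it is a candidate too
final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);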
List<DiscoveryNode> activeMasters = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
activeMasters.add(pingResponse.master());
}
}
// nodes discovered during pinging
List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.node().isMasterNode()) {
masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
}
}
if (activeMasters.isEmpty()) {
// check whether the number of candidates reaches the quorum
if (electMaster.hasEnoughCandidates(masterCandidates)) {
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
logger.trace("candidate {} won election", winner);
return winner.getNode();
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", masterCandidates);
return null;
}
} else {
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// lets tie break between discovered nodes
return electMaster.tieBreakActiveMasters(activeMasters);
}
}
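Choosing the temporary master is straightforward.
First, check whether the number of candidates reaches the quorum (minimum_master_nodes); if it does not, the election fails.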
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
if (candidates.isEmpty()) {
return false;
}
if (minimumMasterNodes < 1) {
return true;
}
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
"duplicates ahead: " + candidates;
return candidates.size() >= minimumMasterNodes;
}
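The quorum check is simple arithmetic. A short sketch, assuming the commonly recommended value minimum_master_nodes = (master-eligible nodes / 2) + 1: it tries every way of splitting n nodes into two disjoint groups and shows that with this value at most one side can reach the quorum, while a smaller value (a typical misconfiguration) allows split brain. The Quorum class is hypothetical. Note also that hasEnoughCandidates above treats minimumMasterNodes < 1 (not configured) as always sufficient.

```java
// Hypothetical helper illustrating why minimum_master_nodes = n / 2 + 1 prevents split brain.
final class Quorum {

    /** Recommended minimum_master_nodes for n master-eligible nodes. */
    static int quorum(int n) {
        return n / 2 + 1;
    }

    /** True if n nodes can be split into two disjoint sides that BOTH reach `required`. */
    static boolean splitBrainPossible(int n, int required) {
        for (int left = 0; left <= n; left++) {
            int right = n - left;
            if (left >= required && right >= required) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("n=%d quorum=%d splitBrainPossible=%b%n",
                    n, quorum(n), splitBrainPossible(n, quorum(n)));
        }
        // Misconfiguration: minimum_master_nodes = 1 on a 3-node cluster.
        System.out.println(splitBrainPossible(3, 1)); // true
    }
}
```

Two sides each reaching n / 2 + 1 would need at least n + 1 nodes in total, which is why the check never succeeds with the recommended value.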
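Next, the candidates are sorted and the first (smallest) element is taken. The comparator first orders by cluster state version, with higher versions first, and only falls back to compareNodes, which orders by node ID, when the versions are equal.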
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
assert hasEnoughCandidates(candidates);
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
sortedCandidates.sort(MasterCandidate::compare);
return sortedCandidates.get(0);
}
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
// list, so if c2 has a higher cluster state version, it needs to come first.
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
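Putting the pieces of this step together, here is a simplified, self-contained model of findMaster(), assuming each ping response carries the responder's ID, whether it is master-eligible, its cluster state version and the master it currently follows. The Ping type and all names are hypothetical. Two simplifications: the tie-break among active masters just picks the smallest ID, and the master-eligible-first rule inside compareNodes is replaced by filtering the candidates up front.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Simplified, hypothetical model of findMaster() and MasterCandidate.compare; not ES code.
final class FindMasterModel {

    static final class Ping {
        final String nodeId;          // responding node
        final boolean masterEligible; // can this node become master?
        final long stateVersion;      // cluster state version it reported
        final String reportedMaster;  // master it currently follows, or null

        Ping(String nodeId, boolean masterEligible, long stateVersion, String reportedMaster) {
            this.nodeId = nodeId;
            this.masterEligible = masterEligible;
            this.stateVersion = stateVersion;
            this.reportedMaster = reportedMaster;
        }
    }

    // Higher cluster state version first; on a tie, the smaller node ID first.
    static final Comparator<Ping> CANDIDATE_ORDER =
            Comparator.comparingLong((Ping p) -> p.stateVersion).reversed()
                    .thenComparing((Ping p) -> p.nodeId);

    /** responses must include the local node's own ping; returns the chosen master ID or null. */
    static String findMaster(String localId, List<Ping> responses, int minimumMasterNodes) {
        List<String> activeMasters = new ArrayList<>();
        List<Ping> candidates = new ArrayList<>();
        for (Ping p : responses) {
            // A master reported as the local node is ignored, as in the real code.
            if (p.reportedMaster != null && !p.reportedMaster.equals(localId)) {
                activeMasters.add(p.reportedMaster);
            }
            if (p.masterEligible) {
                candidates.add(p);
            }
        }
        if (!activeMasters.isEmpty()) {
            // An existing master was found: follow it (tie-break: smallest ID).
            return Collections.min(activeMasters);
        }
        boolean enough = !candidates.isEmpty()
                && (minimumMasterNodes < 1 || candidates.size() >= minimumMasterNodes);
        if (!enough) {
            return null; // not enough master-eligible nodes: the election fails
        }
        candidates.sort(CANDIDATE_ORDER);
        return candidates.get(0).nodeId;
    }
}
```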
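3. Establishing the master or joining the cluster
The elected temporary master is either the local node or some other node.
- 1. If the temporary master is the local node:
(1) Wait until enough master-eligible nodes have joined the local node (the votes reach the quorum) to complete the election.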
if (clusterService.localNode().equals(masterNode)) {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
nodesFD.updateNodesAndPing(state); // start the nodes FD
}
@Override
public void onFailure(Throwable t) {
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
);
}
private synchronized void checkPendingJoinsAndElectIfNeeded() {
assert electionContext != null : "election check requested but no active context";
final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
// are there enough votes (pending joins from master-eligible nodes)?
if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
// not enough joins yet: keep waiting (logging elided)
} else {
electionContext.closeAndBecomeMaster();
electionContext = null; // clear this out so future joins won't be accumulated
}
}
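The vote counting can be sketched as follows, assuming the local node already counts as one vote (hence the minimumMasterNodes - 1 in the code above) and that only joins from master-eligible nodes count. The class is hypothetical and only mirrors the idea of checkPendingJoinsAndElectIfNeeded; publishing the new cluster state is left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of counting join "votes" while waiting to be elected.
final class ElectionContextSketch {
    private final int requiredMasterJoins;
    private final Set<String> masterJoins = new HashSet<>();
    private boolean elected = false;

    ElectionContextSketch(int minimumMasterNodes) {
        // The local node counts as one vote, so it needs minimumMasterNodes - 1 joins.
        this.requiredMasterJoins = Math.max(0, minimumMasterNodes - 1);
    }

    /** Records a join request; returns true once enough votes have arrived. */
    synchronized boolean onJoin(String nodeId, boolean masterEligible) {
        if (!elected) {
            if (masterEligible) {
                masterJoins.add(nodeId); // a repeated join from the same node counts once
            }
            elected = masterJoins.size() >= requiredMasterJoins;
        }
        return elected;
    }

    public static void main(String[] args) {
        ElectionContextSketch ctx = new ElectionContextSketch(3); // needs 2 other votes
        System.out.println(ctx.onJoin("node-b", true));  // false
        System.out.println(ctx.onJoin("node-d", false)); // false: not master-eligible
        System.out.println(ctx.onJoin("node-c", true));  // true
    }
}
```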
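(2) If the required number of join requests has not arrived when the timeout expires (30 seconds by default, configurable), the election fails and a new election round is needed.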
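public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
final CountDownLatch done = new CountDownLatch(1);
// (abridged) an election context is created here; its callback counts `done` down
// once this node has been elected or the election has failed
try {
if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
// callback handles everything
return;
}
} catch (InterruptedException e) {
// interruption is treated like a timeout
}
// (abridged) timed out: the election context is stopped and the election is treated as failed
}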
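The timeout relies on the standard CountDownLatch pattern: the waiting thread blocks in await(timeout, unit), and whoever completes the election counts the latch down. A minimal sketch with hypothetical names (ElectionWait, markElected); unlike the excerpt above, it restores the thread's interrupt flag instead of swallowing the interruption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "wait to be elected, but give up after a timeout".
final class ElectionWait {
    private final CountDownLatch done = new CountDownLatch(1);

    /** Called by the thread that sees enough joins arrive. */
    void markElected() {
        done.countDown();
    }

    /** Returns true if elected within the timeout, false on timeout (a new round should start). */
    boolean awaitElection(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ElectionWait timedOut = new ElectionWait();
        System.out.println(timedOut.awaitElection(50)); // false: nobody joined

        ElectionWait elected = new ElectionWait();
        Thread joiner = new Thread(elected::markElected);
        joiner.start();
        System.out.println(elected.awaitElection(5_000)); // true
        joiner.join();
    }
}
```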
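After the timeout the method simply stops waiting and the election counts as failed. Likewise, when a node that is not the temporary master fails to join the cluster, the election process is started again: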
org.elasticsearch.discovery.zen.ZenDiscovery.innerJoinCluster()
// send join request
final boolean success = joinElectedMaster(masterNode);
// finalize join through the cluster state update thread
final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (!success) {
// failed to join. Try again...
// restart the election process
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return currentState;
}
// ... (the remaining checks of execute() are elided here)
(3) On success, publish the new clusterState.
The implementation is as follows:
public synchronized void closeAndBecomeMaster() {
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}
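submitStateUpdateTasks ultimately submits the tasks through TaskBatcher#submitTasks. The overall process of executing the tasks and publishing the cluster state is implemented in MasterService#runTasks.
protected void runTasks(TaskInputs taskInputs) {
// ... (abridged) execute the batched tasks and compute the new cluster state
publish(clusterChangedEvent, taskOutputs, startTimeNS);
}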
- 2. If another node is elected master:
(1) Stop accepting join requests from other nodes.
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext(masterNode + " elected");
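(2) Send a join request to the master and wait for the reply. The request times out after 1 minute by default (configurable). If the target replies that it is not the master yet (NotMasterException), the join is retried, up to 3 attempts by default (configurable); any other error fails the join immediately. This step is implemented in joinElectedMaster.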
private boolean joinElectedMaster(DiscoveryNode masterNode) {
// (abridged) connecting to the master before the first attempt is elided
int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
while (true) {
try {
membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
return true;
} catch (Exception e) {
final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
if (unwrap instanceof NotMasterException) {
if (++joinAttempt == this.joinRetryAttempts) {
logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
return false;
} else {
logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
}
} else {
// any other failure: give up without retrying (logging elided)
return false;
}
}
// wait before the next attempt
try {
Thread.sleep(this.joinRetryDelay.millis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
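The retry policy of joinElectedMaster, extracted into a hypothetical helper: only a "not master yet" failure is retried, up to maxAttempts attempts with a fixed delay in between, while any other exception fails at once. NotMasterYet stands in for NotMasterException, and the join request itself is passed in as a Callable.

```java
import java.util.concurrent.Callable;

// Hypothetical helper mirroring the retry loop of joinElectedMaster.
final class JoinRetry {

    /** Stand-in for NotMasterException: the target has not become master yet. */
    static final class NotMasterYet extends RuntimeException {
        NotMasterYet(String message) {
            super(message);
        }
    }

    /** Returns true once sendJoin succeeds, false when attempts run out or a non-retryable error occurs. */
    static boolean joinWithRetry(Callable<Void> sendJoin, int maxAttempts, long delayMillis) {
        int attempt = 0;
        while (true) {
            try {
                sendJoin.call();
                return true;                        // join acknowledged
            } catch (NotMasterYet e) {
                if (++attempt >= maxAttempts) {
                    return false;                   // retried enough: give up
                }
            } catch (Exception e) {
                return false;                       // any other failure: do not retry
            }
            try {
                Thread.sleep(delayMillis);          // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the flag, as the excerpt does
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean joined = joinWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new NotMasterYet("master not elected yet");
            }
            return null;
        }, 3, 10);
        System.out.println(joined + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```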
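The finally elected master publishes the cluster state before acknowledging the join request. Therefore, a successful return from joinElectedMaster means the join request has been acknowledged and the cluster state has already been received; if it does not succeed, the election process is started again.
(3) If the master node in the received cluster state is null, or it is not the node selected earlier, start a new election.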
if (currentState.getNodes().getMasterNode() == null) {
// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
// a valid master.
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return currentState;
}
if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
}
// Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
// when the first cluster state arrives.
joinThreadControl.markThreadAsDone(currentThread);
return currentState;
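The checks in finalize_join form a small decision table. A hypothetical sketch (the Outcome names are made up) that returns what the node does next:

```java
// Hypothetical decision table for the finalize_join checks shown above.
final class FinalizeJoin {

    enum Outcome {
        DONE,        // joined the expected master
        RETRY_PINGS, // markThreadAsDoneAndStartNew: run a new election round
        REJOIN       // stopRunningThreadAndRejoin: the master changed meanwhile
    }

    static Outcome check(boolean joinSucceeded, String masterInState, String expectedMaster) {
        if (!joinSucceeded) {
            return Outcome.RETRY_PINGS; // the join request failed
        }
        if (masterInState == null) {
            return Outcome.RETRY_PINGS; // join acked but the state has no master
        }
        if (!masterInState.equals(expectedMaster)) {
            return Outcome.REJOIN;      // a different master won
        }
        return Outcome.DONE;
    }

    public static void main(String[] args) {
        System.out.println(check(true, "n1", "n1")); // DONE
        System.out.println(check(true, "n2", "n1")); // REJOIN
        System.out.println(check(true, null, "n1")); // RETRY_PINGS
    }
}
```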
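Summary
1. ES relies on its master-slave model and the discovery mechanism to coordinate the nodes and balance load among them, but the rapid growth in ES usage exposed many problems. For example, Zen's minimum_master_nodes setting was often misconfigured, which exposed clusters to split brain and data loss.
2. In 7.x and later, Coordinator provides safe, sub-second master elections, whereas Zen could take several seconds to elect a new master.
3. When the ES master goes down, can the data nodes keep serving requests in the meantime?
References
Elasticsearch分布式一致性原理剖析 (An analysis of Elasticsearch's distributed consistency principles)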