On Elasticsearch's master election mechanism:
Elasticsearch does not elect its master the way HBase elects its HMaster. HMaster election relies on ZooKeeper. Every candidate tries to register an ephemeral node in ZK, and ZK guarantees that only one of them succeeds; that node becomes the master. The remaining nodes become backups and watch that ZNODE. When it disappears, the backups race to register the ephemeral node again, and the winner becomes the new master.
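For contrast with what follows, here is a minimal in-memory simulation of the ZooKeeper-style race. FakeEphemeralRegistry is NOT the ZooKeeper client API: it only mimics the guarantee that exactly one create of a given path succeeds, using ConcurrentMap.putIfAbsent. The path "/hbase/master" and all class names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ephemeral znodes: at most one owner per path.
final class FakeEphemeralRegistry {
    private final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

    // Returns true only for the caller whose create actually succeeded.
    boolean tryCreate(String path, String nodeId) {
        return znodes.putIfAbsent(path, nodeId) == null;
    }

    // Simulates the ephemeral znode disappearing when its owner's session ends.
    void sessionExpired(String path, String nodeId) {
        znodes.remove(path, nodeId);
    }

    Optional<String> owner(String path) {
        return Optional.ofNullable(znodes.get(path));
    }
}

final class HMasterStyleElection {
    static final String MASTER_PATH = "/hbase/master"; // illustrative path

    // All candidates race concurrently; exactly one create succeeds and that node is master.
    static String electRound(FakeEphemeralRegistry zk, List<String> candidates) {
        AtomicInteger winners = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (String id : candidates) {
            Thread t = new Thread(() -> {
                if (zk.tryCreate(MASTER_PATH, id)) {
                    winners.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        if (winners.get() != 1) {
            throw new IllegalStateException("expected exactly one winner, got " + winners.get());
        }
        return zk.owner(MASTER_PATH).orElseThrow(IllegalStateException::new);
    }
}
```

On failover, the backups simply call electRound again after the old master's znode has expired; the coordination service, not the nodes, decides the winner.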
In Elasticsearch, if a node joins while the existing cluster is already running normally, the new node simply accepts the current master: it connects to that master and joins the cluster the master leads. Things are different when the whole cluster is starting up for the first time; that is when an actual master election takes place. In that election a node may end up electing itself as master, or it may accept another node as master.
The code flow is as follows. The main logic lives in the ZenDiscovery class, starting from its doStart method:
```java
protected void doStart() {
    nodesFD.setLocalNode(clusterService.localNode());
    joinThreadControl.start();
    pingService.start();
    this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
            joinThreadControl.startNewThreadIfNotRunning();
            return currentState;
        }

        @Override
        public void onFailure(String source, @org.elasticsearch.common.Nullable Throwable t) {
            logger.warn("failed to start initial join process", t);
        }
    });
}
```
The "initial_join" task calls joinThreadControl.startNewThreadIfNotRunning(), which hands the actual join to a thread from the generic thread pool and makes sure only one join thread runs at a time:

```java
public void startNewThreadIfNotRunning() {
    assertClusterStateThread();
    if (joinThreadActive()) {
        return;
    }
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            while (running.get() && joinThreadActive(currentThread)) {
                try {
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    logger.error("unexpected error while joining cluster, trying again", e);
                    // Because we catch any exception here, we want to know in
                    // tests if an uncaught exception got to this point and the test infra uncaught exception
                    // leak detection can catch this. In practise no uncaught exception should leak
                    assert ExceptionsHelper.reThrowIfNotNull(e);
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}
```
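The "at most one join thread, retry on error" pattern can be sketched in isolation. JoinGuard and runJoinLoop are hypothetical names, not Elasticsearch classes; as a simplification, this sketch releases the slot in a finally block, whereas the original clears currentJoinThread through explicit calls elsewhere.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the guard used by startNewThreadIfNotRunning.
final class JoinGuard {
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    boolean joinThreadActive(Thread t) {
        return currentJoinThread.get() == t;
    }

    // Claims the single join slot with compareAndSet, then retries the join
    // until it returns normally, the guard is stopped, or the slot is taken away.
    void runJoinLoop(Runnable joinAttempt) {
        Thread current = Thread.currentThread();
        if (!currentJoinThread.compareAndSet(null, current)) {
            return; // another join loop already owns the slot
        }
        try {
            while (running.get() && joinThreadActive(current)) {
                try {
                    joinAttempt.run();
                    return;
                } catch (RuntimeException e) {
                    // like the original: log and try again
                    System.err.println("unexpected error while joining cluster, trying again: " + e.getMessage());
                }
            }
        } finally {
            currentJoinThread.compareAndSet(current, null); // release the slot
        }
    }

    void stop() {
        running.set(false);
    }
}
```

The compareAndSet on a single AtomicReference is what makes a second start a cheap no-op instead of a second concurrent election attempt on the same node.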
The join thread eventually calls innerJoinCluster(). The most important part of innerJoinCluster is this loop:
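```java
// Block until a master node is found. Right after the cluster starts, or after the master has been lost,
// this blocking helps keep the cluster consistent.
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
    masterNode = findMaster(); // find the master; it may or may not be the local node
}
```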
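Stripped of Elasticsearch types, this loop just polls a finder until it returns something or the join thread is deactivated. A minimal generic sketch, where MasterLoop and blockUntilFound are hypothetical names:

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper mirroring the innerJoinCluster loop shown above.
final class MasterLoop {
    static <T> T blockUntilFound(Supplier<T> findMaster, BooleanSupplier joinThreadActive) {
        T master = null;
        while (master == null && joinThreadActive.getAsBoolean()) {
            // In Elasticsearch each findMaster() call blocks for the ping timeout,
            // so this is not a busy spin.
            master = findMaster.get();
        }
        return master; // null only if the join thread was deactivated first
    }
}
```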
The logic of findMaster is as follows:
```java
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    // Ping the other nodes to find out which nodes this node can currently reach
    ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder("full ping responses:");
        if (fullPingResponses.length == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace(sb.toString());
    }

    // filter responses
    // Drop client nodes (including nodes that are neither master-eligible nor data nodes) and,
    // when masterElectionFilterDataNodes is set, data nodes that are not master-eligible
    List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : fullPingResponses) {
        DiscoveryNode node = pingResponse.node();
        if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
            // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
        } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
            // filter out data node that is not also master
        } else {
            pingResponses.add(pingResponse);
        }
    }

    if (logger.isDebugEnabled()) {
        StringBuilder sb = new StringBuilder("filtered ping responses: (filter_client[").append(masterElectionFilterClientNodes).append("], filter_data[").append(masterElectionFilterDataNodes).append("])");
        if (pingResponses.isEmpty()) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : pingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.debug(sb.toString());
    }

    final DiscoveryNode localNode = clusterService.localNode();
    List<DiscoveryNode> pingMasters = new ArrayList<>();
    // Collect the master reported by each ping response, skipping the local node.
    // Normally every entry refers to the same node; if different masters show up, the cluster
    // has a problem, but the tie-break at the end still picks a single one.
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.master() != null) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (!localNode.equals(pingResponse.master())) {
                pingMasters.add(pingResponse.master());
            }
        }
    }

    // nodes discovered during pinging
    Set<DiscoveryNode> activeNodes = Sets.newHashSet();
    // nodes discovered who has previously been part of the cluster and do not ping for the very first time
    Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
    // If the local node is master-eligible (node.master: true), it is a candidate as well
    if (localNode.masterNode()) {
        activeNodes.add(localNode);
        long joinsCounter = clusterJoinsCounter.get();
        if (joinsCounter > 0) {
            logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
            joinedOnceActiveNodes.add(localNode);
        }
    }
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        activeNodes.add(pingResponse.node());
        if (pingResponse.hasJoinedOnce()) {
            joinedOnceActiveNodes.add(pingResponse.node());
        }
    }

    // pingMasters is empty: no other node reports an active master. Either the local node is
    // already the master, or the cluster is starting up and no master has been elected yet.
    if (pingMasters.isEmpty()) {
        // Only elect if enough master-eligible nodes are visible: at least
        // discovery.zen.minimum_master_nodes, typically configured as n/2 + 1
        if (electMaster.hasEnoughMasterNodes(activeNodes)) {
            // we give preference to nodes who have previously already joined the cluster. Those will
            // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
            // by the gateway)
            DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // elect among nodes that joined a cluster before, if any
            if (master != null) {
                return master;
            }
            return electMaster.electMaster(activeNodes); // otherwise elect among all active nodes
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.trace("not enough master nodes [{}]", activeNodes);
            return null;
        }
    } else {
        // pingMasters is not empty: other nodes already report an active master, so accept it
        // instead of electing the local node
        assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.electMaster(pingMasters);
    }
}
```
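The decision logic of findMaster can be modeled without any Elasticsearch dependencies. In this sketch Node, Ping and ElectionSketch are hypothetical stand-ins for DiscoveryNode, ZenPing.PingResponse and the election service; the filter only models the "neither master-eligible nor data" case, and the electMaster rule (pick the master-eligible candidate whose node ID sorts first) is an assumption that matches ElectMasterService in the Elasticsearch versions this code comes from, not necessarily later ones.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for DiscoveryNode; equality is by node id.
final class Node {
    final String id;
    final boolean masterEligible; // node.master: true
    final boolean data;           // node.data: true

    Node(String id, boolean masterEligible, boolean data) {
        this.id = id;
        this.masterEligible = masterEligible;
        this.data = data;
    }

    @Override public boolean equals(Object o) {
        return o instanceof Node && ((Node) o).id.equals(id);
    }

    @Override public int hashCode() { return id.hashCode(); }

    @Override public String toString() { return id; }
}

// Hypothetical stand-in for ZenPing.PingResponse.
final class Ping {
    final Node node;
    final Node master;        // master this node currently knows about, or null
    final boolean joinedOnce; // this node has joined a cluster before

    Ping(Node node, Node master, boolean joinedOnce) {
        this.node = node;
        this.master = master;
        this.joinedOnce = joinedOnce;
    }
}

final class ElectionSketch {
    // Assumed rule: the master-eligible candidate with the smallest id, or null if none.
    static Node electMaster(Collection<Node> candidates) {
        return candidates.stream()
                .filter(n -> n.masterEligible)
                .min(Comparator.comparing((Node n) -> n.id))
                .orElse(null);
    }

    static boolean hasEnoughMasterNodes(Collection<Node> nodes, int minimumMasterNodes) {
        return nodes.stream().filter(n -> n.masterEligible).count() >= minimumMasterNodes;
    }

    static Node findMaster(Node local, boolean localJoinedOnce, List<Ping> pings, int minimumMasterNodes) {
        List<Node> pingMasters = new ArrayList<>();
        Set<Node> activeNodes = new HashSet<>();
        Set<Node> joinedOnceActiveNodes = new HashSet<>();
        if (local.masterEligible) {
            activeNodes.add(local);
            if (localJoinedOnce) {
                joinedOnceActiveNodes.add(local);
            }
        }
        for (Ping p : pings) {
            if (!p.node.masterEligible && !p.node.data) {
                continue; // effectively a client node: ignored for the election
            }
            if (p.master != null && !p.master.equals(local)) {
                pingMasters.add(p.master);
            }
            activeNodes.add(p.node);
            if (p.joinedOnce) {
                joinedOnceActiveNodes.add(p.node);
            }
        }
        if (!pingMasters.isEmpty()) {
            return electMaster(pingMasters); // someone already reports a master: accept it
        }
        if (!hasEnoughMasterNodes(activeNodes, minimumMasterNodes)) {
            return null; // too few master-eligible nodes visible: the caller keeps looping
        }
        Node master = electMaster(joinedOnceActiveNodes); // prefer nodes that joined a cluster before
        return master != null ? master : electMaster(activeNodes);
    }
}
```

Because every node applies the same deterministic rule to the same set of candidates, nodes that see each other arrive at the same answer without exchanging votes; the quorum check is what stops a small isolated group from electing its own master.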
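Below I use a scenario to illustrate how a cluster starts up for the first time; the process of adding a new node to a healthy, running cluster is not described again here.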
Suppose there are three nodes, node1, node2 and node3, and every node is configured as master-eligible (node.master: true). At startup node1 comes up first and runs findMaster(). Since it is the only node running, node1 can only discover itself, which does not meet the quorum configured in the settings file (at least n/2 + 1 master-eligible nodes, i.e. 2 for three nodes). So no master is found, and node1 keeps running the while loop until it finds one.
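The quorum arithmetic for this scenario can be checked with a tiny sketch. It assumes the commonly recommended value discovery.zen.minimum_master_nodes = (master-eligible nodes / 2) + 1 with integer division; Quorum and its methods are hypothetical helpers, not Elasticsearch APIs.

```java
// Hypothetical helper for the quorum check used by findMaster.
final class Quorum {
    // Recommended minimum_master_nodes: a strict majority of master-eligible nodes.
    static int recommendedMinimumMasterNodes(int masterEligibleNodes) {
        return masterEligibleNodes / 2 + 1; // integer division
    }

    // findMaster only elects when it sees at least this many master-eligible nodes.
    static boolean canElect(int visibleMasterEligible, int minimumMasterNodes) {
        return visibleMasterEligible >= minimumMasterNodes;
    }
}
```

For three nodes the quorum is 2: node1 alone (1 visible node) cannot elect, while node1 plus node2 (2 visible nodes) can. Since two disjoint groups cannot both hold a majority, this is also what prevents a split brain.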
Then node2 starts, so node1 and node2 now form a group of two nodes. Suppose node2 is the one that ends up choosing itself as master. Through ping, node2 discovers node1, and the following code runs:
```java
if (localNode.masterNode()) { // the local node is master-eligible
    activeNodes.add(localNode);
    long joinsCounter = clusterJoinsCounter.get();
    if (joinsCounter > 0) {
        logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
        joinedOnceActiveNodes.add(localNode);
    }
}
for (ZenPing.PingResponse pingResponse : pingResponses) {
    activeNodes.add(pingResponse.node());
    if (pingResponse.hasJoinedOnce()) {
        joinedOnceActiveNodes.add(pingResponse.node());
    }
}
```
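So activeNodes contains node1 and node2. joinedOnceActiveNodes only contains nodes that have joined a cluster before (a join counter greater than 0, or hasJoinedOnce() in the ping response). On a completely fresh start neither node has joined a cluster yet, so joinedOnceActiveNodes is empty; it would contain node2 only if node2 had already been part of a cluster, for example when rejoining after losing its master.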
Execution then reaches this point:
```java
if (pingMasters.isEmpty()) { // empty in two cases: the local node is already the master, or the cluster is starting up and no master has been elected yet
    // Only elect if enough master-eligible nodes are visible: at least discovery.zen.minimum_master_nodes, typically n/2 + 1
    if (electMaster.hasEnoughMasterNodes(activeNodes)) {
        // we give preference to nodes who have previously already joined the cluster. Those will
        // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
        // by the gateway)
        DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // elect among nodes that joined a cluster before, if any
        if (master != null) {
            return master;
        }
        return electMaster.electMaster(activeNodes); // otherwise elect among all active nodes
    } else {
        // if we don't have enough master nodes, we bail, because there are not enough master to elect from
        logger.trace("not enough master nodes [{}]", activeNodes);
        return null;
    }
}
```
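node2 first calls electMaster(joinedOnceActiveNodes), because nodes that have joined a cluster before are preferred. On a fresh start this set is empty, so the call returns null and execution falls through to return electMaster(activeNodes), i.e. an election among node1 and node2. In this version, electMaster picks the master-eligible candidate whose node ID sorts first, so node2 returns itself as master only if its ID sorts before node1's, which is what this walkthrough assumes.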
Meanwhile node1's loop runs findMaster() again. This time it sees node2 and finds that the master is node2 rather than itself, so it accepts node2 as master.
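Finally, the third node, node3, starts. It also runs findMaster() in the while loop and finds that the cluster already has a healthy master (its ping responses report node2 as master, so pingMasters is not empty). node3 therefore accepts that master, contacts it, and joins the cluster.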