請注意戚炫,此篇文章并不是介紹Zookeeper集群內部Leader的選舉機制猜旬,而是應用程序使用Zookeeper作為選舉桩砰。
使用Zookeeper進行選舉阶捆,主要用到了Znode的兩個性質:
- 臨時節(jié)點(EPHEMERAL)
- 序列化節(jié)點(SEQUENCE)
每一個臨時的序列化節(jié)點代表著一個客戶端(client)凌节,也就是選民钦听。主要的設計思路如下:
首先,創(chuàng)建一個選舉的節(jié)點倍奢,我們叫做/election朴上。
然后,每有一個客戶端加入卒煞,就創(chuàng)建一個子節(jié)點/election/n_xxx痪宰,這個節(jié)點是EPHEMERAL并且SEQUENCE,xxx就是序列化產生的單調遞增的數字畔裕。
在所有子節(jié)點中衣撬,序列數字做小的被選舉成Leader。
上面的并不是重點扮饶,重點是Leader失敗的檢測具练,Leader失敗后,一個新的客戶端(client)將被選舉成Leader甜无。實現這個過程的一個最簡單的方式是
所有的客戶端(client)都監(jiān)聽Leader節(jié)點扛点,一旦Leader節(jié)點消失,將通知所有的客戶端(client)執(zhí)行Leader選舉過程岂丘,序列數字最小的將被選舉成Leader陵究。
這樣實現看似沒有問題,但是當客戶端(client)數量非常龐大時元潘,所有客戶端(client)都將在/election節(jié)點執(zhí)行getChildren()畔乙,這對Zookeeper
的壓力是非常大的。為了避免這種“驚群效應”翩概,我們可以讓客戶端只監(jiān)聽它前一個節(jié)點(所有序列數字比當前節(jié)點小牲距,并且是其中最大的那個節(jié)點)。
這樣钥庇,Leader節(jié)點消失后牍鞠,哪個節(jié)點收到了通知,哪個節(jié)點就變成Leader评姨,因為所有節(jié)點中难述,沒有比它序列更小的節(jié)點了。
具體步驟如下:
- 使用EPHEMERAL和SEQUENCE創(chuàng)建節(jié)點/election/n_xxx吐句,我們叫做z胁后。
- C為/election的子節(jié)點集合,i是z的序列數字嗦枢。
- 監(jiān)聽/election/n_j攀芯,j是C中小于i的最大數字。
接收到節(jié)點消失的事件后:
- C為新的/election的子節(jié)點集合
- 如果z是集合中最小的節(jié)點文虏,則z被選舉成Leader
- 如果z不是最小節(jié)點侣诺,則繼續(xù)監(jiān)聽/election/n_j殖演,j是C中小于i的最大數字。
具體代碼如下:
public class Candidate implements Runnable, Watcher {
//zk
private ZooKeeper zk;
//臨時節(jié)點前綴
private String perfix = "n_";
//當前節(jié)點
private String currentNode;
//前一個最大節(jié)點
private String lastNode;
/**
* 構造函數
* @param address zk地址
*/
public Candidate(String address) {
try {
this.zk = new ZooKeeper(address, 3000, this);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 加入選舉
*/
@Override
public void run() {
try {
//創(chuàng)建臨時節(jié)點
currentNode = zk.create("/zookeeper/election/" + perfix, Thread.currentThread().getName().getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//選舉
election();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 從小到大排序臨時節(jié)點
* @param children
* @return
*/
private List<String> getSortedNode(List<String> children) {
return children.stream().sorted(((o1, o2) -> {
String sequence1 = o1.split(perfix)[1];
String sequence2 = o2.split(perfix)[1];
BigDecimal decimal1 = new BigDecimal(sequence1);
BigDecimal decimal2 = new BigDecimal(sequence2);
int result = decimal1.compareTo(decimal2);
return result;
})).collect(toList());
}
/**
* 選舉過程
*/
private void election(){
try{
while (true){
//獲取/election節(jié)點中的所有子節(jié)點
List<String> children = zk.getChildren("/zookeeper/election", false);
//所有子節(jié)點排序(從小到大)
List<String> sortedNodes = getSortedNode(children);
//獲取最小節(jié)點
String smallestNode = sortedNodes.get(0);
//當前節(jié)點就是最小節(jié)點年鸳,被選舉成Leader
if (currentNode.equals("/zookeeper/election/"+smallestNode)) {
System.out.println(currentNode + "被選舉成Leader趴久。");
Thread.sleep(5000);
//模擬Leader節(jié)點死去
System.out.println(currentNode+"已離去");
zk.close();
break;
}
//當前節(jié)點不是最小節(jié)點,監(jiān)聽前一個最大節(jié)點
else {
//前一個最大節(jié)點
lastNode = smallestNode;
//找到前一個最大節(jié)點搔确,并監(jiān)聽
for (int i = 1; i < sortedNodes.size(); i++) {
String z = sortedNodes.get(i);
//找到前一個最大節(jié)點彼棍,并監(jiān)聽
if (currentNode.equals("/zookeeper/election/"+z)) {
zk.exists("/zookeeper/election/" + lastNode, true);
System.out.println(currentNode+"監(jiān)聽"+lastNode);
//等待被喚起執(zhí)行Leader選舉
synchronized (this){
wait();
}
break;
}
lastNode = z;
}
}
}
}catch (Exception e) {
e.printStackTrace();
}
}
/**
* 觀察器通知
* @param event
*/
@Override
public void process(WatchedEvent event) {
//監(jiān)聽節(jié)點刪除事件
if (event.getType().equals(Event.EventType.NodeDeleted)) {
//被刪除的節(jié)點是前一個最大節(jié)點,喚起線程執(zhí)行選舉
if (event.getPath().equals("/zookeeper/election/" + lastNode)) {
System.out.println(currentNode+"被喚起");
synchronized (this){
notify();
}
}
}
}
}
我們將啟動5個線程作為參選者妥箕,模擬每一個Leader死去滥酥,并重新選舉的過程。啟動程序如下:
public class Application {
private static final String ADDRESS = "149.28.37.147:2181";
public static void main(String[] args) throws InterruptedException {
setLog();
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i=0;i<5;i++){
es.execute(new Candidate(ADDRESS));
}
es.shutdown();
}
/**
* 設置log級別為Error
*/
public static void setLog(){
//1.logback
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
//獲取應用中的所有l(wèi)ogger實例
List<Logger> loggerList = loggerContext.getLoggerList();
//遍歷更改每個logger實例的級別,可以通過http請求傳遞參數進行動態(tài)配置
for (ch.qos.logback.classic.Logger logger:loggerList){
logger.setLevel(Level.toLevel("ERROR"));
}
}
}
運行結果如下:
/zookeeper/election/n_0000000133被選舉成Leader畦幢。
/zookeeper/election/n_0000000134監(jiān)聽n_0000000133
/zookeeper/election/n_0000000137監(jiān)聽n_0000000136
/zookeeper/election/n_0000000135監(jiān)聽n_0000000134
/zookeeper/election/n_0000000136監(jiān)聽n_0000000135
/zookeeper/election/n_0000000133已離去
/zookeeper/election/n_0000000134被喚起
/zookeeper/election/n_0000000134被選舉成Leader坎吻。
/zookeeper/election/n_0000000134已離去
/zookeeper/election/n_0000000135被喚起
/zookeeper/election/n_0000000135被選舉成Leader。
/zookeeper/election/n_0000000135已離去
/zookeeper/election/n_0000000136被喚起
/zookeeper/election/n_0000000136被選舉成Leader宇葱。
/zookeeper/election/n_0000000136已離去
/zookeeper/election/n_0000000137被喚起
/zookeeper/election/n_0000000137被選舉成Leader瘦真。
/zookeeper/election/n_0000000137已離去
Zookeeper作為選舉的應用就介紹完了,項目示例請參考:https://github.com/liubo-tech/zookeeper-application黍瞧。