轉(zhuǎn):http://throwable.coding.me/2018/12/16/zookeeper-curator-usage
Leader選舉
在分布式計算中赴叹, leader elections是很重要的一個功能躲舌, 這個選舉過程是這樣子的: 指派一個進(jìn)程作為組織者宦搬,將任務(wù)分發(fā)給各節(jié)點。 在任務(wù)開始前刃鳄, 哪個節(jié)點都不知道誰是leader(領(lǐng)導(dǎo)者)或者coordinator(協(xié)調(diào)者). 當(dāng)選舉算法開始執(zhí)行后, 每個節(jié)點最終會得到一個唯一的節(jié)點作為任務(wù)leader. 除此之外, 選舉還經(jīng)常會發(fā)生在leader意外宕機(jī)的情況下罢荡,新的leader要被選舉出來。
在zookeeper集群中对扶,leader負(fù)責(zé)寫操作区赵,然后通過Zab協(xié)議實現(xiàn)follower的同步,leader或者follower都可以處理讀操作浪南。
Curator 有兩種leader選舉的recipe,分別是LeaderSelector和LeaderLatch笼才。
前者是所有存活的客戶端不間斷的輪流做Leader,大同社會络凿。后者是一旦選舉出Leader骡送,除非有客戶端掛掉重新觸發(fā)選舉,否則不會交出領(lǐng)導(dǎo)權(quán)絮记。某黨?
LeaderLatch
LeaderLatch有兩個構(gòu)造函數(shù):
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
LeaderLatch的啟動:
leaderLatch.start( );
一旦啟動摔踱,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然后其中一個最終會被選舉為leader怨愤,可以通過hasLeadership
方法查看LeaderLatch實例是否leader:
leaderLatch.hasLeadership( ); //返回true說明當(dāng)前實例是leader
類似JDK的CountDownLatch派敷, LeaderLatch在請求成為leadership會block(阻塞),一旦不使用LeaderLatch了撰洗,必須調(diào)用close
方法篮愉。 如果它是leader,會釋放leadership, 其它的參與者將會選舉一個leader差导。
public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException
異常處理: LeaderLatch實例可以增加ConnectionStateListener來監(jiān)聽網(wǎng)絡(luò)連接問題试躏。 當(dāng) SUSPENDED 或 LOST 時, leader不再認(rèn)為自己還是leader。當(dāng)LOST后連接重連后RECONNECTED,LeaderLatch會刪除先前的ZNode然后重新創(chuàng)建一個柿汛。LeaderLatch用戶必須考慮導(dǎo)致leadership丟失的連接問題冗酿。 強(qiáng)烈推薦你使用ConnectionStateListener。
一個LeaderLatch的使用例子:
public class LeaderLatchDemo extends BaseConnectionInfo {
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> examples = Lists.newArrayList();
TestingServer server=new TestingServer();
try {
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}
});
examples.add(latch);
client.start();
latch.start();
}
Thread.sleep(10000);
LeaderLatch currentLeader = null;
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
currentLeader.close();
Thread.sleep(5000);
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
} finally {
for (LeaderLatch latch : examples) {
if (null != latch.getState())
CloseableUtils.closeQuietly(latch);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
}
可以添加test module的依賴方便進(jìn)行測試络断,不需要啟動真實的zookeeper服務(wù)端:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
</dependency>
首先我們創(chuàng)建了10個LeaderLatch裁替,啟動后它們中的一個會被選舉為leader。 因為選舉會花費(fèi)一些時間貌笨,start后并不能馬上就得到leader弱判。
通過hasLeadership
查看自己是否是leader, 如果是的話返回true锥惋。
可以通過.getLeader().getId()
可以得到當(dāng)前的leader的ID昌腰。
只能通過close
釋放當(dāng)前的領(lǐng)導(dǎo)權(quán)开伏。
await
是一個阻塞方法, 嘗試獲取leader地位遭商,但是未必能上位固灵。
LeaderSelector
LeaderSelector使用的時候主要涉及下面幾個類:
- LeaderSelector
- LeaderSelectorListener
- LeaderSelectorListenerAdapter
- CancelLeadershipException
核心類是LeaderSelector,它的構(gòu)造函數(shù)如下:
public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
類似LeaderLatch,LeaderSelector必須start
: leaderSelector.start();
一旦啟動劫流,當(dāng)實例取得領(lǐng)導(dǎo)權(quán)時你的listener的takeLeadership()
方法被調(diào)用巫玻。而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時才返回。 當(dāng)你不再使用LeaderSelector實例時祠汇,應(yīng)該調(diào)用它的close方法仍秤。
異常處理 LeaderSelectorListener類繼承ConnectionStateListener。LeaderSelector必須小心連接狀態(tài)的改變可很。如果實例成為leader, 它應(yīng)該響應(yīng)SUSPENDED 或 LOST诗力。 當(dāng) SUSPENDED 狀態(tài)出現(xiàn)時, 實例必須假定在重新連接成功之前它可能不再是leader了我抠。 如果LOST狀態(tài)出現(xiàn)苇本, 實例不再是leader, takeLeadership方法返回屿良。
重要: 推薦處理方式是當(dāng)收到SUSPENDED 或 LOST時拋出CancelLeadershipException異常.圈澈。這會導(dǎo)致LeaderSelector實例中斷并取消執(zhí)行takeLeadership方法的異常.。這非常重要尘惧, 你必須考慮擴(kuò)展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推薦的處理邏輯康栈。
下面的一個例子摘抄自官方:
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = (int) (5 * Math.random()) + 1;
System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
System.err.println(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " relinquishing leadership.\n");
}
}
}
你可以在takeLeadership進(jìn)行任務(wù)的分配等等,并且不要返回喷橙,如果你想要要此實例一直是leader的話可以加一個死循環(huán)啥么。調(diào)用 leaderSelector.autoRequeue();
保證在此實例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)。 在這里我們使用AtomicInteger來記錄此client獲得領(lǐng)導(dǎo)權(quán)的次數(shù)贰逾, 它是”fair”悬荣, 每個client有平等的機(jī)會獲得領(lǐng)導(dǎo)權(quán)。
public class LeaderSelectorDemo {
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelectorAdapter> examples = Lists.newArrayList();
TestingServer server = new TestingServer();
try {
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
examples.add(selectorAdapter);
client.start();
selectorAdapter.start();
}
System.out.println("Press enter/return to quit\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for (LeaderSelectorAdapter exampleClient : examples) {
CloseableUtils.closeQuietly(exampleClient);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
CloseableUtils.closeQuietly(server);
}
}
}
對比可知疙剑,LeaderLatch必須調(diào)用close()
方法才會釋放領(lǐng)導(dǎo)權(quán)氯迂,而對于LeaderSelector,通過LeaderSelectorListener
可以對領(lǐng)導(dǎo)權(quán)進(jìn)行控制言缤, 在適當(dāng)?shù)臅r候釋放領(lǐng)導(dǎo)權(quán)嚼蚀,這樣每個節(jié)點都有可能獲得領(lǐng)導(dǎo)權(quán)。從而管挟,LeaderSelector具有更好的靈活性和可控性轿曙,建議有LeaderElection應(yīng)用場景下優(yōu)先使用LeaderSelector。