Zookeeper客戶端Curator使用詳解-選舉(二)

轉(zhuǎn):http://throwable.coding.me/2018/12/16/zookeeper-curator-usage

Leader選舉

在分布式計算中赴叹, leader elections是很重要的一個功能躲舌, 這個選舉過程是這樣子的: 指派一個進(jìn)程作為組織者宦搬,將任務(wù)分發(fā)給各節(jié)點。 在任務(wù)開始前刃鳄, 哪個節(jié)點都不知道誰是leader(領(lǐng)導(dǎo)者)或者coordinator(協(xié)調(diào)者). 當(dāng)選舉算法開始執(zhí)行后, 每個節(jié)點最終會得到一個唯一的節(jié)點作為任務(wù)leader. 除此之外, 選舉還經(jīng)常會發(fā)生在leader意外宕機(jī)的情況下罢荡,新的leader要被選舉出來。

在zookeeper集群中对扶,leader負(fù)責(zé)寫操作区赵,然后通過Zab協(xié)議實現(xiàn)follower的同步,leader或者follower都可以處理讀操作浪南。

Curator 有兩種leader選舉的recipe,分別是LeaderSelectorLeaderLatch笼才。

前者是所有存活的客戶端不間斷的輪流做Leader,大同社會络凿。后者是一旦選舉出Leader骡送,除非有客戶端掛掉重新觸發(fā)選舉,否則不會交出領(lǐng)導(dǎo)權(quán)絮记。某黨?

LeaderLatch

LeaderLatch有兩個構(gòu)造函數(shù):

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的啟動:

leaderLatch.start( );

一旦啟動摔踱,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然后其中一個最終會被選舉為leader怨愤,可以通過hasLeadership方法查看LeaderLatch實例是否leader:

leaderLatch.hasLeadership( ); //返回true說明當(dāng)前實例是leader

類似JDK的CountDownLatch派敷, LeaderLatch在請求成為leadership會block(阻塞),一旦不使用LeaderLatch了撰洗,必須調(diào)用close方法篮愉。 如果它是leader,會釋放leadership, 其它的參與者將會選舉一個leader差导。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

異常處理: LeaderLatch實例可以增加ConnectionStateListener來監(jiān)聽網(wǎng)絡(luò)連接問題试躏。 當(dāng) SUSPENDED 或 LOST 時, leader不再認(rèn)為自己還是leader。當(dāng)LOST后連接重連后RECONNECTED,LeaderLatch會刪除先前的ZNode然后重新創(chuàng)建一個柿汛。LeaderLatch用戶必須考慮導(dǎo)致leadership丟失的連接問題冗酿。 強(qiáng)烈推薦你使用ConnectionStateListener。

一個LeaderLatch的使用例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

可以添加test module的依賴方便進(jìn)行測試络断,不需要啟動真實的zookeeper服務(wù)端:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
</dependency>

首先我們創(chuàng)建了10個LeaderLatch裁替,啟動后它們中的一個會被選舉為leader。 因為選舉會花費(fèi)一些時間貌笨,start后并不能馬上就得到leader弱判。
通過hasLeadership查看自己是否是leader, 如果是的話返回true锥惋。
可以通過.getLeader().getId()可以得到當(dāng)前的leader的ID昌腰。
只能通過close釋放當(dāng)前的領(lǐng)導(dǎo)權(quán)开伏。
await是一個阻塞方法, 嘗試獲取leader地位遭商,但是未必能上位固灵。

LeaderSelector

LeaderSelector使用的時候主要涉及下面幾個類:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

核心類是LeaderSelector,它的構(gòu)造函數(shù)如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

類似LeaderLatch,LeaderSelector必須start: leaderSelector.start(); 一旦啟動劫流,當(dāng)實例取得領(lǐng)導(dǎo)權(quán)時你的listener的takeLeadership()方法被調(diào)用巫玻。而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時才返回。 當(dāng)你不再使用LeaderSelector實例時祠汇,應(yīng)該調(diào)用它的close方法仍秤。

異常處理 LeaderSelectorListener類繼承ConnectionStateListener。LeaderSelector必須小心連接狀態(tài)的改變可很。如果實例成為leader, 它應(yīng)該響應(yīng)SUSPENDED 或 LOST诗力。 當(dāng) SUSPENDED 狀態(tài)出現(xiàn)時, 實例必須假定在重新連接成功之前它可能不再是leader了我抠。 如果LOST狀態(tài)出現(xiàn)苇本, 實例不再是leader, takeLeadership方法返回屿良。

重要: 推薦處理方式是當(dāng)收到SUSPENDED 或 LOST時拋出CancelLeadershipException異常.圈澈。這會導(dǎo)致LeaderSelector實例中斷并取消執(zhí)行takeLeadership方法的異常.。這非常重要尘惧, 你必須考慮擴(kuò)展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推薦的處理邏輯康栈。

下面的一個例子摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

你可以在takeLeadership進(jìn)行任務(wù)的分配等等,并且不要返回喷橙,如果你想要要此實例一直是leader的話可以加一個死循環(huán)啥么。調(diào)用 leaderSelector.autoRequeue();保證在此實例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)。 在這里我們使用AtomicInteger來記錄此client獲得領(lǐng)導(dǎo)權(quán)的次數(shù)贰逾, 它是”fair”悬荣, 每個client有平等的機(jī)會獲得領(lǐng)導(dǎo)權(quán)。

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

對比可知疙剑,LeaderLatch必須調(diào)用close()方法才會釋放領(lǐng)導(dǎo)權(quán)氯迂,而對于LeaderSelector,通過LeaderSelectorListener可以對領(lǐng)導(dǎo)權(quán)進(jìn)行控制言缤, 在適當(dāng)?shù)臅r候釋放領(lǐng)導(dǎo)權(quán)嚼蚀,這樣每個節(jié)點都有可能獲得領(lǐng)導(dǎo)權(quán)。從而管挟,LeaderSelector具有更好的靈活性和可控性轿曙,建議有LeaderElection應(yīng)用場景下優(yōu)先使用LeaderSelector。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市导帝,隨后出現(xiàn)的幾起案子守谓,更是在濱河造成了極大的恐慌,老刑警劉巖您单,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件斋荞,死亡現(xiàn)場離奇詭異,居然都是意外死亡睹限,警方通過查閱死者的電腦和手機(jī)譬猫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來羡疗,“玉大人,你說我怎么就攤上這事别洪∵逗蓿” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵挖垛,是天一觀的道長痒钝。 經(jīng)常有香客問我,道長痢毒,這世上最難降的妖魔是什么送矩? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮哪替,結(jié)果婚禮上栋荸,老公的妹妹穿的比我還像新娘。我一直安慰自己凭舶,他們只是感情好晌块,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著帅霜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上役拴,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天辐赞,我揣著相機(jī)與錄音,去河邊找鬼搂根。 笑死珍促,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的兄墅。 我是一名探鬼主播踢星,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了沐悦?” 一聲冷哼從身側(cè)響起成洗,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎藏否,沒想到半個月后瓶殃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡副签,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年遥椿,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片淆储。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡冠场,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出本砰,到底是詐尸還是另有隱情碴裙,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布点额,位于F島的核電站舔株,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏还棱。R本人自食惡果不足惜载慈,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望珍手。 院中可真熱鬧办铡,春花似錦、人聲如沸珠十。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焙蹭。三九已至晒杈,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間孔厉,已是汗流浹背拯钻。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留撰豺,地道東北人粪般。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像污桦,于是被迫代替她去往敵國和親亩歹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容