Jetcd 實現(xiàn)主從選舉(示例代碼)

@Slf4j
@Component
public class JetcdElectionService implements ApplicationListener<ContextRefreshedEvent> {
    private final Client jetcdClient;
    private final String electionNameText = "/testElection";
    private final String firstNonLoopbackAddress;

    private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>();
    private final AtomicBoolean connectionExceptionFlag = new AtomicBoolean();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public JetcdElectionService(Client jetcdClient, @Value("${server.port}") int port) {
        this.jetcdClient = jetcdClient;
        this.firstNonLoopbackAddress = "127.0.0.1:" + port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(this::run).start();
    }

    private void run() {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        log.info("[Election] 正在執(zhí)行選舉 [electionName: {}, proposal: {}]...", electionName.toString(), proposal.toString());

        Election electionClient = this.jetcdClient.getElectionClient();
        electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));

        LeaderResponse leader = null;
        try {
            leader = electionClient.leader(electionName).get(); // etcd中沒有l(wèi)eader會報錯
        } catch (Exception ignored) {
        }

        if (leader == null) {
            log.info("[Election] 檢測到leader不存在趁蕊,當(dāng)前實例正在嘗試參選...");
            this.doElect(electionName, proposal);
        }
    }

    private boolean doElect(ByteSequence electionName, ByteSequence proposal) {
        Lease leaseClient = this.jetcdClient.getLeaseClient();
        Election electionClient = this.jetcdClient.getElectionClient();

        LeaseGrantResponse lease;
        try {
            lease = leaseClient.grant(15).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("[Election] 選舉時發(fā)生異常鸟赫,無法獲得租約!", e);
            return false;
        }
        long leaseID = lease.getID();

        try {
            electionClient.campaign(electionName, leaseID, proposal).get(5, TimeUnit.SECONDS);

            // leader選舉后進行自動續(xù)約,請求發(fā)送周期為: lease.ttl / 3
            leaseClient.keepAlive(leaseID, new LeaseKeepaliveObserver());
            log.info("[Election] 選舉完成参淹,當(dāng)前實例當(dāng)選成功!");
            return true;
        } catch (InterruptedException | ExecutionException e) {
            log.error("[Election] 選舉時發(fā)生異常宁炫,無法獲得租約!", e);
            return false;
        } catch (TimeoutException e) {
            log.info("[Election] 當(dāng)前實例未能選中土砂,停止參選中.");
            return false;
        }
    }

    private void handleLeaderResponse(LeaderResponse response) {
        Watch watchClient = this.jetcdClient.getWatchClient();

        KeyValue kv = response.getKv();
        ByteSequence proposalKey = kv.getKey(); // ${electionName}/${隨機字符串}
        log.info("[Election] 選舉完成州既,當(dāng)選實例信息[key: {}, value: {}]", proposalKey.toString(), kv.getValue().toString());

        Watch.Watcher oldWatcher = this.watcher.get();
        if (oldWatcher != null) {
            oldWatcher.close();
        }

        Watch.Watcher newWatcher = watchClient.watch(proposalKey, new LeaderWatchListener(this::handleLeaderChange, this::handleWatchError, this.executor));
        this.watcher.compareAndSet(oldWatcher, newWatcher);
    }

    private void handleLeaderChange(WatchResponse response) {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        Election electionClient = this.jetcdClient.getElectionClient();

        List<WatchEvent> events = response.getEvents();
        WatchEvent watchEvent = null;
        if (events != null && !events.isEmpty()) {
            watchEvent = events.get(0);
        }
        WatchEvent.EventType eventType = Optional.ofNullable(watchEvent).map(WatchEvent::getEventType).orElse(null);

        if (this.connectionExceptionFlag.get()) {
            log.info("[Election] [{}] 檢測到與etcd連接異常谜洽,重新注冊observe服務(wù).", eventType);

            electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
            this.connectionExceptionFlag.compareAndSet(true, false);
        } else {
            log.info("[Election] [{}] 檢測到leader變動事件,當(dāng)前實例正在嘗試參選...", eventType);
        }
        this.doElect(electionName, proposal); 
    }

    private void handleWatchError(Throwable throwable) {
        // 發(fā)現(xiàn)和etcd連接出現(xiàn)了異常
        this.connectionExceptionFlag.compareAndSet(false, true);
    }

    public ByteSequence getElectionName(String electionNameText) {
        return ByteSequence.from(electionNameText, StandardCharsets.UTF_8);
    }

    public ByteSequence getProposal(String firstNonLoopbackAddress) {
        return ByteSequence.from(firstNonLoopbackAddress, StandardCharsets.UTF_8);
    }


    private static class LeaderElectionListener implements Election.Listener {
        private final Consumer<LeaderResponse> leaderResponseConsumer;
        private final ExecutorService executor;

        public LeaderElectionListener(Consumer<LeaderResponse> leaderResponseConsumer, ExecutorService executor) {
            this.leaderResponseConsumer = leaderResponseConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(LeaderResponse response) {
            CompletableFuture.runAsync(() -> this.leaderResponseConsumer.accept(response), this.executor).exceptionally((e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }

    private static class LeaderWatchListener implements Watch.Listener {
        private final Consumer<WatchResponse> leaderChangeConsumer;
        private final Consumer<Throwable> onErrorConsumer;
        private final ExecutorService executor;

        public LeaderWatchListener(Consumer<WatchResponse> leaderChangeConsumer, Consumer<Throwable> onErrorConsumer, ExecutorService executor) {
            this.leaderChangeConsumer = leaderChangeConsumer;
            this.onErrorConsumer = onErrorConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(WatchResponse response) {
            CompletableFuture.runAsync(() -> this.leaderChangeConsumer.accept(response), this.executor).exceptionally((e) -> {
                if (e != null) {
                    log.error(e.getMessage(), e);
                }
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            RuntimeException t = new RuntimeException(throwable);
            log.error(throwable.getMessage(), t);
            this.onErrorConsumer.accept(t);
        }

        @Override
        public void onCompleted() {
        }
    }

    private static class LeaseKeepaliveObserver implements StreamObserver<LeaseKeepAliveResponse> {

        @Override
        public void onNext(LeaseKeepAliveResponse value) {
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吴叶,一起剝皮案震驚了整個濱河市阐虚,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蚌卤,老刑警劉巖实束,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異逊彭,居然都是意外死亡咸灿,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門侮叮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來避矢,“玉大人,你說我怎么就攤上這事囊榜∩笮兀” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵锦聊,是天一觀的道長歹嘹。 經(jīng)常有香客問我,道長孔庭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任材蛛,我火速辦了婚禮圆到,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘卑吭。我一直安慰自己芽淡,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布豆赏。 她就那樣靜靜地躺著挣菲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掷邦。 梳的紋絲不亂的頭發(fā)上白胀,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機與錄音抚岗,去河邊找鬼或杠。 笑死,一個胖子當(dāng)著我的面吹牛宣蔚,可吹牛的內(nèi)容都是我干的向抢。 我是一名探鬼主播认境,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼挟鸠!你這毒婦竟也來了叉信?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤艘希,失蹤者是張志新(化名)和其女友劉穎茉盏,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體枢冤,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡鸠姨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了淹真。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片讶迁。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖核蘸,靈堂內(nèi)的尸體忽然破棺而出巍糯,到底是詐尸還是另有隱情,我是刑警寧澤客扎,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布祟峦,位于F島的核電站,受9級特大地震影響徙鱼,放射性物質(zhì)發(fā)生泄漏宅楞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一袱吆、第九天 我趴在偏房一處隱蔽的房頂上張望厌衙。 院中可真熱鬧,春花似錦绞绒、人聲如沸婶希。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽喻杈。三九已至,卻和暖如春狰晚,著一層夾襖步出監(jiān)牢的瞬間筒饰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工家肯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留龄砰,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像换棚,于是被迫代替她去往敵國和親式镐。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容