@Slf4j
@Component
public class JetcdElectionService implements ApplicationListener<ContextRefreshedEvent> {
private final Client jetcdClient;
private final String electionNameText = "/testElection";
private final String firstNonLoopbackAddress;
private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>();
private final AtomicBoolean connectionExceptionFlag = new AtomicBoolean();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public JetcdElectionService(Client jetcdClient, @Value("${server.port}") int port) {
this.jetcdClient = jetcdClient;
this.firstNonLoopbackAddress = "127.0.0.1:" + port;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
new Thread(this::run).start();
}
private void run() {
ByteSequence electionName = this.getElectionName(this.electionNameText);
ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
log.info("[Election] 正在執(zhí)行選舉 [electionName: {}, proposal: {}]...", electionName.toString(), proposal.toString());
Election electionClient = this.jetcdClient.getElectionClient();
electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
LeaderResponse leader = null;
try {
leader = electionClient.leader(electionName).get(); // etcd中沒有l(wèi)eader會報錯
} catch (Exception ignored) {
}
if (leader == null) {
log.info("[Election] 檢測到leader不存在趁蕊,當(dāng)前實例正在嘗試參選...");
this.doElect(electionName, proposal);
}
}
private boolean doElect(ByteSequence electionName, ByteSequence proposal) {
Lease leaseClient = this.jetcdClient.getLeaseClient();
Election electionClient = this.jetcdClient.getElectionClient();
LeaseGrantResponse lease;
try {
lease = leaseClient.grant(15).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[Election] 選舉時發(fā)生異常鸟赫,無法獲得租約!", e);
return false;
}
long leaseID = lease.getID();
try {
electionClient.campaign(electionName, leaseID, proposal).get(5, TimeUnit.SECONDS);
// leader選舉后進行自動續(xù)約,請求發(fā)送周期為: lease.ttl / 3
leaseClient.keepAlive(leaseID, new LeaseKeepaliveObserver());
log.info("[Election] 選舉完成参淹,當(dāng)前實例當(dāng)選成功!");
return true;
} catch (InterruptedException | ExecutionException e) {
log.error("[Election] 選舉時發(fā)生異常宁炫,無法獲得租約!", e);
return false;
} catch (TimeoutException e) {
log.info("[Election] 當(dāng)前實例未能選中土砂,停止參選中.");
return false;
}
}
private void handleLeaderResponse(LeaderResponse response) {
Watch watchClient = this.jetcdClient.getWatchClient();
KeyValue kv = response.getKv();
ByteSequence proposalKey = kv.getKey(); // ${electionName}/${隨機字符串}
log.info("[Election] 選舉完成州既,當(dāng)選實例信息[key: {}, value: {}]", proposalKey.toString(), kv.getValue().toString());
Watch.Watcher oldWatcher = this.watcher.get();
if (oldWatcher != null) {
oldWatcher.close();
}
Watch.Watcher newWatcher = watchClient.watch(proposalKey, new LeaderWatchListener(this::handleLeaderChange, this::handleWatchError, this.executor));
this.watcher.compareAndSet(oldWatcher, newWatcher);
}
private void handleLeaderChange(WatchResponse response) {
ByteSequence electionName = this.getElectionName(this.electionNameText);
ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
Election electionClient = this.jetcdClient.getElectionClient();
List<WatchEvent> events = response.getEvents();
WatchEvent watchEvent = null;
if (events != null && !events.isEmpty()) {
watchEvent = events.get(0);
}
WatchEvent.EventType eventType = Optional.ofNullable(watchEvent).map(WatchEvent::getEventType).orElse(null);
if (this.connectionExceptionFlag.get()) {
log.info("[Election] [{}] 檢測到與etcd連接異常谜洽,重新注冊observe服務(wù).", eventType);
electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
this.connectionExceptionFlag.compareAndSet(true, false);
} else {
log.info("[Election] [{}] 檢測到leader變動事件,當(dāng)前實例正在嘗試參選...", eventType);
}
this.doElect(electionName, proposal);
}
private void handleWatchError(Throwable throwable) {
// 發(fā)現(xiàn)和etcd連接出現(xiàn)了異常
this.connectionExceptionFlag.compareAndSet(false, true);
}
public ByteSequence getElectionName(String electionNameText) {
return ByteSequence.from(electionNameText, StandardCharsets.UTF_8);
}
public ByteSequence getProposal(String firstNonLoopbackAddress) {
return ByteSequence.from(firstNonLoopbackAddress, StandardCharsets.UTF_8);
}
private static class LeaderElectionListener implements Election.Listener {
private final Consumer<LeaderResponse> leaderResponseConsumer;
private final ExecutorService executor;
public LeaderElectionListener(Consumer<LeaderResponse> leaderResponseConsumer, ExecutorService executor) {
this.leaderResponseConsumer = leaderResponseConsumer;
this.executor = executor;
}
@Override
public void onNext(LeaderResponse response) {
CompletableFuture.runAsync(() -> this.leaderResponseConsumer.accept(response), this.executor).exceptionally((e) -> {
if (e != null) {
e.printStackTrace();
}
return null;
});
}
@Override
public void onError(Throwable throwable) {
log.error(throwable.getMessage(), new RuntimeException(throwable));
}
@Override
public void onCompleted() {
}
}
private static class LeaderWatchListener implements Watch.Listener {
private final Consumer<WatchResponse> leaderChangeConsumer;
private final Consumer<Throwable> onErrorConsumer;
private final ExecutorService executor;
public LeaderWatchListener(Consumer<WatchResponse> leaderChangeConsumer, Consumer<Throwable> onErrorConsumer, ExecutorService executor) {
this.leaderChangeConsumer = leaderChangeConsumer;
this.onErrorConsumer = onErrorConsumer;
this.executor = executor;
}
@Override
public void onNext(WatchResponse response) {
CompletableFuture.runAsync(() -> this.leaderChangeConsumer.accept(response), this.executor).exceptionally((e) -> {
if (e != null) {
log.error(e.getMessage(), e);
}
return null;
});
}
@Override
public void onError(Throwable throwable) {
RuntimeException t = new RuntimeException(throwable);
log.error(throwable.getMessage(), t);
this.onErrorConsumer.accept(t);
}
@Override
public void onCompleted() {
}
}
private static class LeaseKeepaliveObserver implements StreamObserver<LeaseKeepAliveResponse> {
@Override
public void onNext(LeaseKeepAliveResponse value) {
}
@Override
public void onError(Throwable throwable) {
log.error(throwable.getMessage(), new RuntimeException(throwable));
}
@Override
public void onCompleted() {
}
}
}
Jetcd 實現(xiàn)主從選舉(示例代碼)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門侮叮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來避矢,“玉大人,你說我怎么就攤上這事囊榜∩笮兀” “怎么了?”我有些...
- 文/不壞的土叔 我叫張陵锦聊,是天一觀的道長歹嘹。 經(jīng)常有香客問我,道長孔庭,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任材蛛,我火速辦了婚禮圆到,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘卑吭。我一直安慰自己芽淡,他們只是感情好,可當(dāng)我...
- 文/花漫 我一把揭開白布豆赏。 她就那樣靜靜地躺著挣菲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掷邦。 梳的紋絲不亂的頭發(fā)上白胀,一...
- 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼挟鸠!你這毒婦竟也來了叉信?” 一聲冷哼從身側(cè)響起,我...
- 正文 年R本政府宣布祟峦,位于F島的核電站,受9級特大地震影響徙鱼,放射性物質(zhì)發(fā)生泄漏宅楞。R本人自食惡果不足惜,卻給世界環(huán)境...
- 文/蒙蒙 一袱吆、第九天 我趴在偏房一處隱蔽的房頂上張望厌衙。 院中可真熱鬧,春花似錦绞绒、人聲如沸婶希。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽喻杈。三九已至,卻和暖如春狰晚,著一層夾襖步出監(jiān)牢的瞬間筒饰,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- Bully 算法實現(xiàn) 設(shè)定集群中有三個節(jié)點固蚤,通過Bully算法實現(xiàn)選主娘汞。節(jié)點之間的通信使用的是自我實現(xiàn)的Remot...
- 一你弦、db設(shè)置 主寫,從讀燎孟,可部署多個提高“讀”速度禽作。寫后從庫也更新 1)master記錄數(shù)據(jù)庫操作日志到Bina...
- pom.xml application.properties: 相關(guān)java代碼
- 前言 ??該文是基于上篇《MySQL主從分離的實現(xiàn)》的代碼層實現(xiàn),所以本文配置的主數(shù)據(jù)庫和從數(shù)據(jù)庫的數(shù)據(jù)源都是在上...
- 記錄方法 css代碼 html代碼 純CSS3書本打開動畫加載特效[https://sc.chinaz.com/j...