????Zookeeper是采用的zab協(xié)議進(jìn)行實現(xiàn)的,而不是完全Paxos實現(xiàn)的箫措。在主備系統(tǒng)架構(gòu)模式下腹备,采用zab協(xié)議來保證集群中所有副本的數(shù)據(jù)一致性。主系統(tǒng)接受處理所有的事務(wù)性請求斤蔓,然后將數(shù)據(jù)變更狀態(tài)以proposal提案的形式同步給所有的副本進(jìn)程植酥。所以在這個過程中,Leader機(jī)器顯得格外重要弦牡。
????Leader選舉就是在集群中選舉出一個主進(jìn)程友驮,用來接收處理所有客戶端的事務(wù)性請求。有個隱式條件就是集群中的服務(wù)器大于等于2臺才能開始Leader選舉驾锰。
Leader選舉時機(jī):
- 服務(wù)器啟動時期的Leader選舉
- 服務(wù)器運行期間的Leader選舉
選舉流程:
- 發(fā)送當(dāng)前自己機(jī)器的選票信息給集群中的其它機(jī)器
- 接收集群中其它機(jī)器發(fā)送過來的選票信息
- 處理接收到的投票信息
- 統(tǒng)計投票信息
- 改變當(dāng)前服務(wù)器的狀態(tài)
選票PK規(guī)則:
- 首先比對zxId
- 再比對sid
Zookeeper中l(wèi)eader選舉的實現(xiàn)
????zookeeper中的leader選舉由FastLeaderElection具體實現(xiàn)卸留。其中有幾個重要的類:
- Notification:代表收到的投票信息類
- ToSend:發(fā)送給其它服務(wù)器的投票信息
- WorkerReceiver和WorkerSender以及Messager
protected class Messenger {
// 選票發(fā)送器
WorkerSender ws;
// 選票接收器
WorkerReceiver wr;
}
- recvqueue:收票隊列
- sendqueue:發(fā)送選票隊列
????WorkerReceiver和WorkerSender不停地從QuorumCnxManager中獲取收到的選票信息,以及向集群中所有其它looking機(jī)器發(fā)送選票信息椭豫。
FastLeaderElection繼承自Election艾猜,實現(xiàn)了其中的選舉leader的方法
public Vote lookForLeader() throws InterruptedException {
try {
//TODO 所有收到的選票集合
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
synchronized (this) {
//TODO 邏輯時鐘++
logicalclock.incrementAndGet();
//TODO 更新選票 推選的leaderId、zxId 和 選舉周期
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//TODO 給集群中的其它服務(wù)器發(fā)送當(dāng)前服務(wù)器的投票信息
sendNotifications();
SyncedLearnerTracker voteSet;
//當(dāng)前服務(wù)器是選舉狀態(tài)
while ((self.getPeerState() == QuorumPeer.ServerState.LOOKING) && (!stop)) {
//TODO 從QuorumCnxManager中獲取收到的外部 投票信息
FastLeaderElection.Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
//TODO 收到的外部投票信息為空
if (n == null) {
//TODO 當(dāng)前服務(wù)器選票信息是否發(fā)送完
if (manager.haveDelivered()) {
//TODO 發(fā)送完了繼續(xù)發(fā)送當(dāng)前服務(wù)器的選票信息給集群中的其它服務(wù)器
sendNotifications();
} else {
//TODO 沒有發(fā)送完就當(dāng)前服務(wù)器建立和其它服務(wù)器的鏈接信息
manager.connectAll();
}
} else if (validVoter(n.sid) && validVoter(n.leader)) {
//TODO 收到的選票中捻悯,投票者和被推選者都是 屬于投票集合中
//TODO 查看收到的選票的狀態(tài)
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
//TODO 如果收到的投票選舉周期大于當(dāng)前機(jī)器的時鐘周期
if (n.electionEpoch > logicalclock.get()) {
//TODO 更新當(dāng)前機(jī)器的時鐘周期
logicalclock.set(n.electionEpoch);
//TODO 清空所有收到的投票信息
recvset.clear();
//TODO 如果收到的選票信息優(yōu)于當(dāng)前服務(wù)器選票信息匆赃,變更當(dāng)前服務(wù)器的投票信息
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//TODO 更新選票為自己當(dāng)前服務(wù)器的選票信息
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//TODO 發(fā)送選票信息
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//TODO 如果邏輯時鐘小于當(dāng)前邏輯時鐘,忽略
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
//TODO 選舉周期相同今缚,但是收到的選票更優(yōu)算柳,更新選票信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
//TODO 發(fā)送選票信息
sendNotifications();
}
//TODO 收到的選票信息放入集合中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//TODO 投票歸檔,查看是否已leader選舉完成
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}