選舉信息-選舉流程-選舉場(chǎng)景-源碼分析
1 選舉信息
<1> 服務(wù)器角色信息
在Zookeeper集群提供服務(wù)時(shí),集群中角色如下:
- Leader:一個(gè)Zookeeper集群同一時(shí)間只有一個(gè)Leader。所有的寫操作必須要通過Leader完成废酷,再由Leader將寫操作廣播給其它服務(wù)器栅屏。
- Follower:一個(gè)Zookeeper集群可以存在多個(gè)Follower逛艰。Follower可直接處理并且返回客戶端的讀請(qǐng)求蘑志,同時(shí)會(huì)將寫請(qǐng)求轉(zhuǎn)發(fā)給Leader處理金砍,并且負(fù)責(zé)在Leader處理寫請(qǐng)求時(shí)對(duì)請(qǐng)求進(jìn)行投票址否。另外Follower可以參與競(jìng)選Leader餐蔬。
- Observer:Observer功能與Follower類似碎紊,但是沒有投票權(quán),也不會(huì)參與競(jìng)選Leader樊诺。
<2> 服務(wù)器狀態(tài)信息
- Looking:尋找Leader狀態(tài)仗考。當(dāng)服務(wù)器處于該狀態(tài)時(shí),會(huì)認(rèn)為當(dāng)前集群中沒有Leader词爬,因此需要進(jìn)入Leader選舉流程
- Following:跟隨者狀態(tài)秃嗜,表明當(dāng)前服務(wù)器角色是Follower
- Leading:領(lǐng)導(dǎo)者狀態(tài),表明當(dāng)前服務(wù)器角色是Leader
- Observing:觀察者狀態(tài)顿膨,表明當(dāng)前服務(wù)器角色是Observer
<3> 投票信息
- leader:被選舉的Leader的sid
- zxid:被選舉的Leader的事務(wù)id
- sid:當(dāng)前服務(wù)器的sid
- electionEpoch:當(dāng)前投票的輪次
- peerEpoch:當(dāng)前服務(wù)器的Epoch
選票PK:
(1)選票中Epoch大的優(yōu)先級(jí)高锅锨;
(2)選票中Zxid的大的優(yōu)先級(jí)高;
(3)選票中Sid大的優(yōu)先級(jí)高恋沃;
選票終止條件:
以某一選票數(shù)占集群中參與競(jìng)選節(jié)點(diǎn)(除Observer外)數(shù)量的一半以上必搞,選舉結(jié)束;
2 選舉流程
3 選舉場(chǎng)景
<1> 票箱信息
票箱信息:保存選舉的服務(wù)器SID和被選舉的服務(wù)器SID囊咏,即(sid恕洲,leader);例如集群中節(jié)點(diǎn)為SID=1的服務(wù)器選舉節(jié)點(diǎn)為SID=3的服務(wù)器梅割,則票箱信息為(1,3)
<2> 選票信息
選票信息為(electionEpoch霜第,leader,zxid)炮捧,分別代表選舉的輪次庶诡、被選舉服務(wù)器的SID,被選舉服務(wù)器的zxid
<3> 初始啟動(dòng)選舉
(1)每個(gè)Server發(fā)出一個(gè)投票咆课,初始情況末誓,Server都會(huì)將自己作為L(zhǎng)eader服務(wù)器來進(jìn)行投票,比如Server1會(huì)發(fā)出(1,1,0)選票(表示epoch為1书蚪,選舉leader的sid為1喇澡,并且被選舉的服務(wù)器zxid為0),然后各自將這個(gè)投票發(fā)給集群中其他機(jī)器殊校,Server1的票箱信息為(1,1)(表示投票的服務(wù)器sid為1晴玖,選舉的leader的sid為1)
(2)接受來自各個(gè)服務(wù)器的投票。集群的每個(gè)服務(wù)器收到投票后为流,首先判斷該投票的有效性呕屎,如檢查是否是本輪投票,是否來自Looking狀態(tài)的服務(wù)器敬察。
(3)處理投票秀睛。針對(duì)每一個(gè)投票,服務(wù)器需要將別人的票和自己的票進(jìn)行PK莲祸,pk規(guī)則如上所示蹂安。
(4)統(tǒng)計(jì)投票椭迎。每次投票后,服務(wù)器都會(huì)統(tǒng)計(jì)投票信息田盈,判斷是否已經(jīng)有過半機(jī)器接受到相同的投票信息畜号。
(5)改變服務(wù)器狀態(tài)。一旦確定了Leader允瞧,每個(gè)服務(wù)器就會(huì)更新自己的狀態(tài)简软,如果是Follower,那么就變更為FOLLOWING瓷式,如果是Leader替饿,就變更為L(zhǎng)EADING。
<4> Follower重啟
<5> 運(yùn)行期間選舉 leader宕機(jī)
與上面相比贸典,會(huì)在開始添加一個(gè)步驟【變更狀態(tài)】视卢。
Leader掛后,余下的非Observer服務(wù)器都會(huì)講自己的服務(wù)器狀態(tài)變更為L(zhǎng)OOKING廊驼,然后開始進(jìn)入Leader選舉過程据过。
4 源碼分析
4.1 類圖關(guān)系
4.1 FastLeaderElection
<1> Notification
Notification表示收到的選舉投票信息(其他服務(wù)器發(fā)來的選舉投票信息),其包含了被選舉者的id妒挎、zxid绳锅、選舉周期等信息,其buildMsg方法將選舉信息封裝至ByteBuffer中再進(jìn)行發(fā)送酝掩。
<2> ToSend
ToSend表示發(fā)送給其他服務(wù)器的選舉投票信息鳞芙,也包含了被選舉者的id、zxid期虾、選舉周期等信息原朝。
<3> Messenger
其中 WorkerReceiver:
選票接收器,不斷地從QuorumCnxManager中獲取其他服務(wù)器發(fā)來的選舉消息镶苞,并將其轉(zhuǎn)換成一個(gè)選票喳坠,然后保存到recvqueue中
其中 WorkerSender:
選票發(fā)送器,其會(huì)不斷地從sendqueue中獲取待發(fā)送的選票茂蚓,并將其傳遞到底層QuorumCnxManager中壕鹉,其過程是將FastLeaderElection的ToSend轉(zhuǎn)化為QuorumCnxManager的Message
4.2 QuorumCnxManager
該類有四個(gè)內(nèi)部類:
SendWorker類,Message類聋涨,RecWorker類晾浴,Listener類
<1> SendWorder
這個(gè)類作為“發(fā)送者”,繼承ZooKeeperThread牍白,線程不斷地從發(fā)送隊(duì)列取出脊凰,發(fā)送給對(duì)應(yīng)sid的機(jī)器。
Long sid; //目標(biāo)機(jī)器sid淹朋,不是當(dāng)前機(jī)器sid
Socket sock;
RecvWorker recvWorker; //該sid對(duì)應(yīng)的RecWorker
volatile boolean running = true;
DataOutputStream dout;
<2> Message
static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
this.sid = sid;
}
ByteBuffer buffer;
long sid;
}
sid為消息來源方的sid笙各,buffer即指消息體
<3> RecvWorker
Long sid;
Socket sock;
volatile boolean running = true;
final DataInputStream din;
final SendWorker sw;
4.3 FastLeaderElection中的lookForLeader()
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
// 這是票箱的意思嗎
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
// 邏輯時(shí)鐘自增,每進(jìn)行一輪新的Leader選舉,都需要更新邏輯時(shí)鐘
logicalclock.incrementAndGet();
// 更新選票(初始化選票)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 廣播選票
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
// 循環(huán)中(當(dāng)前服務(wù)器處于LOOKING)
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
// 從接受到的選票隊(duì)列中拿取一個(gè) Notification
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
// 接受到的選票隊(duì)列中無選票
if(n == null){
// 如果發(fā)往各個(gè)服務(wù)器的消息隊(duì)列都為空
if(manager.haveDelivered()){
// 廣播選票
sendNotifications();
} else {
// 存在未發(fā)送的消息,遍歷與服務(wù)器進(jìn)行連接
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
// 選票接收隊(duì)列中存在選票并且該選票的發(fā)送者是有資格投票的
else if(self.getVotingView().containsKey(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
// 判斷發(fā)送選票的服務(wù)器的狀態(tài)
switch (n.state) {
case LOOKING: // 處于尋找leader狀態(tài)
// If notification > current, replace and send messages out
// 發(fā)出選票的服務(wù)器epoch大于本服務(wù)器的邏輯時(shí)鐘
if (n.electionEpoch > logicalclock.get()) {
// 改變邏輯時(shí)鐘的值
logicalclock.set(n.electionEpoch);
// TODO 清空票箱
recvset.clear();
// 進(jìn)行選票pk
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// 將獲勝的選票進(jìn)行廣播
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) { // 發(fā)出選票的服務(wù)器epoch小于本服務(wù)器的邏輯時(shí)鐘
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break; // 直接跳出循環(huán)了?础芍?杈抢? 這里是結(jié)束switch循環(huán),重新獲取一張選票
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, // 發(fā)出選票的服務(wù)器epoch等于本服務(wù)器的邏輯時(shí)鐘,進(jìn)行pk,如果獲勝則更新選票并廣播
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// 將選票放入票箱
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// 判斷選舉是否結(jié)束
// 票箱和當(dāng)前的leader選票進(jìn)行比較,看是否超過半數(shù)
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
// 將選票接收隊(duì)列中所有剩下的選票與選出的leader比較,如果獲勝仑性,則放入票箱惶楼,跳出while循環(huán)
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
// 選票接收隊(duì)列中已經(jīng)沒有選票了
if (n == null) {
// 最后勝出的選票是自己,更新狀態(tài)為leading,否則為following
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
// 發(fā)送選票的服務(wù)器的狀態(tài)是Following和Leading
// 這種情況是某臺(tái)服務(wù)器重啟之后,已經(jīng)選舉出新Leader了
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
// 如果是同一輪投票
if(n.electionEpoch == logicalclock.get()){ //是否可以加入已有的集群
// 將選票放入到票箱中
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
// 這是一條與當(dāng)前邏輯時(shí)鐘不符合的消息诊杆,那么說明在另一個(gè)選舉過程中已經(jīng)有了選舉結(jié)果歼捐,
// 于是將該選舉結(jié)果加入到outofelection集合中,再根據(jù)outofelection來判斷是否可以結(jié)束選舉,
// 如果可以也是保存邏輯時(shí)鐘晨汹,設(shè)置選舉狀態(tài)豹储,退出選舉過程。
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
}
流程圖: