Preface
In a ZooKeeper cluster with multiple nodes, one node must be elected as the Leader to serve subsequent requests. So what does ZooKeeper's leader election protocol look like? Let's find out below.
Leader election protocol
ZooKeeper divides the nodes of a cluster into two types:
- participant: takes part in leader election
- observer: does not take part in leader election
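For reference, a node's type comes from its configuration. A hedged zoo.cfg sketch (hostnames are invented; the :observer suffix and the peerType flag are the standard way to mark an observer):
peerType=observer                    # only in the observer's own zoo.cfg
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
server.4=zoo4:2888:3888:observer     # every server's config marks the observer in the list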
Nodes of the participant type take part in electing the leader. The election proceeds as follows:
- After starting, each node creates its own vote, which carries three main pieces of information:
id: the sid of the node it proposes as leader, which defaults to itself
zxid: the latest transaction id this node has processed
electionEpoch: an identifier for the current election round
- Each node sends its current vote to the other participating nodes.
- Each node receives the votes (r_vote) sent by the other servers and decides whether to update its own vote according to the following rules (a Java sketch follows this list):
1. Compare vote.zxid with r_vote.zxid. If r_vote.zxid > vote.zxid, update the local vote to r_vote's candidate (set vote.id = r_vote.id), meaning this node now endorses that candidate as leader; if r_vote.zxid < vote.zxid, keep the local vote unchanged; if vote.zxid == r_vote.zxid, fall through to rule 2 below. (The actual totalOrderPredicate also compares peerEpoch before zxid; the idea is the same.)
2. Compare vote.id with r_vote.id. If vote.id > r_vote.id, keep the local vote; if vote.id < r_vote.id, update the local vote to r_vote.
- Record the received vote in the per-server tally.
- Check whether any node has been endorsed by more than half of the participants; if so, that node is elected leader.
- If no node has a majority yet, repeat from the vote-exchange step.
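To make the comparison rule concrete, here is a minimal Java sketch. It mirrors the spirit of totalOrderPredicate in FastLeaderElection (which compares peerEpoch first, then zxid, then id) but is simplified: the real method also consults the QuorumVerifier for voting weights.
// Returns true when the received vote r should replace our current vote v.
boolean shouldReplace(long rEpoch, long rZxid, long rId,
                      long vEpoch, long vZxid, long vId) {
    if (rEpoch != vEpoch) {
        return rEpoch > vEpoch; // a newer epoch wins outright
    }
    if (rZxid != vZxid) {
        return rZxid > vZxid;   // more up-to-date data wins
    }
    return rId > vId;           // final tie-break: the larger server id wins
}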
tips
One note here: every node carries an electionEpoch when it starts an election, and within a single round all nodes' electionEpoch values should be equal. If one node's electionEpoch is smaller than the others', that node has fallen behind: it must clear the votes it has collected so far, update its electionEpoch, and join the new election round.
Threads involved in leader election
- WorkerSender
Sends this node's vote to the other servers (no network I/O here; it only drains votes from the send queue and hands them to the connection manager).
- WorkerReceiver
Receives the votes sent by other servers (no network I/O here; it only consumes votes from the receive queue that the network layer fills).
Every node that takes part in voting establishes a network connection to every other voting node.
- SendWorker
Each connection has a SendWorker that sends vote messages over the network to the corresponding node.
- RecvWorker
Each connection has a RecvWorker that receives vote messages arriving over the network from the other node.
- ListenerHandler
The thread on each node that accepts connection requests from other nodes.
- QuorumPeer
Dynamically updates its vote based on the votes obtained from other nodes, and checks whether a leader has been elected. If a leader has been elected, it leaves the election and enters the data-recovery phase; otherwise it keeps electing.
The original post showed a flow diagram of how these threads interact; below is a rough textual reconstruction of the data flow it depicted.
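On each node (queue and class names as they appear later in this article):
outbound: lookForLeader --sendNotifications()--> sendqueue
          sendqueue --WorkerSender--> queueSendMap[sid]
          queueSendMap[sid] --SendWorker--> socket to peer sid
inbound:  socket from peer --RecvWorker--> recvQueue
          recvQueue --WorkerReceiver--> recvqueue (as a Notification)
          recvqueue --poll()--> lookForLeader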
With that groundwork in place, let's begin the source-code analysis of ZooKeeper's cluster leader election.
Node startup entry point
QuorumPeerMain is the startup entry class for every server node.
initializeAndRun
This is the startup entry method; it does three main things:
- Parse zoo.cfg into the properties of a QuorumPeerConfig.
- Start the DatadirCleanupManager to periodically purge expired snapshot files.
- Start the node via runFromConfig.
runFromConfig
This method is long; I've annotated the main points with comments.
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
//a large chunk is omitted above (including the enclosing try block), but it doesn't affect understanding
if (config.getClientPortAddress() != null) {
//get the server-side IO factory class; the default is NIOServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
//configure the ServerCnxnFactory: the port, the maximum number of client connections, creation of the SelectorThreads, the connection-expiry thread, etc.
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
//QuorumPeer is the class that represents a server node; everything that happens next revolves around it
quorumPeer = getQuorumPeer();
//set the accessor class for the data and transaction-log directories
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
//set the leader election algorithm; currently there is only one: FastLeaderElection
quorumPeer.setElectionType(config.getElectionAlg());
//set this node's sid
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
//set ZooKeeper's ZKDatabase; note that at this point no data has been restored yet
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
//set the node type: participant or observer
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// a large chunk of code omitted
//initialize the quorumPeer; this mainly creates the authentication helper classes
quorumPeer.initialize();
if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}
//start the quorumPeer
quorumPeer.start();
ZKAuditProvider.addZKStartStopAuditLog();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
} finally {
if (metricsProvider != null) {
try {
metricsProvider.stop();
} catch (Throwable error) {
LOG.warn("Error while stopping metrics", error);
}
}
}
}
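For context, here is a minimal zoo.cfg that runFromConfig would consume for a three-node ensemble (values are illustrative; 2888 is the quorum port and 3888 the election port):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888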
QuorumPeer.start()
This is where the QuorumPeer is started.
public synchronized void start() {
//check that this node is included in the server list from the config file
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//restore this node's data; see http://www.reibang.com/p/f10ffc0ff861
loadDataBase();
//start the SelectorThreads and AcceptThread to get ready to accept client requests; see http://www.reibang.com/p/8153a113fdf7
startServerCnxnFactory();
// try {
// adminServer.start();
// } catch (AdminServerException e) {
// LOG.warn("Problem starting AdminServer", e);
// System.out.println(e);
// }
//kick off the cluster leader election
startLeaderElection();
startJvmPauseMonitor();
//QuorumPeer is itself a thread; start it now
super.start();
}
startLeaderElection
The startLeaderElection method sets up the initial vote and creates the machinery needed during leader election.
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
//initialize the current vote, which carries three main pieces of information: the proposed leader's id, this node's latest logged transaction id (zxid), and the current epoch.
//every node nominates itself as Leader at startup... emm, thick-skinned indeed
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//create the election algorithm
this.electionAlg = createElectionAlgorithm(electionType);
}
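For reference, the essential shape of a Vote; this is a simplified sketch, not the real class (the actual Vote also carries a version field and the proposer's ServerState):
// Simplified view of org.apache.zookeeper.server.quorum.Vote
public class Vote {
    private final long id;            // sid of the proposed leader
    private final long zxid;          // latest zxid the proposer has logged
    private final long electionEpoch; // which election round this vote belongs to
    private final long peerEpoch;     // epoch of the proposed leader

    public Vote(long id, long zxid, long peerEpoch) {
        this(id, zxid, -1, peerEpoch); // the 3-arg form used in startLeaderElection
    }

    public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
    }
}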
createElectionAlgorithm
Straight to the source:
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
//currently ZooKeeper supports only one election algorithm
case 3:
//QuorumCnxManager is the class through which the QuorumPeer manages its socket connections to the other nodes
QuorumCnxManager qcm = createCnxnManager();
//check via qcmRef whether an old QuorumCnxManager already exists; if so, shut it down
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
//Listener is the manager of the ListenerHandlers
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
//start the listener, which in turn starts each ListenerHandler
listener.start();
//create the FastLeaderElection
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//start() spins up the WorkerSender and WorkerReceiver
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
tips
- Why does QuorumCnxManager need a Listener to manage the ListenerHandlers?
Because a server node may have multiple NICs, it may open a listening port on the IP address of each interface. In that case one QuorumCnxManager can contain multiple ListenerHandlers, so a single Listener is used to manage them all.
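Since ZOOKEEPER-3188 (ZooKeeper 3.6+), a server entry in zoo.cfg may list several election addresses separated by |, one per NIC, which is exactly what creates multiple ListenerHandlers. A hedged example (hostnames invented):
server.2=zoo2-nic1:2888:3888|zoo2-nic2:2888:3888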
FastLeaderElection
What happens when a FastLeaderElection is created?
1. The queues the QuorumPeer uses to send and receive messages, sendqueue and recvqueue, are created:
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
//create the sendqueue and recvqueue objects
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
//create the Messenger, which manages the WorkerSender and WorkerReceiver
this.messenger = new Messenger(manager);
}
2. The Messenger class is created; inside Messenger a WorkerSender and a WorkerReceiver are created to process the data in sendqueue and recvqueue:
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
QuorumPeer.run
Let's look at what logic the QuorumPeer thread itself runs (this is the body of its run method):
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
//handle the election logic
case LOOKING:
LOG.info("LOOKING");
//omitted.....
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
//the QuorumPeer enters the election logic
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
break;
//logic when acting as an observer
case OBSERVING:
// omitted............
break;
//logic when acting as a follower
case FOLLOWING:
// omitted............
break;
//logic when acting as the Leader
case LEADING:
// omitted............
break;
}
}
} finally {
// this part omitted
}
}
FastLeaderElection.lookForLeader
The election itself happens in lookForLeader. The method is long, roughly 200 lines; I'll remove some of the less important code.
public Vote lookForLeader() throws InterruptedException {
//some JMX-related code removed here
self.start_fle = Time.currentElapsedTime();
try {
/*
* The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
* of participants has voted for it.
*/
//the English comment above is clear enough: recvset collects the vote sent by each server,
//keyed by the server's sid; the value is the vote that server proposes. recvset is used to decide whether a leader has been elected
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
/*
* The votes from previous leader elections, as well as the votes from the current leader election are
* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
* the electionEpoch of the received notifications) in a leader election.
*/
//stores votes from servers already in FOLLOWING or LEADING state; a node that arrives late uses it to discover the established leader
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
//logicalclock identifies the election round; it is bumped once per election
logicalclock.incrementAndGet();
//update this node's proposed leader info (proposedLeader, proposedZxid, proposedEpoch)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
//send our proposal to the other servers
sendNotifications();
SyncedLearnerTracker voteSet;
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//poll recvqueue for votes sent by other servers (including the vote we sent to ourselves)
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
//no vote was obtained from recvqueue
//if the QuorumCnxManager already has socket connections to the other nodes, just resend the notifications
if (manager.haveDelivered()) {
sendNotifications();
} else {
//otherwise have the QuorumCnxManager establish the network connections to the other nodes
manager.connectAll();
}
/*
* Exponential backoff
*/
//back off: double notTimeout
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
//if the received vote belongs to a later round than logicalclock, adopt that round and
//clear the votes collected so far
logicalclock.set(n.electionEpoch);
recvset.clear();
//totalOrderPredicate compares a received vote against this node's vote, in the order described at the start of the article (peerEpoch, then zxid, then id);
//it decides whether this node's vote needs updating, and if it is updated, the new vote is sent to the other nodes
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//if the received vote's election round (electionEpoch) is lower than ours, simply drop it
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
//same totalOrderPredicate analysis as above
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));
// don't care about the version if it's in LOOKING state
//record the received vote in recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//build a vote tracker from recvset and this node's vote;
//the tracker decides whether this node's proposal has been endorsed by a quorum of the other nodes
voteSet = getVoteTracker(recvset, new Vote(proposedLeader,proposedZxid , logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
//even if our vote has a quorum of participants behind it, we still wait up to finalizeWait ms on recvqueue to see whether a newly arriving vote would replace ours
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
//if nothing arrives within finalizeWait, everyone accepts the node proposed by our vote as the Leader
//set the QuorumPeer's state based on proposedLeader and our own sid:
//if proposedLeader == our sid, this node becomes LEADING; otherwise it becomes FOLLOWING if it is a participant, or OBSERVING if it is an observer
setPeerState(proposedLeader, voteSet);
//build the final vote that identifies the Leader
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
//leaveInstance clears recvqueue, marking the end of this election round
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
//if the received vote's state is OBSERVING, do nothing
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
//A question here: under what circumstances would a received vote carry a FOLLOWING or LEADING state?
//In other words, the cluster's Leader has already been elected.
//For example, when a new node joins a running cluster, the votes it receives from the other nodes will be FOLLOWING or LEADING; this ties in with how WorkerReceiver works.
//Handle a received vote whose state is LEADING or FOLLOWING:
if (n.electionEpoch == logicalclock.get()) {
//same election round: add the vote straight into recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
//voteSet checks whether a quorum of participants endorses n.leader; checkLeader additionally requires that the leader itself has sent us its vote
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*
* Note that the outofelection map also stores votes from the current leader election.
* See ZOOKEEPER-1732 for more information.
*/
//different election round: record the vote in outofelection, which is then used below to identify the Leader
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
The original post illustrated the logic above with a diagram. In short: poll recvqueue; if it is empty, resend notifications or reconnect; otherwise compare the received vote against ours with totalOrderPredicate, tally votes in recvset, and once a quorum agrees and nothing better arrives within finalizeWait, set the final state and return the winning vote.
That is the election logic of the QuorumPeer thread.
Next let's look at some of the details, which involve the other threads mentioned earlier.
sendNotifications
Whenever a node has just started, or has updated its proposal because of an r_vote received from another node, it calls sendNotifications to broadcast its proposed Leader to the other participants. Let's analyze the source of sendNotifications:
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
//wrap this node's proposed Leader info in a ToSend object and add it to sendqueue
ToSend notmsg = new ToSend(
ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch,
qv.toString().getBytes());
LOG.debug(
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+ " {} (myid), 0x{} (n.peerEpoch) ",
proposedLeader,
Long.toHexString(proposedZxid),
Long.toHexString(logicalclock.get()),
sid,
self.getId(),
Long.toHexString(proposedEpoch));
sendqueue.offer(notmsg);
}
}
WorkerSender.run
Now let's look at the run method of the WorkerSender thread, which consumes the sendqueue:
public void run() {
while (!stop) {
try {
//take a ToSend message from sendqueue and hand it to process
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if (m == null) {
continue;
}
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
WorkerSender.process
void process(ToSend m) {
//serialize the ToSend into a ByteBuffer
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
//hand the requestBuffer to the QuorumCnxManager for delivery to the target participant
manager.toSend(m.sid, requestBuffer);
}
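buildMsg packs the vote into a fixed binary layout. The sketch below is reconstructed from the parsing code in WorkerReceiver (shown further down), not copied from the source; the 0x2 version constant is an assumption based on the version > 0x1 check there.
// Field layout of a notification message (current protocol):
//   int state | long leader | long zxid | long electionEpoch | long peerEpoch
//   | int version | int configLength | byte[] configData
static ByteBuffer buildMsgSketch(int state, long leader, long zxid,
                                 long electionEpoch, long peerEpoch, byte[] configData) {
    ByteBuffer buf = ByteBuffer.allocate(44 + configData.length); // 4+8+8+8+8+4+4 = 44
    buf.putInt(state);
    buf.putLong(leader);
    buf.putLong(zxid);
    buf.putLong(electionEpoch);
    buf.putLong(peerEpoch);
    buf.putInt(0x2);                 // assumed protocol version carrying config data
    buf.putInt(configData.length);
    buf.put(configData);
    buf.flip();
    return buf;
}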
QuorumCnxManager.toSend
Let's analyze what happens in the connection manager's toSend method:
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
//if the vote is addressed to ourselves, put it straight into recvQueue
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
//queueSendMap (a ConcurrentHashMap) is the structure holding, per target sid, the messages this node wants to send
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
//store the vote destined for the node identified by sid in its blocking queue
addToSendQueue(bq, b);
//make sure a socket connection from this node to node sid exists
connectOne(sid);
}
}
Tips
Before walking through connectOne, let's look at the network connection topology between ZooKeeper's voting nodes (the original post showed a topology diagram of three interconnected nodes).
Every node establishes a connection with every other node, and for the outbound and inbound vote messages on each connection ZooKeeper uses a SendWorker and a RecvWorker respectively, both thread classes. Since any two nodes need a connection between them, and to establish these connections efficiently, reliably, and without duplicates, ZooKeeper imposes the following constraint on connection establishment (a sketch follows):
Only the machine with the larger sid may initiate a connection to a machine with a smaller sid. For example, take servers sid=1 and sid=2:
If server sid=1 initiates a socket connection to server sid=2, the connection cannot survive: once the underlying socket is established, ZooKeeper compares the local sid with the remote server's sid, and the side that finds its own sid smaller closes the socket. If server sid=2 connects to server sid=1, the connection succeeds.
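A minimal sketch of this rule (the real check lives in startConnection and receiveConnection; startSendAndRecvWorkers is a hypothetical helper standing in for the worker setup shown later):
// After the TCP connect and the initial sid exchange, both ends apply the same rule:
void resolveDuel(long mySid, long remoteSid, Socket sock) throws IOException {
    if (remoteSid > mySid) {
        sock.close(); // the peer outranks us: drop our socket, it will connect to us
    } else {
        startSendAndRecvWorkers(sock, remoteSid); // we outrank the peer: keep the socket
    }
}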
QuorumCnxManager.connectOne
The connectOne method is what establishes the connection topology illustrated above.
synchronized void connectOne(long sid) {
//senderWorkerMap stores the SendWorker for each sid
if (senderWorkerMap.get(sid) != null) {
//if a SendWorker for this sid already exists, do a multi-address reachability check and return
LOG.debug("There is a connection already for server {}", sid);
if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
// one we are using is already dead and we need to clean-up, so when we will create a new connection
// then we will choose an other one, which is actually reachable
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
}
return;
}
synchronized (self.QV_LOCK) {
boolean knownId = false;
// Resolve hostname for the remote server before attempting to
// connect in case the underlying ip address has changed.
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
//if this node has no socket connection to server sid yet, establish one via connectOne(sid, addr)
if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
return;
}
}
if (lastSeenQV != null
&& lastProposedView.containsKey(sid)
&& (!knownId
|| (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) {
knownId = true;
LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
return;
}
}
if (!knownId) {
LOG.warn("Invalid server id: {} ", sid);
}
}
}
The connectOne(sid, electionAddr) above goes on to call the initiateConnectionAsync() method.
QuorumCnxManager.initiateConnectionAsync
initiateConnectionAsync wraps the connection-establishment task in a QuorumConnectionReqThread and completes it asynchronously.
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
if (!inprogressConnections.add(sid)) {
// simply return as there is a connection request to
// server 'sid' already in progress.
LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
return true;
}
try {
connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
connectionThreadCnt.incrementAndGet();
} catch (Throwable e) {
// Imp: Safer side catching all type of exceptions and remove 'sid'
// from inprogress connections. This is to avoid blocking further
// connection requests from this 'sid' in case of errors.
inprogressConnections.remove(sid);
LOG.error("Exception while submitting quorum connection request", e);
return false;
}
return true;
}
QuorumConnectionReqThread
This is a thread class responsible for establishing the socket connection to the given server.
Let's look at the implementation of initiateConnection, which its run method calls:
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
Socket sock = null;
try {
LOG.debug("Opening channel to server {}", sid);
if (self.isSslQuorum()) {
sock = self.getX509Util().createSSLSocket();
} else {
//create the socket via the factory
sock = SOCKET_FACTORY.get();
}
setSockOpts(sock);
//connect to the remote server
sock.connect(electionAddr.getReachableOrOne(), cnxTO);
if (sock instanceof SSLSocket) {
SSLSocket sslSock = (SSLSocket) sock;
sslSock.startHandshake();
LOG.info("SSL handshake complete with {} - {} - {}",
sslSock.getRemoteSocketAddress(),
sslSock.getSession().getProtocol(),
sslSock.getSession().getCipherSuite());
}
LOG.debug("Connected to server {} using election address: {}:{}",
sid, sock.getInetAddress(), sock.getPort());
} catch (X509Exception e) {
LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
closeSocket(sock);
return;
} catch (UnresolvedAddressException | IOException e) {
LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
closeSocket(sock);
return;
}
try {
//這個(gè)方法我們?cè)谙旅娣治鱿?
startConnection(sock, sid);
} catch (IOException e) {
LOG.error(
"Exception while connecting, id: {}, addr: {}, closing learner connection",
sid,
sock.getRemoteSocketAddress(),
e);
closeSocket(sock);
}
}
QuorumCnxManager.startConnection
startConnection performs the connection-constraint check described above and creates the corresponding SendWorker and RecvWorker thread objects.
private boolean startConnection(Socket sock, Long sid) throws IOException {
//data output stream
DataOutputStream dout = null;
//data input stream
DataInputStream din = null;
LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
//wrap the output stream
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// First sending the protocol version (in other words - message type).
// For backward compatibility reasons we stick to the old protocol version, unless the MultiAddress
// feature is enabled. During rolling upgrade, we must make sure that all the servers can
// understand the protocol version we use to avoid multiple partitions. see ZOOKEEPER-3720
//the following sends some basic handshake data for the session to the other node
long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
//send the protocol version
dout.writeLong(protocolVersion);
//send this node's sid
dout.writeLong(self.getId());
// now we send our election address. For the new protocol version, we can send multiple addresses.
Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
? self.getElectionAddress().getAllAddresses()
: Arrays.asList(self.getElectionAddress().getOne());
String addr = addressesToSend.stream()
.map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
//create the data input stream
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
//if server authentication is configured, authenticate the remote server
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {
//this is the connection-constraint checkpoint mentioned above
LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
//if sid > self.sid, close the socket and let the peer connect to us instead
closeSocket(sock);
// Otherwise proceed with the connection
} else {
LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
//create the SendWorker from the sid and the established socket
SendWorker sw = new SendWorker(sock, sid);
//create the RecvWorker from the sid, the socket, and the input stream
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
//the SendWorker holds a reference to the RecvWorker
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
//register the SendWorker in senderWorkerMap
senderWorkerMap.put(sid, sw);
//initialize the send queue for this sid in queueSendMap
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
//start the SendWorker and the RecvWorker
sw.start();
rw.start();
return true;
}
return false;
}
SendWorker
Let's analyze how the SendWorker works:
public void run() {
threadCnt.incrementAndGet();
try {
/**
* If there is nothing in the queue to send, then we
* send the lastMessage to ensure that the last message
* was received by the peer. The message could be dropped
* in case self or the peer shutdown their connection
* (and exit the thread) prior to reading/processing
* the last message. Duplicate messages are handled correctly
* by the peer.
*
* If the send queue is non-empty, then we have a recent
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
//look up this SendWorker's message send queue in queueSendMap by sid
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
//on the first pass, if bq is null or empty, resend whatever is stored in lastMessageSent, provided there is something there;
//the SendWorker always records the most recently sent message in lastMessageSent
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid={}", sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
try {
//this is the main loop: keep pulling vote messages from our own queue and sending them out
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for server {}", sid);
break;
}
if (b != null) {
//remember the latest vote message in lastMessageSent
lastMessageSent.put(sid, b);
//send the message over the underlying socket
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue", e);
}
}
} catch (Exception e) {
LOG.warn(
"Exception when using channel: for id {} my id = {}",
sid ,
QuorumCnxManager.this.mySid,
e);
}
this.finish();
LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
}
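send(b), not shown above, frames each message with a 4-byte length prefix, which is exactly what RecvWorker's read loop below expects. A simplified sketch (the real send works on the SendWorker's own DataOutputStream):
void send(DataOutputStream dout, ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    b.position(0);
    b.get(msgBytes);
    dout.writeInt(msgBytes.length); // length prefix read by din.readInt() on the peer
    dout.write(msgBytes);           // payload read by din.readFully(...)
    dout.flush();
}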
RecvWorker
Having analyzed SendWorker's run method, let's analyze RecvWorker's run method:
public void run() {
threadCnt.incrementAndGet();
try {
LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
//loop reading messages from the input stream
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
//vote messages use a variable-length format, so first read the message length
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException("Received packet with invalid packet: " + length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
final byte[] msgArray = new byte[length];
//read the whole message body according to that length
din.readFully(msgArray, 0, length);
//wrap the bytes in a Message and put it on recvQueue to await processing
addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
}
} catch (Exception e) {
LOG.warn(
"Connection broken for id {}, my id = {}",
sid,
QuorumCnxManager.this.mySid,
e);
} finally {
LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
sw.finish();
closeSocket(sock);
}
}
}
WorkerReceiver
From the analysis above we can see that vote messages are read and wrapped by RecvWorker and then placed on recvQueue. The next question is how WorkerReceiver consumes the data in recvQueue; let's analyze WorkerReceiver's run method.
The method is long; please bear with it.
public void run() {
Message response;
//main loop
while (!stop) {
// Sleeps on receive
try {
//try to fetch a vote message from recvQueue
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if (response == null) {
//nothing there; keep polling
continue;
}
//a series of validity checks based on the message size follows
final int capacity = response.buffer.capacity();
// The current protocol and two previous generations all send at least 28 bytes
if (capacity < 28) {
LOG.error("Got a short response from server {}: {}", response.sid, capacity);
continue;
}
// this is the backwardCompatibility mode in place before ZK-107
// It is for a version of the protocol in which we didn't send peer epoch
// With peer epoch and version the message became 40 bytes
boolean backCompatibility28 = (capacity == 28);
// this is the backwardCompatibility mode for no version information
boolean backCompatibility40 = (capacity == 40);
response.buffer.clear();
// Instantiate Notification and set its attributes
Notification n = new Notification();
//extract the fields from the message; they will be used to build the Notification
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
int version = 0x0;
QuorumVerifier rqv = null;
try {
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/
version = response.buffer.getInt();
} else {
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
}
} else {
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
}
// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();
// we want to avoid errors caused by the allocation of a byte array with negative length
// (causing NegativeArraySizeException) or huge length (causing e.g. OutOfMemoryError)
if (configLength < 0 || configLength > capacity) {
throw new IOException(String.format("Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
response.sid, capacity, version, configLength));
}
byte[] b = new byte[configLength];
//read the config payload
response.buffer.get(b);
synchronized (self) {
try {
//build a QuorumVerifier from the config
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}",
self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();
break;
}
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
}
}
} catch (IOException | ConfigException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
}
}
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
}
} catch (BufferUnderflowException | IOException e) {
LOG.warn("Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
response.sid, capacity, e);
continue;
}
/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
//if the sender's sid is not a legitimate voter, reply immediately with our current vote
if (!validVoter(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
// Receive new message
LOG.debug("Receive new notification message. My id = {}", self.getId());
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
//populate the Notification with the fields extracted from the Message
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
/*
* Print notification info
*/
LOG.info(
"Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, "
+ "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}",
self.getPeerState(),
n.sid,
n.state,
n.leader,
Long.toHexString(n.electionEpoch),
Long.toHexString(n.peerEpoch),
Long.toHexString(n.zxid),
Long.toHexString(n.version),
(n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0"));
/*
* If this server is looking, then send proposed leader
*/
//if this node is in LOOKING state, add the resulting Notification to recvqueue
if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if ((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())) {
//if the sender's election round is behind ours, send our own vote back to that sid
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
//if this node is not LOOKING, i.e. a leader has already been elected, then:
Vote current = self.getCurrentVote();
if (ackstate == QuorumPeer.ServerState.LOOKING) {
//if we are the leader, record the leading vote set and note that this sid is still looking
if (self.leader != null) {
if (leadingVoteSet != null) {
self.leader.setLeadingVoteSet(leadingVoteSet);
leadingVoteSet = null;
}
self.leader.reportLookingSid(response.sid);
}
LOG.debug(
"Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
//send the current leader's info back to server sid
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message", e);
}
}
LOG.info("WorkerReceiver is down");
}
}
That is WorkerReceiver's workflow: it processes vote messages into Notifications and adds them to recvqueue; the QuorumPeer then fetches Notifications from recvqueue and processes them, which is the logic we analyzed above.
End
With that, we have completed the source-code analysis of ZooKeeper's leader election process.