Leader選舉是保證分布式數(shù)據(jù)一致性的關鍵所在淘讥。當Zookeeper集群中的一臺服務器出現(xiàn)以下兩種情況之一時老厌,需要進入Leader選舉边翼。
- 服務器初始化啟動腮敌。
- 服務器運行期間無法和Leader保持連接征懈。
服務器啟動時期的Leader選舉
若進行Leader選舉石咬,則至少需要兩臺機器,這里選取3臺機器組成的服務器集群為例卖哎。在集群初始化階段鬼悠,當有一臺服務器Server1啟動時,其單獨無法進行和完成Leader選舉亏娜,當?shù)诙_服務器Server2啟動時焕窝,此時兩臺機器可以相互通信,每臺機器都試圖找到Leader维贺,于是進入Leader選舉過程它掂。選舉過程如下
每個Server發(fā)出一個投票: 由于是初始情況,Server1和Server2都會將自己作為Leader服務器來進行投票溯泣,每次投票會包含所推舉的服務器的myid和ZXID虐秋,使用(myid, ZXID)來表示,此時Server1的投票為(1, 0)垃沦,Server2的投票為(2, 0)客给,然后各自將這個投票發(fā)給集群中其他機器。
接收來自各個服務器的投票: 集群的每個服務器收到投票后肢簿,首先判斷該投票的有效性靶剑,如檢查是否是本輪投票、是否來自LOOKING狀態(tài)的服務器译仗。
-
處理投票 針對每一個投票抬虽,服務器都需要將別人的投票和自己的投票進行PK,PK規(guī)則如下:
- 優(yōu)先檢查ZXID纵菌。ZXID比較大的服務器優(yōu)先作為Leader阐污。
- 如果ZXID相同,那么就比較myid咱圆。myid較大的服務器作為Leader服務器笛辟。
對于Server1而言,它的投票是(1, 0)序苏,接收Server2的投票為(2, 0)手幢,首先會比較兩者的ZXID,均為0忱详,再比較myid围来,此時Server2的myid最大,于是更新自己的投票為(2, 0),然后重新投票监透,對于Server2而言桶错,其無須更新自己的投票,只是再次向集群中所有機器發(fā)出上一次投票信息即可胀蛮。
- 統(tǒng)計投票:每次投票后院刁,服務器都會統(tǒng)計投票信息,判斷是否已經有過半機器接受到相同的投票信息粪狼,對于Server1退腥、Server2而言,都統(tǒng)計出集群中已經有兩臺機器接受了(2, 0)的投票信息再榄,此時便認為已經選出了Leader狡刘。
- 改變服務器狀態(tài): 一旦確定了Leader,每個服務器就會更新自己的狀態(tài)不跟,如果是Follower颓帝,那么就變更為FOLLOWING,如果是Leader窝革,就變更為LEADING购城。
還有一種情形是服務器運行時期的Leader選舉
在Zookeeper運行期間,Leader與非Leader服務器各司其職虐译,即便當有非Leader服務器宕機或新加入瘪板,此時也不會影響Leader,但是一旦Leader服務器掛了漆诽,那么整個集群將暫停對外服務侮攀,進入新一輪Leader選舉,其過程和啟動時期的Leader選舉過程基本一致厢拭。假設正在運行的有Server1兰英、Server2、Server3三臺服務器供鸠,當前Leader是Server2畦贸,若某一時刻Leader掛了,此時便開始Leader選舉楞捂。選舉過程如下
- 變更狀態(tài)薄坏。Leader掛后,余下的非Observer服務器都會講自己的服務器狀態(tài)變更為LOOKING寨闹,然后開始進入Leader選舉過程胶坠。
- 每個Server會發(fā)出一個投票。在運行期間繁堡,每個服務器上的ZXID可能不同沈善,此時假定Server1的ZXID為123乡数,Server3的ZXID為122;在第一輪投票中矮瘟,Server1和Server3都會投自己瞳脓,產生投票(1, 123),(3, 122)澈侠,然后各自將投票發(fā)送給集群中所有機器。
- 接收來自各個服務器的投票埋酬。與啟動時過程相同哨啃。
- 處理投票。與啟動時過程相同写妥,此時拳球,Server1將會成為Leader。
- 統(tǒng)計投票珍特。與啟動時過程相同祝峻。
- 改變服務器的狀態(tài)。與啟動時過程相同扎筒。
Leader選舉算法分析
在3.4.0后的Zookeeper的版本只保留了TCP版本的FastLeaderElection選舉算法莱找。當一臺機器進入Leader選舉時,當前集群可能會處于以下兩種狀態(tài)
- 集群中已經存在Leader嗜桌。
- 集群中不存在Leader奥溺。
對于集群中已經存在Leader而言,此種情況一般都是某臺機器啟動得較晚骨宠,在其啟動之前浮定,集群已經在正常工作,對這種情況层亿,該機器試圖去選舉Leader時桦卒,會被告知當前服務器的Leader信息,對于該機器而言匿又,僅僅需要和Leader機器建立起連接方灾,并進行狀態(tài)同步即可。而在集群中不存在Leader情況下則會相對復雜琳省,其步驟如下
第一次投票迎吵。無論哪種導致進行Leader選舉,集群的所有機器都處于試圖選舉出一個Leader的狀態(tài)针贬,即LOOKING狀態(tài)击费,LOOKING機器會向所有其他機器發(fā)送消息,該消息稱為投票桦他。投票中包含了SID(服務器的唯一標識)和ZXID(事務ID)蔫巩,(SID, ZXID)形式來標識一次投票信息谆棱。假定Zookeeper由5臺機器組成,SID分別為1圆仔、2垃瞧、3、4坪郭、5个从,ZXID分別為9、9歪沃、9嗦锐、8、8沪曙,并且此時SID為2的機器是Leader機器奕污,某一時刻,1液走、2所在機器出現(xiàn)故障碳默,因此集群開始進行Leader選舉。在第一次投票時缘眶,每臺機器都會將自己作為投票對象嘱根,于是SID為3、4磅崭、5的機器投票情況分別為(3, 9)儿子,(4, 8), (5, 8)砸喻。
變更投票柔逼。每臺機器發(fā)出投票后,也會收到其他機器的投票割岛,每臺機器會根據(jù)一定規(guī)則來處理收到的其他機器的投票愉适,并以此來決定是否需要變更自己的投票,這個規(guī)則也是整個Leader選舉算法的核心所在癣漆,其中術語描述如下
vote_sid:接收到的投票中所推舉Leader服務器的SID维咸。
vote_zxid:接收到的投票中所推舉Leader服務器的ZXID。
self_sid:當前服務器自己的SID惠爽。
self_zxid:當前服務器自己的ZXID癌蓖。
每次對收到的投票的處理,都是對(vote_sid, vote_zxid)和(self_sid, self_zxid)對比的過程婚肆。
規(guī)則一:如果vote_zxid大于self_zxid租副,就認可當前收到的投票,并再次將該投票發(fā)送出去较性。
規(guī)則二:如果vote_zxid小于self_zxid用僧,那么堅持自己的投票结胀,不做任何變更。
規(guī)則三:如果vote_zxid等于self_zxid责循,那么就對比兩者的SID糟港,如果vote_sid大于self_sid,那么就認可當前收到的投票院仿,并再次將該投票發(fā)送出去秸抚。
規(guī)則四:如果vote_zxid等于self_zxid,并且vote_sid小于self_sid歹垫,那么堅持自己的投票耸别,不做任何變更。
結合上面規(guī)則县钥,給出下面的集群變更過程。
- 確定Leader慈迈。經過第二輪投票后若贮,集群中的每臺機器都會再次接收到其他機器的投票,然后開始統(tǒng)計投票痒留,如果一臺機器收到了超過半數(shù)的相同投票谴麦,那么這個投票對應的SID機器即為Leader。此時Server3將成為Leader伸头。
由上面規(guī)則可知匾效,通常那臺服務器上的數(shù)據(jù)越新(ZXID會越大),其成為Leader的可能性越大恤磷,也就越能夠保證數(shù)據(jù)的恢復面哼。如果ZXID相同,則SID越大機會越大扫步。
Leader選舉實現(xiàn)細節(jié)
- 服務器狀態(tài): 服務器具有四種狀態(tài)魔策,分別是LOOKING、FOLLOWING河胎、LEADING闯袒、OBSERVING。
LOOKING:尋找Leader狀態(tài)游岳。當服務器處于該狀態(tài)時政敢,它會認為當前集群中沒有Leader,因此需要進入Leader選舉狀態(tài)胚迫。
FOLLOWING:跟隨者狀態(tài)喷户。表明當前服務器角色是Follower。
LEADING:領導者狀態(tài)晌区。表明當前服務器角色是Leader摩骨。
OBSERVING:觀察者狀態(tài)通贞。表明當前服務器角色是Observer。
- 投票數(shù)據(jù)結構
每個投票中包含了兩個最基本的信息恼五,所推舉服務器的SID和ZXID昌罩,投票(Vote)在Zookeeper中包含字段如下
id:被推舉的Leader的SID。
zxid:被推舉的Leader事務ID灾馒。
electionEpoch:邏輯時鐘茎用,用來判斷多個投票是否在同一輪選舉周期中,該值在服務端是一個自增序列睬罗,每次進入新一輪的投票后轨功,都會對該值進行加1操作。
peerEpoch:被推舉的Leader的epoch容达。
state:當前服務器的狀態(tài)古涧。
- QuorumCnxManager:網(wǎng)絡I/O
每臺服務器在啟動的過程中,會啟動一個QuorumPeerManager花盐,負責各臺服務器之間的底層Leader選舉過程中的網(wǎng)絡通信羡滑。
- 消息隊列。QuorumCnxManager內部維護了一系列的隊列算芯,用來保存接收到的柒昏、待發(fā)送的消息以及消息的發(fā)送器,除接收隊列以外熙揍,其他隊列都按照SID分組形成隊列集合职祷,如一個集群中除了自身還有3臺機器,那么就會為這3臺機器分別創(chuàng)建一個發(fā)送隊列届囚,互不干擾有梆。
recvQueue:消息接收隊列,用于存放那些從其他服務器接收到的消息奖亚。
queueSendMap:消息發(fā)送隊列淳梦,用于保存那些待發(fā)送的消息,按照SID進行分組昔字。
senderWorkerMap:發(fā)送器集合爆袍,每個SenderWorker消息發(fā)送器,都對應一臺遠程Zookeeper服務器作郭,負責消息的發(fā)送陨囊,也按照SID進行分組。
lastMessageSent:最近發(fā)送過的消息夹攒,為每個SID保留最近發(fā)送過的一個消息蜘醋。
建立連接。為了能夠相互投票咏尝,Zookeeper集群中的所有機器都需要兩兩建立起網(wǎng)絡連接压语。QuorumCnxManager在啟動時會創(chuàng)建一個ServerSocket來監(jiān)聽Leader選舉的通信端口(默認為3888)啸罢。開啟監(jiān)聽后,Zookeeper能夠不斷地接收到來自其他服務器的創(chuàng)建連接請求胎食,在接收到其他服務器的TCP連接請求時扰才,會進行處理。為了避免兩臺機器之間重復地創(chuàng)建TCP連接厕怜,Zookeeper只允許SID大的服務器主動和其他機器建立連接衩匣,否則斷開連接。在接收到創(chuàng)建連接請求后粥航,服務器通過對比自己和遠程服務器的SID值來判斷是否接收連接請求琅捏,如果當前服務器發(fā)現(xiàn)自己的SID更大,那么會斷開當前連接递雀,然后自己主動和遠程服務器建立連接柄延。一旦連接建立,就會根據(jù)遠程服務器的SID來創(chuàng)建相應的消息發(fā)送器SendWorker和消息接收器RecvWorker缀程,并啟動拦焚。
消息接收與發(fā)送。消息接收:由消息接收器RecvWorker負責杠输,由于Zookeeper為每個遠程服務器都分配一個單獨的RecvWorker,因此秕衙,每個RecvWorker只需要不斷地從這個TCP連接中讀取消息蠢甲,并將其保存到recvQueue隊列中。消息發(fā)送:由于Zookeeper為每個遠程服務器都分配一個單獨的SendWorker据忘,因此鹦牛,每個SendWorker只需要不斷地從對應的消息發(fā)送隊列中獲取出一個消息發(fā)送即可,同時將這個消息放入lastMessageSent中勇吊。在SendWorker中曼追,一旦Zookeeper發(fā)現(xiàn)針對當前服務器的消息發(fā)送隊列為空,那么此時需要從lastMessageSent中取出一個最近發(fā)送過的消息來進行再次發(fā)送汉规,這是為了解決接收方在消息接收前或者接收到消息后服務器掛了礼殊,導致消息尚未被正確處理。同時针史,Zookeeper能夠保證接收方在處理消息時晶伦,會對重復消息進行正確的處理。
- FastLeaderElection:選舉算法核心
外部投票:特指其他服務器發(fā)來的投票啄枕。
內部投票:服務器自身當前的投票婚陪。
選舉輪次:Zookeeper服務器Leader選舉的輪次,即logicalclock频祝。
PK:對內部投票和外部投票進行對比來確定是否需要變更內部投票泌参。
- 選票管理
sendqueue:選票發(fā)送隊列脆淹,用于保存待發(fā)送的選票。
recvqueue:選票接收隊列沽一,用于保存接收到的外部投票盖溺。
WorkerReceiver:選票接收器。其會不斷地從QuorumCnxManager中獲取其他服務器發(fā)來的選舉消息锯玛,并將其轉換成一個選票咐柜,然后保存到recvqueue中,在選票接收過程中攘残,如果發(fā)現(xiàn)該外部選票的選舉輪次小于當前服務器的拙友,那么忽略該外部投票,同時立即發(fā)送自己的內部投票歼郭。
WorkerSender:選票發(fā)送器遗契,不斷地從sendqueue中獲取待發(fā)送的選票,并將其傳遞到底層QuorumCnxManager中病曾。
算法核心
上圖展示了FastLeaderElection模塊是如何與底層網(wǎng)絡I/O進行交互的牍蜂。Leader選舉的基本流程如下
自增選舉輪次。Zookeeper規(guī)定所有有效的投票都必須在同一輪次中泰涂,在開始新一輪投票時鲫竞,會首先對logicalclock進行自增操作。
初始化選票逼蒙。在開始進行新一輪投票之前从绘,每個服務器都會初始化自身的選票,并且在初始化階段是牢,每臺服務器都會將自己推舉為Leader僵井。
發(fā)送初始化選票。完成選票的初始化后驳棱,服務器就會發(fā)起第一次投票批什。Zookeeper會將剛剛初始化好的選票放入sendqueue中,由發(fā)送器WorkerSender負責發(fā)送出去社搅。
接收外部投票驻债。每臺服務器會不斷地從recvqueue隊列中獲取外部選票。如果服務器發(fā)現(xiàn)無法獲取到任何外部投票形葬,那么就會立即確認自己是否和集群中其他服務器保持著有效的連接却汉,如果沒有連接,則馬上建立連接荷并,如果已經建立了連接合砂,則再次發(fā)送自己當前的內部投票。
判斷選舉輪次。在發(fā)送完初始化選票之后,接著開始處理外部投票。在處理外部投票時秒紧,會根據(jù)選舉輪次來進行不同的處理。
· 外部投票的選舉輪次大于內部投票凛剥。若服務器自身的選舉輪次落后于該外部投票對應服務器的選舉輪次,那么就會立即更新自己的選舉輪次(logicalclock)轻姿,并且清空所有已經收到的投票犁珠,然后使用初始化的投票來進行PK以確定是否變更內部投票。最終再將內部投票發(fā)送出去互亮。
· 外部投票的選舉輪次小于內部投票犁享。若服務器接收的外選票的選舉輪次落后于自身的選舉輪次,那么Zookeeper就會直接忽略該外部投票豹休,不做任何處理炊昆,并返回步驟4。
· 外部投票的選舉輪次等于內部投票威根。此時可以開始進行選票PK凤巨。
- 選票PK。在進行選票PK時洛搀,符合任意一個條件就需要變更投票敢茁。
· 若外部投票中推舉的Leader服務器的選舉輪次大于內部投票,那么需要變更投票留美。
· 若選舉輪次一致卷要,那么就對比兩者的ZXID,若外部投票的ZXID大独榴,那么需要變更投票。
· 若兩者的ZXID一致奕枝,那么就對比兩者的SID棺榔,若外部投票的SID大,那么就需要變更投票隘道。
變更投票症歇。經過PK后,若確定了外部投票優(yōu)于內部投票谭梗,那么就變更投票忘晤,即使用外部投票的選票信息來覆蓋內部投票,變更完成后激捏,再次將這個變更后的內部投票發(fā)送出去设塔。
選票歸檔。無論是否變更了投票远舅,都會將剛剛收到的那份外部投票放入選票集合recvset中進行歸檔闰蛔。recvset用于記錄當前服務器在本輪次的Leader選舉中收到的所有外部投票(按照服務隊的SID區(qū)別痕钢,如{(1, vote1), (2, vote2)...})。
統(tǒng)計投票序六。完成選票歸檔后任连,就可以開始統(tǒng)計投票,統(tǒng)計投票是為了統(tǒng)計集群中是否已經有過半的服務器認可了當前的內部投票例诀,如果確定已經有過半服務器認可了該投票随抠,則終止投票。否則重新選舉繁涂。
更新服務器狀態(tài)拱她。若已經確定可以終止投票,那么就開始更新服務器狀態(tài)爆土,服務器首選判斷當前被過半服務器認可的投票所對應的Leader服務器是否是自己椭懊,若是自己,則將自己的服務器狀態(tài)更新為LEADING步势,若不是氧猬,則根據(jù)具體情況來確定自己是FOLLOWING或是OBSERVING。
以上10個步驟就是FastLeaderElection的核心坏瘩,選舉過程可能會經過幾輪循環(huán)盅抚,直到有Leader選舉產生。
源碼分析
每臺服務器在啟動的過程中倔矾,會啟動一個 QuorumPeerManager妄均,負責各臺服務器之間的底層 Leader 選舉過程中的網(wǎng)絡通信。
- 初始化 QuorumCnxManager
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
在 createElectionAlgorithm 會啟動 QuorumCnxManager
public QuorumCnxManager(QuorumPeer self) {
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.self = self;
// Starts listener thread that waits for connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");
}
可以看到QuorumCnxManager 內部維護了一系列的隊列哪自,用來保存接收到的丰包、待發(fā)送的消息以及消息的發(fā)送器,除接收隊列以外壤巷,其他隊列都按照 SID 分組形成隊列集合邑彪。
recvQueue 消息接收隊列,用于存放那些從其他服務器接收到的消息胧华。
queueSendMap 消息發(fā)送隊列寄症,用于保存那些待發(fā)送的消息,按照 SID 進行分組矩动。
senderWorkerMap 發(fā)送器集合有巧,每個 SenderWorker 消息發(fā)送器,都對應一臺遠程 Zookeeper 服務器悲没,負責消息的發(fā)送篮迎,也按照 SID 進行分組。
lastMessageSent 最近發(fā)送過的消息,為每個 SID 保留最近發(fā)送過的一個消息柑潦。
Listener: 可以看到 Listener 初始化了一個 ServerSocket享言,默認端口為 3888 進行底層 Leader 選舉通信。
/**
* Thread to listen on some port
*/
public class Listener extends ZooKeeperThread {
volatile ServerSocket ss = null;
public Listener() {
// During startup of thread, thread name will be overridden to
// specific election address
super("ListenerThread");
}
/**
* Sleeps on accept().
*/
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}
為了避免兩臺機器之間重復地創(chuàng)建 TCP 連接渗鬼,Zookeeper 只允許 SID 大的服務器主動和其他機器建立連接览露,否則斷開連接。在接收到創(chuàng)建連接請求后譬胎,服務器通過對比自己和遠程服務器的 SID 值來判斷是否接收連接請求差牛,如果當前服務器發(fā)現(xiàn)自己的 SID 更大,那么會斷開當前連接堰乔,然后自己主動和遠程服務器建立連接偏化。一旦連接建立,就會根據(jù)遠程服務器的 SID 來創(chuàng)建相應的消息發(fā)送器 SendWorker 和消息接收器 RecvWorker镐侯,并啟動侦讨。每個 RecvWorker 只需要不斷地從這個 TCP 連接中讀取消息,并將其保存到 recvQueue 隊列中苟翻。每個 SendWorker 只需要不斷地從對應的消息發(fā)送隊列中獲取出一個消息發(fā)送即可韵卤,同時將這個消息放入 lastMessageSent 中。
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connection",
sock.getRemoteSocketAddress());
closeSocket(sock);
}
}
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
Long sid = null;
try {
// Read server id
sid = din.readLong();
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
// next comes the #bytes in the remainder of the message
// note that 0 bytes is fine (old servers)
int num_remaining_bytes = din.readInt();
if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
closeSocket(sock);
return;
}
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
* the connection.
*/
sid = observerCounter.getAndDecrement();
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
closeSocket(sock);
LOG.warn("Exception reading or writing challenge: " + e.toString());
return;
}
// do authenticating learner
LOG.debug("Authenticating learner server.id: {}", sid);
authServer.authenticate(sock, din);
//If wins the challenge, then close the new connection.
if (sid < this.mySid) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
}
sw.start();
rw.start();
return;
}
}
FastLeaderElection選舉算法
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
/**
* Constructor of class Messenger.
*
* @param manager Connection manager
*/
Messenger(QuorumCnxManager manager) {
// 1. WorkerSender 選票接收器崇猫,負責從 QuorumCnxManager 接收選票后保存到 recvqueue 中
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
// 2. WorkerReceiver 選票發(fā)送器沈条,負責從 sendqueue 中獲取待發(fā)送的選票并傳遞給 QuorumCnxManager
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
在 FastLeaderElection 中有幾個屬性需要我們重點關注一下:
sendqueue 選票發(fā)送隊列,用于保存待發(fā)送的選票诅炉。
recvqueue 選票接收隊列蜡歹,用于保存接收到的外部投票。
WorkerReceiver 選票接收器涕烧。其會不斷地從 QuorumCnxManager 中獲取其他服務器發(fā)來的選舉消息月而,并將其轉換成一個選票,然后保存到 recvqueue 中议纯,在選票接收過程中父款,如果發(fā)現(xiàn)該外部選票的選舉輪次小于當前服務器的,那么忽略該外部投票痹扇,同時立即發(fā)送自己的內部投票。
WorkerSender 選票發(fā)送器溯香,不斷地從 sendqueue 中獲取待發(fā)送的選票鲫构,并將其傳遞到底層 QuorumCnxManager 中。
lookForLeader
...
try {
roZkMgr.start();
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
}
...
public Vote lookForLeader() throws InterruptedException {
// 省略...
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
// 1. 啟動時先投自己一票并廣播給其它服務器
synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// 2. 獲取其它服務器發(fā)送過來的選票
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
// 3. 如果沒有選票玫坛,則先判斷是否存在連接结笨,如存在則是先投自己一票,如沒則立即連接
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
// 省略...
}
// 4. 收到投票信息,根據(jù) LOOKING炕吸、OBSERVING伐憾、FOLLOWING、LEADING 分別處理
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
switch (n.state) {
// 5. LOOKING 時才會進行選舉
case LOOKING:
// 5.1 判斷投票是否過時赫模,如果自己過時就清除之前已經接收到的信息
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
// 重新發(fā)起投票树肃,PK 一下:如果收到的票據(jù)大則更新票據(jù),否則仍投自己一票
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// 更新票據(jù)
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
// 仍投自己一票
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
// 5.2 收到的票據(jù)過時則直接忽略
} else if (n.electionEpoch < logicalclock.get()) {
break;
// 5.3 epoch 相等則要 PK
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// 5.4 統(tǒng)計誰的投票超過半數(shù)瀑罗,就成為 Leader
if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// 5.5 再等一會兒(200ms)胸嘴,看是否有新的投票
while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
// 5.6 如果沒有發(fā)生新的投票,則結束選舉過程則結束選舉斩祭,修改狀態(tài)為 LEADING
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
// 6. OBSERVING 不能與投票
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
// 7. FOLLOWING劣像、LEADING 說明已存在 Leader。
// 可能在同一輪選舉中摧玫,也可能是之前就存在的 Leader 耳奕,則不在同一輪選舉中
case FOLLOWING:
case LEADING:
// 7.1 在同一輪選舉中,則收集所有的選票放到 recvset 中
// 如有半數(shù)支持則更新狀態(tài)退出選舉
if(n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
// 7.2 如果收到的 logicalclock 與當前不相等诬像,那說明在另一個選舉中已經有了結果(Leader 已存在)
// 收集所有的選票到 outofelection 中屋群,如有半數(shù)支持則更新狀態(tài)退出選舉
outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
// 省略...
}
}
Leader 選舉有兩個函數(shù)需要重點關注一下,totalOrderPredicate() 對兩張選票進行 PK颅停,termPredicate() 判斷投票是否可以結束了谓晌。
totalOrderPredicate(PK 選票)
// id(sid) zxid(事務id) epoch(選舉輪數(shù),每更新一次 Leader 自增 1)
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
long curId, long curZxid, long curEpoch) {
/*
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
termPredicate(結束投票)
/ 票據(jù)占多數(shù)則結束選舉
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// 將支持 vote 的票據(jù)放到 set 集合中(Set 可去重)
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
// self.getQuorumVerifier().containsQuorum(set)
return voteSet.hasAllQuorums();
}