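When people think of ZooKeeper (zk), they think of its architecture: one leader plus several followers and observers. This article analyzes the source code of ZooKeeper's leader election. It has two parts. The first covers the election flow, and the second covers the core logic of the election algorithm, i.e. how ZooKeeper implements it. Be sure to read the election flow alongside the two diagrams.
First, take a look at the two election diagrams:
Notes on the first diagram: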
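- QuorumPeer: drives the election
- Messenger: the class that actually sends and receives election messages. ZooKeeper has offered three leader election algorithms: LeaderElection, a UDP-based FastLeaderElection and a TCP-based FastLeaderElection (in createElectionAlgorithm below these are electionAlg 0, electionAlg 1/2 for the UDP-based AuthFastLeaderElection without/with authentication, and electionAlg 3). Starting with 3.4.0 the first two are deprecated, leaving the TCP-based FastLeaderElection as the one in use, so the Messenger here is FastLeaderElection.Messenger.
- WorkerReceiver and WorkerSender: the receiving and sending threads inside Messenger
- QuorumCnxManager: the socket layer for exchanging election messages. It manages the TCP connections between peers with plain Java sockets (java.net.Socket wrapped in DataInputStream/DataOutputStream, as startConnection below shows, not Netty) and runs one SendWorker/RecvWorker thread pair per peer connection.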
We'll explain the second diagram in more detail by walking through the code.
If anything below is unclear, refer back to the two diagrams.
Election flow
Every server starts a QuorumPeer, which drives the whole election. QuorumPeer is a thread class, started from QuorumPeerMain:
Startup call sequence:
QuorumPeerMain#main->QuorumPeerMain#initializeAndRun->QuorumPeerMain#runFromConfig (cluster mode) / ZooKeeperServerMain#main (standalone mode)
Following the cluster path: after loading its configuration, QuorumPeerMain#runFromConfig starts the QuorumPeer thread, so let's go straight to QuorumPeer's run method.
//Main thread: the QuorumPeer loop that switches between FastLeaderElection, Leader, Follower and Observer
@Override
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
//--initialize the threads and queues used during the election
startLeaderElection();
}
//--entry point of the election logic
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
//--start the Observer
//--similar to a follower: register, sync transactions, then call processPacket
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING://--a follower first connects to the leader and syncs the transaction history, and only then starts its ZooKeeperServer to serve clients
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
//--register with the leader: connectToLeader->registerWithLeader->syncWithLeader
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING://--the election is over and this peer has confirmed it is the leader
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
//--run the actual leader logic
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);
for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
jmxRemotePeerBean = null;
}
}
QuorumPeer has four working modes:
- LOOKING: election mode, runs FastLeaderElection
- LEADING: leader mode, starts the Leader
- FOLLOWING: follower mode, starts the Follower
- OBSERVING: observer mode, starts the Observer
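To make the mode switching concrete, here is a minimal Java sketch of the main loop in QuorumPeer#run as a state machine. It is a simplification under stated assumptions, not ZooKeeper code: `PeerLoopSketch` and the `Election` interface are hypothetical, and we assume a role that exits simply sends the peer back to LOOKING (the finally blocks above call updateServerState(), whose reconfig handling is not modelled).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of QuorumPeer#run as a state machine (hypothetical names).
class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // Hypothetical stand-in for lookForLeader(): returns the role decided by one election.
    interface Election {
        ServerState lookForLeader();
    }

    // Runs a fixed number of iterations (the real loop runs while `running`)
    // and records the state seen at the start of each iteration.
    static List<ServerState> run(Election election, int iterations) {
        List<ServerState> trace = new ArrayList<>();
        ServerState state = ServerState.LOOKING;
        for (int i = 0; i < iterations; i++) {
            trace.add(state);
            switch (state) {
                case LOOKING:
                    // the election decides which role to take next
                    state = election.lookForLeader();
                    break;
                case FOLLOWING: // followLeader() returned, e.g. the leader was lost
                case LEADING:   // lead() returned, e.g. the quorum was lost
                case OBSERVING: // observeLeader() returned
                    // assumption: without a pending reconfig the peer goes back to LOOKING
                    state = ServerState.LOOKING;
                    break;
            }
        }
        return trace;
    }
}
```

Feeding it the scripted election outcomes FOLLOWING and then LEADING for four iterations yields the trace LOOKING, FOLLOWING, LOOKING, LEADING: every role change goes through a new election.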
First, the call sequence in election mode:
QuorumPeer#startLeaderElection->QuorumPeer#createElectionAlgorithm
Here is the source of QuorumPeer#createElectionAlgorithm:
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
//--initialize the two queues, sendqueue and recvqueue
//--keep a reference to the QuorumCnxManager created above by createCnxnManager()
//--create the WorkerSender and WorkerReceiver threads in Messenger
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//--start the WorkerSender and WorkerReceiver threads in Messenger
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
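Focus on this part of case 3:
//--initialize the two queues, sendqueue and recvqueue
//--keep a reference to the QuorumCnxManager created above by createCnxnManager()
//--create the WorkerSender and WorkerReceiver threads in Messenger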
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//--start the WorkerSender and WorkerReceiver threads in Messenger
fle.start();
Read alongside the second diagram, this code is easy to follow: in the end it starts the WorkerSender and WorkerReceiver threads in Messenger.
FastLeaderElection.Messenger.WorkerSender#run
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
//--hands the message to QuorumCnxManager#toSend()
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
Call sequence:
FastLeaderElection.Messenger.WorkerSender#process->QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
//--message addressed to myself: put it straight into recvQueue
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {//--otherwise queue it in queueSendMap
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
//--add the message to this peer's queue in queueSendMap
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);
}
}
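toSend routes each message in one of two ways: a message to myself is looped back into the receive queue, and anything else lands in a per-peer send queue that is created lazily with putIfAbsent. A minimal sketch of that routing, assuming a per-peer capacity of 1 and that a full per-peer queue drops its oldest entry (our understanding of addToSendQueue, whose body is not shown in this article). `ToSendSketch` and its fields are hypothetical, and connectOne is omitted.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the routing in QuorumCnxManager#toSend (hypothetical names).
class ToSendSketch {
    static final int SEND_CAPACITY = 1; // assumption: keep only the newest message per peer

    final long mySid;
    final BlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(100);
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    ToSendSketch(long mySid) { this.mySid = mySid; }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            b.position(0);
            recvQueue.offer(b.duplicate());      // loopback: no network involved
            return;
        }
        // create the per-peer queue only if no other thread has created it yet
        BlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<>(SEND_CAPACITY);
        BlockingQueue<ByteBuffer> old = queueSendMap.putIfAbsent(sid, fresh);
        BlockingQueue<ByteBuffer> bq = (old != null) ? old : fresh;
        if (bq.remainingCapacity() == 0) {
            bq.poll();                           // drop the oldest so the newest vote wins
        }
        bq.offer(b);
        // the real code now calls connectOne(sid) to make sure a connection exists
    }
}
```

Sending twice to peer 2 leaves only the second message queued, which is what you want for votes: a newer proposal makes an older, unsent one useless.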
The call sequence then continues:
QuorumCnxManager#connectOne->QuorumCnxManager#initiateConnection->QuorumCnxManager#startConnection
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = formatInetAddr(self.getElectionAddress());
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
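The `sid > self.getId()` check above means two servers never keep two parallel connections: an outgoing connection survives only when the initiating server has the larger id. (As far as we know, the receiving path, which is not shown in this article, handles the mirror case by closing an incoming connection from a smaller id and dialing back.) The hypothetical sketch below models only that rule and counts the connections left when every server dials every other one.

```java
// Sketch of the connection tie-break in startConnection (hypothetical names).
class ConnectionTieBreakSketch {
    // Mirrors `if (sid > self.getId()) closeSocket(sock)` from the initiator's side.
    static boolean initiatorKeeps(long mySid, long remoteSid) {
        return !(remoteSid > mySid);
    }

    // Counts surviving connections when every server dials every other server.
    static int survivingConnections(long[] sids) {
        int kept = 0;
        for (long me : sids) {
            for (long other : sids) {
                if (me != other && initiatorKeeps(me, other)) {
                    kept++;
                }
            }
        }
        return kept;
    }
}
```

With servers 1, 2 and 3 the count is 3, and with five servers it is 10, i.e. exactly one connection per pair.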
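So let's go straight to the run methods of QuorumCnxManager.SendWorker and QuorumCnxManager.RecvWorker. This is where messages are really sent and received; everything before this is layers of wrapping.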
QuorumCnxManager.SendWorker#run
//--send the messages queued in queueSendMap for this peer
@Override
public void run() {
threadCnt.incrementAndGet();
try {
/**
* If there is nothing in the queue to send, then we
* send the lastMessage to ensure that the last message
* was received by the peer. The message could be dropped
* in case self or the peer shutdown their connection
* (and exit the thread) prior to reading/processing
* the last message. Duplicate messages are handled correctly
* by the peer.
*
* If the send queue is non-empty, then we have a recent
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
//--get this peer's queue from queueSendMap, keyed by sid
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
//--the last message sent to this peer
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
//--remember it in lastMessageSent, then send it
if(b != null){
lastMessageSent.put(sid, b);
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid
+ " my id = " + QuorumCnxManager.this.mySid
+ " error = " + e);
}
//--close the connection
this.finish();
LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
}
where send() is:
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
b.position(0);
b.get(msgBytes);
} catch (BufferUnderflowException be) {
LOG.error("BufferUnderflowException ", be);
return;
}
//--dout (a DataOutputStream) is the socket's write side
dout.writeInt(b.capacity());
dout.write(b.array());
dout.flush();
}
Next, QuorumCnxManager.RecvWorker#run:
//--receive messages
@Override
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
//--din (a DataInputStream) is the socket's read side
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
//--add to recvQueue
//--if recvQueue is full, drop the message at its head (the oldest) first, then append this one at the tail
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = "
+ QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
closeSocket(sock);
}
}
}
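SendWorker#send and RecvWorker#run together define a simple length-prefixed framing: a 4-byte length written with writeInt, followed by the payload; the reader rejects lengths outside (0, PACKETMAXSIZE] and then blocks in readFully until the whole frame has arrived. A minimal sketch of just that framing; `FramingSketch` is hypothetical and MAX_PACKET is an assumed stand-in for PACKETMAXSIZE (we believe it is 512 KB, but treat the value as an assumption).

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the length-prefixed framing used between SendWorker and RecvWorker.
class FramingSketch {
    static final int MAX_PACKET = 512 * 1024; // assumption standing in for PACKETMAXSIZE

    static void writeFrame(DataOutputStream dout, byte[] payload) throws IOException {
        dout.writeInt(payload.length); // 4-byte big-endian length prefix
        dout.write(payload);
        dout.flush();
    }

    static byte[] readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length); // blocks until the whole frame has arrived
        return msg;
    }
}
```

The length check matters because a corrupted or hostile length would otherwise make the reader allocate an arbitrarily large buffer.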
where addToRecvQueue() is:
//--drop the head (oldest), keep the newest
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"recvQueue. Ignoring exception " + ne);
}
}
try {
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
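addToRecvQueue implements a drop-oldest policy on a bounded queue: when recvQueue is full, the head (the oldest message) is removed before the new one is appended, so the newest election messages survive under load. A generic sketch of the same policy; the class name is hypothetical and `lock` plays the role of recvQLock.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

// Bounded queue that discards its oldest element when full (sketch of addToRecvQueue).
class DropOldestQueue<T> {
    private final ArrayBlockingQueue<T> queue;
    private final Object lock = new Object(); // plays the role of recvQLock

    DropOldestQueue(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    void add(T msg) {
        synchronized (lock) {
            if (queue.remainingCapacity() == 0) {
                try {
                    queue.remove(); // discard the oldest message
                } catch (NoSuchElementException ignored) {
                    // a concurrent poll() (which does not take the lock) may have emptied it
                }
            }
            queue.add(msg);
        }
    }

    T poll() {
        return queue.poll();
    }
}
```

With capacity 2, adding a, b and c leaves b and c in the queue.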
Now back to the second diagram, which really matters here. Returning to FastLeaderElection, look at FastLeaderElection.Messenger.WorkerReceiver#run:
//--FastLeaderElection's WorkerReceiver thread takes messages from QuorumCnxManager's recvQueue, turns them into Notification objects and puts them into FastLeaderElection's recvqueue
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
//--a message from QuorumCnxManager's recvQueue
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: " + response.buffer.capacity());
continue;
}
// this is the backwardCompatibility mode in place before ZK-107
// It is for a version of the protocol in which we didn't send peer epoch
// With peer epoch and version the message became 40 bytes
boolean backCompatibility28 = (response.buffer.capacity() == 28);
// this is the backwardCompatibility mode for no version information
boolean backCompatibility40 = (response.buffer.capacity() == 40);
response.buffer.clear();
// Instantiate Notification and set its attributes
Notification n = new Notification();
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
int version = 0x0;
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/
version = response.buffer.getInt();
} else {
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
}
} else {
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
}
QuorumVerifier rqv = null;
// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();
byte b[] = new byte[configLength];
response.buffer.get(b);
synchronized(self) {
try {
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}", self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();
break;
}
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
}
}
} catch (IOException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
} catch (ConfigException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
}
}
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
}
/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
if(!validVoter(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
// Receive new message
if (LOG.isDebugEnabled()) {
LOG.debug("Receive new notification message. My id = "
+ self.getId());
}
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
/*
* Print notification info
*/
if(LOG.isInfoEnabled()){
printNotification(n);
}
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
//--put it into recvqueue for lookForLeader to process
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
//--the sender is also LOOKING but its election round (electionEpoch) is behind ours: send it our current vote so it can catch up
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
//--this server is not LOOKING but the sender is: reply with the leader this server currently believes in
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
}
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
//--queued for WorkerSender, which hands it on to QuorumCnxManager's queueSendMap
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
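WorkerReceiver#run decodes each message by reading fields in a fixed order and uses the buffer length to detect older formats (28 bytes: no peerEpoch; 40 bytes: no version). The sketch below mirrors those reads and adds a matching encoder so the layout can be round-tripped. The encoder is our own, not ZooKeeper's message builder; the class, the record and the config string are hypothetical, and in the 28-byte case peerEpoch is taken from the high 32 bits of the zxid, which is what ZxidUtils.getEpochFromZxid computes as far as we know.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the notification layout read by WorkerReceiver#run (hypothetical names).
class NotificationCodecSketch {
    record Notification(int state, long leader, long zxid, long electionEpoch,
                        long peerEpoch, int version, String config) {}

    // Layout: int state, long leader, long zxid, long electionEpoch, long peerEpoch,
    // int version, int configLength, byte[configLength]
    static ByteBuffer encode(Notification n) {
        byte[] cfg = n.config().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 * 4 + 4 + 4 + cfg.length);
        buf.putInt(n.state()).putLong(n.leader()).putLong(n.zxid())
           .putLong(n.electionEpoch()).putLong(n.peerEpoch())
           .putInt(n.version()).putInt(cfg.length).put(cfg);
        buf.flip();
        return buf;
    }

    static Notification decode(ByteBuffer buf) {
        if (buf.capacity() < 28) {
            throw new IllegalArgumentException("short message: " + buf.capacity());
        }
        buf.clear(); // read from position 0, as the original does
        int state = buf.getInt();
        long leader = buf.getLong();
        long zxid = buf.getLong();
        long electionEpoch = buf.getLong();
        if (buf.capacity() == 28) { // oldest format: no peerEpoch on the wire
            return new Notification(state, leader, zxid, electionEpoch, zxid >> 32, 0, "");
        }
        long peerEpoch = buf.getLong();
        int version = (buf.capacity() == 40) ? 0 : buf.getInt(); // 40 bytes: no version
        String config = "";
        if (version > 0x1) { // newer formats carry the quorum config
            byte[] cfg = new byte[buf.getInt()];
            buf.get(cfg);
            config = new String(cfg, StandardCharsets.UTF_8);
        }
        return new Notification(state, leader, zxid, electionEpoch, peerEpoch, version, config);
    }
}
```

Under these assumptions a version-2 message is 44 bytes of fixed fields plus the config bytes.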
Focus on this part:
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
//--put it into recvqueue for lookForLeader to process
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
//--the sender is also LOOKING but its election round (electionEpoch) is behind ours: send it our current vote so it can catch up
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
//--this server is not LOOKING but the sender is: reply with the leader this server currently believes in
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
}
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
//--queued for WorkerSender, which hands it on to QuorumCnxManager's queueSendMap
sendqueue.offer(notmsg);
}
}
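The branch above boils down to a small decision rule for notifications from voting members (non-voting senders are answered immediately, earlier in the method). A sketch of that rule as a pure function; the enums and action names are hypothetical labels, not ZooKeeper types.

```java
import java.util.EnumSet;

// Sketch of WorkerReceiver's reaction to a notification from a voting peer.
class ReceiverRuleSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }
    enum Action { ENQUEUE_FOR_ELECTION, REPLY_WITH_MY_PROPOSAL, REPLY_WITH_CURRENT_LEADER }

    static EnumSet<Action> react(State me, State sender, long senderEpoch, long myLogicalClock) {
        EnumSet<Action> actions = EnumSet.noneOf(Action.class);
        if (me == State.LOOKING) {
            actions.add(Action.ENQUEUE_FOR_ELECTION);            // recvqueue.offer(n)
            if (sender == State.LOOKING && senderEpoch < myLogicalClock) {
                actions.add(Action.REPLY_WITH_MY_PROPOSAL);      // help a lagging peer catch up
            }
        } else if (sender == State.LOOKING) {
            actions.add(Action.REPLY_WITH_CURRENT_LEADER);       // tell it who the leader is
        }
        return actions;                                          // otherwise: nothing to do
    }
}
```

A server that is already FOLLOWING or LEADING never feeds notifications into its own election; it only answers peers that are still LOOKING.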
Election logic
First, here is the vote class, Vote:
public class Vote {
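//version of the vote/notification format
final private int version;
//server ID of the proposed leader
final private long id;
//zxid (latest transaction ID) of the proposed leader
final private long zxid;
//logical clock: tells whether votes belong to the same election round
final private long electionEpoch;
//epoch of the proposed leader
final private long peerEpoch;
//server state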
final private ServerState state;
}
Core algorithm: FastLeaderElection#lookForLeader (called from both the read-only-mode branch and the normal branch of the LOOKING case in QuorumPeer#run)
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
//--stores the votes received (sid -> vote)
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//--advance the election round (logical clock)
logicalclock.incrementAndGet();
//--propose myself as leader; nothing is sent yet, the broadcast happens below
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
//--put the notifications into sendqueue, to be sent to the other servers
//--i.e. broadcast the initial vote
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//--recvqueue is fed by Messenger, and also by notifications put back below when the candidate changes during finalization
//--notTimeout is the poll timeout
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
//--if queueSendMap shows messages have been delivered (haveDelivered()), send the notifications again
if(manager.haveDelivered()){
sendNotifications();
} else {
//--otherwise reconnect to all peers
//--(queueSendMap is keyed by each server's sid)
manager.connectAll();
}
/*
* Exponential backoff
*/
//--no notification arrived before the timeout: back off exponentially, doubling notTimeout up to maxNotificationInterval
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if (validVoter(n.sid) && validVoter(n.leader)) {
//--dispatch on the sender's state
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
//--first compare the sender's election round (electionEpoch) with ours;
//within the same round, totalOrderPredicate compares peerEpoch, then zxid,
//then server id (myid)
// If notification > current, replace and send messages out
//--the sender's election round is newer than ours
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
//--clear the collected votes
recvset.clear();
//--compare the sender's proposal with my own initial vote:
//--1: larger peerEpoch, 2: larger zxid, 3: larger myid (server id)
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
//--adopt the sender's leader, zxid and peerEpoch
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//--tell everyone
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {//--older round: ignore the sender's vote
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {//--same round: compare with my current proposal in the same way
updateProposal(n.leader, n.zxid, n.peerEpoch);
//--tell everyone
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//--check whether the current candidate (proposedLeader, proposedZxid, proposedEpoch) has a quorum among the votes:
//--termPredicate() collects the sids in recvset whose vote equals the candidate and asks the QuorumVerifier
//--(QuorumMaj.containsQuorum() or QuorumHierarchical.containsQuorum()) whether they form a quorum; for QuorumMaj
// that means more than half of the voting members. If so, to be safe, keep polling recvqueue, waiting up to finalizeWait
// (200 ms by default) for each next notification. If a better proposal shows up, put that notification back into recvqueue
// and keep electing. Otherwise the election is over: depending on whether the elected leader is this server, set its state
// to LEADING, or to FOLLOWING/OBSERVING (learningState()).
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
//--check whether the chosen candidate gets superseded
//--note the finalizeWait timeout on each poll
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
//--a better proposal arrived: put it back into recvqueue and continue the outer loop
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
//--no better proposal arrived within finalizeWait: the leader is decided; check whether it is this server
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
//--same election round as ours
if(n.electionEpoch == logicalclock.get()){
//--record it in recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//--the leader named by the sender has a quorum in recvset, and checkLeader confirms that leader
if(termPredicate(recvset, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*/
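//--no decision yet (typically a different round: this server restarted or joined an ensemble that already has a leader); record the vote in outofelection
//--accept once those votes form a quorum and checkLeader confirms the leader is really leading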
outofelection.put(n.sid, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state));
//--a quorum backs this leader
if (termPredicate(outofelection, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
//--checkLeader: a leader other than us must itself have reported LEADING; if it is us, the round must match
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid,
n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state
+ " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
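The comparisons in lookForLeader go through totalOrderPredicate, whose body is not shown above. The sketch below reproduces the ordering as we understand it from ZooKeeper 3.5: higher peerEpoch wins, then higher zxid, then higher server id. The real method, as far as we know, also returns false when the candidate has weight 0 in the quorum configuration; that check is left out here, and the class name is hypothetical. Note that electionEpoch is not part of this comparison: lookForLeader handles it separately before calling the predicate.

```java
// Sketch of the vote ordering used by totalOrderPredicate (weight check omitted).
class VoteOrderSketch {
    // Returns true if the new vote (newId, newZxid, newEpoch) beats the current one.
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch                                   // 1: higher peerEpoch
            || (newEpoch == curEpoch
                && (newZxid > curZxid                                // 2: higher zxid
                    || (newZxid == curZxid && newId > curId)));      // 3: higher server id
    }
}
```

For example, on a fresh three-node ensemble where all servers start with the same peerEpoch and zxid, every comparison falls through to the server id, so once the votes have been exchanged all three propose server 3.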
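In short, lookForLeader works like this:
1: Advance the election round (logicalclock), propose itself as leader and broadcast that vote to the others.
2: Enter the voting loop, which runs until this server is no longer LOOKING.
For each vote received, act on the sender's state:
- LOOKING: first compare election rounds (electionEpoch). If the sender's round is newer, adopt it, clear the collected votes and re-evaluate the sender's proposal against our own initial vote; if it is older, ignore the vote. Proposals are compared in the order peerEpoch, zxid, server id: the higher peerEpoch wins, on a tie the higher zxid, and on a further tie the higher id. Whenever our proposal changes, we broadcast it to everyone. Next, check whether the current candidate (proposedLeader, proposedZxid, proposedEpoch) has a majority of the votes. This is done in FastLeaderElection#termPredicate, which relies on one of two implementations, QuorumMaj.containsQuorum() or QuorumHierarchical.containsQuorum(). Finally, check whether a better proposal has arrived in the meantime; if so, put it back into recvqueue and loop again.
- OBSERVING: the sender is an observer, so ignore it.
- FOLLOWING and LEADING: if the sender is in the same election round as us, it has already finished the election. We accept its leader only if that leader has a quorum in recvset and checkLeader confirms it (a leader other than us must itself have reported LEADING; if the leader is us, the round must match). Otherwise, typically because the rounds differ after this server restarted or joined an ensemble that already has a leader, we record the vote in outofelection. Once those votes form a quorum and checkLeader confirms the leader, we adopt the sender's election round and set our state to LEADING, or to FOLLOWING/OBSERVING.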
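The majority check in the LOOKING case can be sketched as follows, assuming a plain majority quorum (QuorumMaj): collect the sids in recvset whose vote equals the candidate, keep only voting members, and require strictly more than half of the voting members. Names are hypothetical, Vote is reduced to four fields compared with record equality (our approximation of Vote.equals for LOOKING votes), and hierarchical quorums and reconfig are not modelled.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of termPredicate combined with a QuorumMaj-style containsQuorum.
class QuorumSketch {
    record Vote(long leader, long zxid, long electionEpoch, long peerEpoch) {}

    static boolean termPredicate(Map<Long, Vote> recvset, Vote candidate, Set<Long> votingMembers) {
        Set<Long> ackSet = new HashSet<>();
        for (Map.Entry<Long, Vote> e : recvset.entrySet()) {
            // count only voting members whose recorded vote matches the candidate exactly
            if (candidate.equals(e.getValue()) && votingMembers.contains(e.getKey())) {
                ackSet.add(e.getKey());
            }
        }
        return ackSet.size() > votingMembers.size() / 2; // strict majority
    }
}
```

With three voting members, two matching votes are enough (2 > 1); with five, two are not (2 > 2 is false), and a vote from a different election round never counts.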
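That's all for this article. For the first part, be sure to follow along with the two diagrams, otherwise it is easy to get lost; the election logic in the second part is fairly easy to follow from the text.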